Skip to main content

Command Palette

Search for a command to run...

SpriteDX - Pipeline Runner

Published
5 min readView as Markdown
SpriteDX - Pipeline Runner

This week has been really really draining. Two and half days were lost due to course work, and I want to try to build up a momentum again today, but little low on energy.

So, we json-ified our pipeline and laid out the Parameter Board with the inputs. Now, we need to wire up an a runner to actually run the pipeline once “Generate“ button is clicked.

What it looks like right now:

    const job = createJob({
      tasks: [{
        message: "Generating character…",
        expectedDuration: 60 * 1000,
        handler: async () => {
          const [characterResult, characterOutput] = await generateCharacterCached({
            prompt: "...",
            dataUrl: await urlToDataUrl(templateImageUrl),
            templateRows,
            templateCols,
            seed: 42,
            guidance: 60,
          });
          await fileStore.storeComfyOutput(characterOutput, intermediatesDir);
          return characterResult;
        }
      }, {
        message: "Animating character…",
        expectedDuration: 60 * 1000,
        handler: async (characterResult: WorkflowOutputImage) => {
          const [animationResult, animationOutput] = await animateCharacterCached({
            dataUrl: outputToDataURL(characterResult)
          });
          await fileStore.storeComfyOutput(animationOutput, intermediatesDir);
          return animationResult;
        }
      }, {
        message: "Cutting shots…",
        expectedDuration: 15 * 1000,
        handler: async (animationResult: WorkflowOutputImage) => {
          const [cutResult, cutOutput] = await cutShotsCached({ dataUrl: outputToDataURL(animationResult) });
          await fileStore.storeComfyOutput(cutOutput, intermediatesDir);
          console.log("Cut result:", cutResult);
          return cutResult;
        }
      }, {
        message: "Postprocessing…",
        expectedDuration: 15 * 1000,
        handler: async (cutResult: WorkflowOutputImage[]) => {
          for (let i = 0; i < cutResult.length; i++) {
            const cut = cutResult[i];
            const [postProcessResult, postProcessOutput] = await postProcess({ dataUrl: outputToDataURL(cut), key: i });
            await fileStore.storeComfyOutput(postProcessOutput, intermediatesDir);
            // Save final result
            await fileStore.storeComfyOutput({ images: [postProcessResult] }, dir);
          }
        }
      }]
    });

which are basically calling the hardcoded functions.

Let’s make them dynamic based on the pipeline definition.

Assumptions

Let’s make some simplifications.

  1. Pipelines only have single level of stages.

Implementation Plan

Pipeline is JSON-ified and is static definition. So we can make a function that will take in the pipeline definition and produce set of tasks. Let’s define a function to do exactly that. It takes in the pipeline and hash of values.

export function createJobFromPipeline(
  pipeline: Pipeline, 
  values: Record<string, unknown>
): Job {
  return { 
    tasks: pipeline.stages.map(stage => createTaskFromStage({ stage, … }));
  return { tasks } as Job;
}

Let’s design createTaskFromStage.

Inside each stage, we use dot paths to refer to previous stages. For example:

    // stage example
    {
      "id": "generate",
      "name": "Prompt",
      "type": "runner",
      "statusMessage": "Generating Reference…",
      "runner": "ComfyUI",
      "workflowRef": "path/to/workflow.json",
      "inputs": {
        "templateImage": {
          "type": "image",
          "label": "Template",
          "source": "..template.templateImage",  // Dot path to refer to "template" stage
          "mapTo": "17"
        },
        … 
      }
     … 
    }

So, given current stage path, we need to be able to resolve things like ..template.templateImage to something absolute like values.template.templateImage.

Let’s first build this resolver.

/**
 * Resolve relative dot path into a absolute dot path
 * Example:
 *     resolveDotPath("..template.templateImage", "generate") // 'template.templateImage'
 */
function resolveDotPath(idPath, fromPath = "") {
  const pathPart = fromPath.split(".").filter(Boolean);
  const dotPrefixes = idPath.match(/^\.+/)?.[0];
  const len = dotPrefixes?.length || 0;
  for (let i = 0; i < len - 1; i++) {
    if (pathPart.length === 0) {
      throw new Error(`Unable to resolve path: "${idPath}" from "${fromPath}"`);
    }
    pathPart.pop();
  }
  idPath = idPath.slice(len)
  pathPart.push(idPath);
  return pathPart.join(".");
}

Now, we have a path with respect to the pipeline, we can now build a getter function that will fetch the right values.

function getInputForAbsPath(absPath: string, pipeline: Pipeline): PipelineInput | undefined {
  let foundInput: PipelineInput | undefined;
  let currentStage = pipeline;
  ids = absPath.split(".");
  for (let i = 0; i < ids.length; i++) {
    const id = ids[i];
    if (i === ids.length - 1) {
      foundInput = currentStage.inputs[id];
    } else {
      currentStage = currentStage.stages[id];
    }
  }
  return foundInput
}

function getValueFromValues(absPath: string, values: Record<string, unknown>) {
  ids = absPath.split(".");
  let current = values;
  for (let i = 0; i < ids.length; i++) {
    const id = ids[i];
    current = current[id];
  }
  return current;
}

export function getValueFromPipeline(
  idPath: string, 
  fromPath: string, 
  pipeline: Pipeline,
  values: Record<string, unknown>
): unknown {
  // convert to absolute path
  const absPath = resolveDotPath(idPath, fromPath);
  const input = getInputForAbsPath(absPath, pipeline);
  if (!input) { 
    throw new Error(`Unable to find dot path ${path}.`); 
  }
  if (input.source !== undefined) {
    // Follow the source trail
    return getValueFromPipeline(input.source, fromPath, pipeline, values);
  }
  return getValueFromValues(absPath, values);
}

Now, we have the getter, let’s wire up the createTaskFromStage.

function nestedApply(dotPath: string, value: unknown, obj: unknown) {
  const ids = dotPath.split(".");
  while (ids.length > 1) {
    const id = ids.shift();
    if (!obj) throw new Error(`Unable to apply: ${dotPath}:value into ${obj}`);
    obj = obj[id];
  }
  obj[id] = value;
}

function mapInputsToWorkflow(
  inputs: Records<string, PipelineInput>, 
  workflow: Workflow, 
  getValue: (idPath) => unknown;
) {
  Object.entries(inputs).forEach(([inputId, input]) => {
    const { mapTo } = input;
    if (mapTo) {
      const value = getValue(inputId);
      nestedApply(mapTo, value, workflow);
    }
  });
}

async function comfyUIRunnerStageHandler(
  stage: ComfyStageT,
  getValue: (idPath: string) => unknown
) {
  const workflow = await fetch(stage.workflowRef).then(res => res.json());
  const { inputs } = stage;
  if (inputs) {
    mapInputsToWorkflow(inputs, workflow, getValue);
  }
  const payload =  {
    workflow,
    images: []
  };
  const outputs = await runComfy(payload);
  return outputs;
}

function createTaskFromStage({ 
  stage,
  getValue
}: {
  stage: StageT;
  getValue: (idPath: string) => unknown;
}): AnyTask {
  if (stage.type === "runner") {
    const handler = (stage.runner === "ComfyUI") 
      ? () => comfyUIRunnerStageHandler(stage, getValue)
      : () => undefined;
    return {
      message: stage.statusMessage || `Running ${stage.name}…`,
      expectedDuration: stage.expectedDuration ? stage.expectedDuration * 1000 : 10000,
      handler
    } as AnyTask;
  } else {
    return { message: "", expectedDuration: 0, handler: () => undefined } as AnyTask;
  }
}

Now, we can finally close out the createJobFromPipeline.

export function createJobFromPipeline(
  pipeline: Pipeline,
  values: Record<string, unknown>
): Job {
  const tasks: AnyTask[] = pipeline.stages?.map(stage => {
    const getValue = (idPath: string) => {
      return getValueFromPipeline(idPath, stage.id, pipeline, values);
    };
    return createTaskFromStage({ stage, getValue });
  }) || [];
  const job = { tasks };
  return job;
}

That’s it. We got stage 1 wired up. I will work on getting stage 2 wired up next.

— Sprited Dev 🌱

SpriteDX

Part 1 of 50

Tracks development of sprite generator AI tool. https://spritedx.com