# SpriteDX - Pipeline Runner

This week has been really really draining. Two and half days were lost due to course work, and I want to try to build up a momentum again today, but little low on energy.

So, we json-ified our pipeline and laid out the Parameter Board with the inputs. Now, we need to wire up an a runner to actually run the pipeline once “Generate“ button is clicked.

What it looks like right now:

```typescript
    const job = createJob({
      tasks: [{
        message: "Generating character…",
        expectedDuration: 60 * 1000,
        handler: async () => {
          const [characterResult, characterOutput] = await generateCharacterCached({
            prompt: "...",
            dataUrl: await urlToDataUrl(templateImageUrl),
            templateRows,
            templateCols,
            seed: 42,
            guidance: 60,
          });
          await fileStore.storeComfyOutput(characterOutput, intermediatesDir);
          return characterResult;
        }
      }, {
        message: "Animating character…",
        expectedDuration: 60 * 1000,
        handler: async (characterResult: WorkflowOutputImage) => {
          const [animationResult, animationOutput] = await animateCharacterCached({
            dataUrl: outputToDataURL(characterResult)
          });
          await fileStore.storeComfyOutput(animationOutput, intermediatesDir);
          return animationResult;
        }
      }, {
        message: "Cutting shots…",
        expectedDuration: 15 * 1000,
        handler: async (animationResult: WorkflowOutputImage) => {
          const [cutResult, cutOutput] = await cutShotsCached({ dataUrl: outputToDataURL(animationResult) });
          await fileStore.storeComfyOutput(cutOutput, intermediatesDir);
          console.log("Cut result:", cutResult);
          return cutResult;
        }
      }, {
        message: "Postprocessing…",
        expectedDuration: 15 * 1000,
        handler: async (cutResult: WorkflowOutputImage[]) => {
          for (let i = 0; i < cutResult.length; i++) {
            const cut = cutResult[i];
            const [postProcessResult, postProcessOutput] = await postProcess({ dataUrl: outputToDataURL(cut), key: i });
            await fileStore.storeComfyOutput(postProcessOutput, intermediatesDir);
            // Save final result
            await fileStore.storeComfyOutput({ images: [postProcessResult] }, dir);
          }
        }
      }]
    });
```

which are basically calling the hardcoded functions.

Let’s make them dynamic based on the pipeline definition.

## Assumptions

Let’s make some simplifications.

1. Pipelines only have single level of stages.
    

## Implementation Plan

Pipeline is JSON-ified and is static definition. So we can make a function that will take in the pipeline definition and produce set of tasks. Let’s define a function to do exactly that. It takes in the pipeline and hash of values.

```typescript
export function createJobFromPipeline(
  pipeline: Pipeline, 
  values: Record<string, unknown>
): Job {
  return { 
    tasks: pipeline.stages.map(stage => createTaskFromStage({ stage, … }));
  return { tasks } as Job;
}
```

Let’s design `createTaskFromStage`.

Inside each stage, we use dot paths to refer to previous stages. For example:

```typescript
    // stage example
    {
      "id": "generate",
      "name": "Prompt",
      "type": "runner",
      "statusMessage": "Generating Reference…",
      "runner": "ComfyUI",
      "workflowRef": "path/to/workflow.json",
      "inputs": {
        "templateImage": {
          "type": "image",
          "label": "Template",
          "source": "..template.templateImage",  // Dot path to refer to "template" stage
          "mapTo": "17"
        },
        … 
      }
     … 
    }
```

So, given current stage path, we need to be able to resolve things like `..template.templateImage` to something absolute like `values.template.templateImage`.

Let’s first build this resolver.

```typescript
/**
 * Resolve relative dot path into a absolute dot path
 * Example:
 *     resolveDotPath("..template.templateImage", "generate") // 'template.templateImage'
 */
function resolveDotPath(idPath, fromPath = "") {
  const pathPart = fromPath.split(".").filter(Boolean);
  const dotPrefixes = idPath.match(/^\.+/)?.[0];
  const len = dotPrefixes?.length || 0;
  for (let i = 0; i < len - 1; i++) {
    if (pathPart.length === 0) {
      throw new Error(`Unable to resolve path: "${idPath}" from "${fromPath}"`);
    }
    pathPart.pop();
  }
  idPath = idPath.slice(len)
  pathPart.push(idPath);
  return pathPart.join(".");
}
```

Now, we have a path with respect to the pipeline, we can now build a getter function that will fetch the right values.

```typescript
function getInputForAbsPath(absPath: string, pipeline: Pipeline): PipelineInput | undefined {
  let foundInput: PipelineInput | undefined;
  let currentStage = pipeline;
  ids = absPath.split(".");
  for (let i = 0; i < ids.length; i++) {
    const id = ids[i];
    if (i === ids.length - 1) {
      foundInput = currentStage.inputs[id];
    } else {
      currentStage = currentStage.stages[id];
    }
  }
  return foundInput
}

function getValueFromValues(absPath: string, values: Record<string, unknown>) {
  ids = absPath.split(".");
  let current = values;
  for (let i = 0; i < ids.length; i++) {
    const id = ids[i];
    current = current[id];
  }
  return current;
}

export function getValueFromPipeline(
  idPath: string, 
  fromPath: string, 
  pipeline: Pipeline,
  values: Record<string, unknown>
): unknown {
  // convert to absolute path
  const absPath = resolveDotPath(idPath, fromPath);
  const input = getInputForAbsPath(absPath, pipeline);
  if (!input) { 
    throw new Error(`Unable to find dot path ${path}.`); 
  }
  if (input.source !== undefined) {
    // Follow the source trail
    return getValueFromPipeline(input.source, fromPath, pipeline, values);
  }
  return getValueFromValues(absPath, values);
}
```

Now, we have the getter, let’s wire up the `createTaskFromStage`.

```typescript
function nestedApply(dotPath: string, value: unknown, obj: unknown) {
  const ids = dotPath.split(".");
  while (ids.length > 1) {
    const id = ids.shift();
    if (!obj) throw new Error(`Unable to apply: ${dotPath}:value into ${obj}`);
    obj = obj[id];
  }
  obj[id] = value;
}

function mapInputsToWorkflow(
  inputs: Records<string, PipelineInput>, 
  workflow: Workflow, 
  getValue: (idPath) => unknown;
) {
  Object.entries(inputs).forEach(([inputId, input]) => {
    const { mapTo } = input;
    if (mapTo) {
      const value = getValue(inputId);
      nestedApply(mapTo, value, workflow);
    }
  });
}

async function comfyUIRunnerStageHandler(
  stage: ComfyStageT,
  getValue: (idPath: string) => unknown
) {
  const workflow = await fetch(stage.workflowRef).then(res => res.json());
  const { inputs } = stage;
  if (inputs) {
    mapInputsToWorkflow(inputs, workflow, getValue);
  }
  const payload =  {
    workflow,
    images: []
  };
  const outputs = await runComfy(payload);
  return outputs;
}

function createTaskFromStage({ 
  stage,
  getValue
}: {
  stage: StageT;
  getValue: (idPath: string) => unknown;
}): AnyTask {
  if (stage.type === "runner") {
    const handler = (stage.runner === "ComfyUI") 
      ? () => comfyUIRunnerStageHandler(stage, getValue)
      : () => undefined;
    return {
      message: stage.statusMessage || `Running ${stage.name}…`,
      expectedDuration: stage.expectedDuration ? stage.expectedDuration * 1000 : 10000,
      handler
    } as AnyTask;
  } else {
    return { message: "", expectedDuration: 0, handler: () => undefined } as AnyTask;
  }
}
```

Now, we can finally close out the `createJobFromPipeline`.

```typescript
export function createJobFromPipeline(
  pipeline: Pipeline,
  values: Record<string, unknown>
): Job {
  const tasks: AnyTask[] = pipeline.stages?.map(stage => {
    const getValue = (idPath: string) => {
      return getValueFromPipeline(idPath, stage.id, pipeline, values);
    };
    return createTaskFromStage({ stage, getValue });
  }) || [];
  const job = { tasks };
  return job;
}
```

---

That’s it. We got stage 1 wired up. I will work on getting stage 2 wired up next.

— Sprited Dev 🌱
