SpriteDX - Pipeline Runner

This week has been really, really draining. Two and a half days were lost to coursework, and I want to try to build up momentum again today, though I'm a little low on energy.
So, we JSON-ified our pipeline and laid out the Parameter Board with the inputs. Now we need to wire up a runner to actually run the pipeline once the “Generate” button is clicked.
What it looks like right now:
// Hard-coded job: four tasks (generate -> animate -> cut -> postprocess), each
// calling a fixed project helper. The handler parameter names suggest each
// handler receives the previous handler's return value — TODO confirm against
// createJob.
const job = createJob({
  tasks: [
    {
      message: "Generating character…",
      expectedDuration: 60 * 1000,
      // Generate the character from the template image, keep the raw output
      // under intermediatesDir, and hand the result on.
      handler: async () => {
        const templateDataUrl = await urlToDataUrl(templateImageUrl);
        const [character, rawCharacterOutput] = await generateCharacterCached({
          prompt: "...",
          dataUrl: templateDataUrl,
          templateRows,
          templateCols,
          seed: 42,
          guidance: 60,
        });
        await fileStore.storeComfyOutput(rawCharacterOutput, intermediatesDir);
        return character;
      },
    },
    {
      message: "Animating character…",
      expectedDuration: 60 * 1000,
      // Animate the generated character and keep the raw output.
      handler: async (characterResult: WorkflowOutputImage) => {
        const characterDataUrl = outputToDataURL(characterResult);
        const [animation, rawAnimationOutput] = await animateCharacterCached({
          dataUrl: characterDataUrl,
        });
        await fileStore.storeComfyOutput(rawAnimationOutput, intermediatesDir);
        return animation;
      },
    },
    {
      message: "Cutting shots…",
      expectedDuration: 15 * 1000,
      // Cut the animation into shots, keep the raw output, and log the cuts.
      handler: async (animationResult: WorkflowOutputImage) => {
        const animationDataUrl = outputToDataURL(animationResult);
        const [shots, rawCutOutput] = await cutShotsCached({ dataUrl: animationDataUrl });
        await fileStore.storeComfyOutput(rawCutOutput, intermediatesDir);
        console.log("Cut result:", shots);
        return shots;
      },
    },
    {
      message: "Postprocessing…",
      expectedDuration: 15 * 1000,
      // Post-process the cuts one at a time, in order; the cut's index is
      // passed along as `key`.
      handler: async (cutResult: WorkflowOutputImage[]) => {
        for (const [index, shot] of cutResult.entries()) {
          const [finalImage, rawPostProcessOutput] = await postProcess({
            dataUrl: outputToDataURL(shot),
            key: index,
          });
          await fileStore.storeComfyOutput(rawPostProcessOutput, intermediatesDir);
          // Save final result
          await fileStore.storeComfyOutput({ images: [finalImage] }, dir);
        }
      },
    },
  ],
});
These tasks are basically just calling hardcoded functions.
Let’s make them dynamic based on the pipeline definition.
Assumptions
Let’s make some simplifications.
- Pipelines only have a single level of stages.
Implementation Plan
The pipeline is JSON-ified and is a static definition, so we can write a function that takes in the pipeline definition and produces a set of tasks. Let’s define a function to do exactly that. It takes in the pipeline and a hash of values.
/**
 * Planning sketch: build a job by turning every stage of the pipeline into a
 * task.
 *
 * @param pipeline Static (JSON-ified) pipeline definition.
 * @param values   Hash of values for the pipeline's inputs; not consumed yet
 *                 in this sketch.
 * @returns A job holding one task per stage.
 */
export function createJobFromPipeline(
  pipeline: Pipeline,
  values: Record<string, unknown>
): Job {
  // The remaining createTaskFromStage arguments are intentionally elided here.
  const tasks = pipeline.stages.map(stage => createTaskFromStage({ stage, /* … */ }));
  return { tasks } as Job;
}
Let’s design createTaskFromStage.
Inside each stage, we use dot paths to refer to previous stages. For example:
// stage example
{
"id": "generate",
"name": "Prompt",
"type": "runner",
"statusMessage": "Generating Reference…",
"runner": "ComfyUI",
"workflowRef": "path/to/workflow.json",
"inputs": {
"templateImage": {
"type": "image",
"label": "Template",
"source": "..template.templateImage", // Dot path to refer to "template" stage
"mapTo": "17"
},
…
}
…
}
So, given the current stage path, we need to be able to resolve a relative path like ..template.templateImage into an absolute one like template.templateImage, which can then be looked up in the values (values.template.templateImage).
Let’s first build this resolver.
/**
 * Resolve a relative dot path into an absolute dot path.
 *
 * Leading dots control where resolution starts: zero or one dot resolves
 * relative to `fromPath` itself, and every additional dot climbs one level up
 * from `fromPath`.
 *
 * Example:
 * resolveDotPath("..template.templateImage", "generate") // 'template.templateImage'
 * resolveDotPath("templateImage", "generate")            // 'generate.templateImage'
 *
 * @param idPath   Relative dot path, optionally prefixed with dots.
 * @param fromPath Absolute dot path to resolve from; empty means the root.
 * @returns The absolute dot path.
 * @throws Error when `idPath` climbs above the root of `fromPath`.
 */
function resolveDotPath(idPath: string, fromPath = ""): string {
  const segments = fromPath.split(".").filter(Boolean);
  const leadingDots = idPath.match(/^\.+/)?.[0].length ?? 0;

  // The first dot only marks the path as relative; each further dot pops a level.
  for (let i = 0; i < leadingDots - 1; i++) {
    if (segments.length === 0) {
      throw new Error(`Unable to resolve path: "${idPath}" from "${fromPath}"`);
    }
    segments.pop();
  }

  // A path made only of dots (e.g. "..") has no remainder; pushing an empty
  // segment would leave a trailing "." on the result.
  const remainder = idPath.slice(leadingDots);
  if (remainder !== "") {
    segments.push(remainder);
  }
  return segments.join(".");
}
Now that we have a path with respect to the pipeline, we can build a getter function that will fetch the right values.
/**
 * Find the input definition that an absolute dot path points at.
 *
 * Every segment but the last is a stage id, matched against the `id` of the
 * entries in the current node's `stages` array; the last segment is a key of
 * that stage's `inputs`.
 *
 * @param absPath  Absolute dot path such as "generate.templateImage".
 * @param pipeline Pipeline definition to search.
 * @returns The input definition, or `undefined` when any stage or the input
 *          itself does not exist.
 */
function getInputForAbsPath(absPath: string, pipeline: Pipeline): PipelineInput | undefined {
  // Minimal structural view of a pipeline/stage node needed for the walk.
  type PathNode = {
    id?: string;
    inputs?: Record<string, PipelineInput>;
    stages?: PathNode[];
  };

  const stageIds = absPath.split(".");
  const inputId = stageIds.pop();
  if (!inputId) {
    return undefined;
  }

  let node: PathNode | undefined = pipeline;
  for (const stageId of stageIds) {
    // `stages` is an array, so a stage has to be looked up by its `id`.
    node = node?.stages?.find(stage => stage.id === stageId);
    if (!node) {
      return undefined;
    }
  }
  return node?.inputs?.[inputId];
}
/**
 * Read the value stored at an absolute dot path inside the values hash.
 *
 * @param absPath Absolute dot path such as "template.templateImage".
 * @param values  Nested hash of values, keyed segment by segment.
 * @returns The value at the path, or `undefined` when any segment is missing.
 */
function getValueFromValues(absPath: string, values: Record<string, unknown>): unknown {
  let current: unknown = values;
  for (const id of absPath.split(".")) {
    // Stop as soon as there is nothing left to descend into.
    if (typeof current !== "object" || current === null) {
      return undefined;
    }
    current = (current as Record<string, unknown>)[id];
  }
  return current;
}
/**
 * Resolve the value for an input referenced by a (possibly relative) dot path.
 *
 * The path is made absolute against `fromPath`. If the input it points at
 * declares a `source`, that source is followed (repeatedly) until an input
 * without a `source` is reached, and that input's value is read from `values`.
 *
 * @param idPath   Dot path of the input, relative to `fromPath`.
 * @param fromPath Absolute dot path of the stage the lookup starts from.
 * @param pipeline Pipeline definition holding the input declarations.
 * @param values   Hash of values keyed by absolute dot path segments.
 * @returns The resolved value.
 * @throws Error when a path does not point at a declared input, or when the
 *         `source` references form a cycle.
 */
export function getValueFromPipeline(
  idPath: string,
  fromPath: string,
  pipeline: Pipeline,
  values: Record<string, unknown>
): unknown {
  // convert to absolute path
  let absPath = resolveDotPath(idPath, fromPath);
  const visited = new Set<string>();

  for (;;) {
    if (visited.has(absPath)) {
      throw new Error(`Circular source reference at dot path ${absPath}.`);
    }
    visited.add(absPath);

    const input = getInputForAbsPath(absPath, pipeline);
    if (!input) {
      throw new Error(`Unable to find dot path ${absPath}.`);
    }
    if (input.source === undefined) {
      return getValueFromValues(absPath, values);
    }

    // Follow the source trail. A source is written relative to the stage that
    // declares the input, which is the current path minus its last segment.
    const owningStagePath = absPath.split(".").slice(0, -1).join(".");
    absPath = resolveDotPath(input.source, owningStagePath);
  }
}
Now that we have the getter, let’s wire up createTaskFromStage.
/**
 * Assign `value` at a dot path inside `obj`, mutating `obj` in place.
 *
 * Every segment but the last is descended into; the last segment is the key
 * that receives the value. Missing intermediate containers are not created.
 *
 * @param dotPath Dot path of the target, e.g. "17.inputs.image".
 * @param value   Value to store.
 * @param obj     Object to write into.
 * @throws Error when `obj` or any intermediate segment is not an object.
 */
function nestedApply(dotPath: string, value: unknown, obj: unknown): void {
  const parentIds = dotPath.split(".");
  // split() always yields at least one element, so a final key always exists.
  const lastId = parentIds.pop() as string;

  const asContainer = (candidate: unknown, at: string): Record<string, unknown> => {
    if (typeof candidate !== "object" || candidate === null) {
      throw new Error(`Unable to apply: ${dotPath} (no object to descend into at "${at}")`);
    }
    return candidate as Record<string, unknown>;
  };

  let target: unknown = obj;
  for (const id of parentIds) {
    target = asContainer(target, id)[id];
  }
  asContainer(target, lastId)[lastId] = value;
}
/**
 * Copy the value of every mapped input into the workflow, mutating it in place.
 *
 * For each input that declares a `mapTo` dot path, the input's value is
 * obtained through `getValue` (called with the input's id) and written to
 * that path in `workflow`. Inputs without `mapTo` are skipped.
 *
 * @param inputs   Input definitions keyed by input id.
 * @param workflow Workflow object to write into.
 * @param getValue Resolver that returns the value for an input id.
 */
function mapInputsToWorkflow(
  inputs: Record<string, PipelineInput>,
  workflow: Workflow,
  getValue: (idPath: string) => unknown
): void {
  for (const [inputId, input] of Object.entries(inputs)) {
    const { mapTo } = input;
    if (!mapTo) {
      continue;
    }
    nestedApply(mapTo, getValue(inputId), workflow);
  }
}
/**
 * Run a ComfyUI stage: fetch its workflow JSON, apply the stage's mapped
 * inputs to it, and submit it through `runComfy`.
 *
 * @param stage    Stage whose `workflowRef` is fetched and whose `inputs` are
 *                 mapped into the workflow.
 * @param getValue Resolver that returns the value for an input id.
 * @returns Whatever `runComfy` resolves to.
 * @throws Error when the workflow request does not succeed.
 */
async function comfyUIRunnerStageHandler(
  stage: ComfyStageT,
  getValue: (idPath: string) => unknown
) {
  const response = await fetch(stage.workflowRef);
  // fetch() only rejects on network failure; an HTTP error status still
  // resolves, so check it before parsing an error page as the workflow.
  if (!response.ok) {
    throw new Error(
      `Unable to load workflow "${stage.workflowRef}": ${response.status} ${response.statusText}`
    );
  }
  const workflow = await response.json();

  const { inputs } = stage;
  if (inputs) {
    mapInputsToWorkflow(inputs, workflow, getValue);
  }

  // The payload is sent with an empty images list.
  const payload = {
    workflow,
    images: []
  };
  const outputs = await runComfy(payload);
  return outputs;
}
/**
 * Build the task for a single pipeline stage.
 *
 * A "runner" stage gets its status message (falling back to
 * `Running <name>…`) and its expected duration (stage value times 1000,
 * falling back to 10000). Only the "ComfyUI" runner gets a real handler; any
 * other runner, and any non-runner stage, gets a handler that does nothing.
 *
 * @param stage    Stage definition to convert.
 * @param getValue Resolver handed to the stage handler for looking up inputs.
 * @returns The task for the stage.
 */
function createTaskFromStage({
  stage,
  getValue
}: {
  stage: StageT;
  getValue: (idPath: string) => unknown;
}): AnyTask {
  // Non-runner stages produce an empty placeholder task.
  if (stage.type !== "runner") {
    return { message: "", expectedDuration: 0, handler: () => undefined } as AnyTask;
  }

  const message = stage.statusMessage || `Running ${stage.name}…`;
  const expectedDuration = stage.expectedDuration ? stage.expectedDuration * 1000 : 10000;

  if (stage.runner === "ComfyUI") {
    return {
      message,
      expectedDuration,
      handler: () => comfyUIRunnerStageHandler(stage, getValue)
    } as AnyTask;
  }

  // Other runners: no-op handler.
  return { message, expectedDuration, handler: () => undefined } as AnyTask;
}
Now, we can finally close out the createJobFromPipeline.
/**
 * Build a job from a pipeline definition: one task per stage, in stage order.
 *
 * Each task gets a `getValue` resolver bound to its own stage id, so input
 * paths are resolved relative to that stage. A pipeline without stages yields
 * a job with no tasks.
 *
 * @param pipeline Pipeline definition.
 * @param values   Hash of values used to resolve the stages' inputs.
 * @returns The job holding the generated tasks.
 */
export function createJobFromPipeline(
  pipeline: Pipeline,
  values: Record<string, unknown>
): Job {
  const tasks: AnyTask[] = [];
  for (const stage of pipeline.stages ?? []) {
    const task = createTaskFromStage({
      stage,
      getValue: (idPath: string) => getValueFromPipeline(idPath, stage.id, pipeline, values)
    });
    tasks.push(task);
  }
  return { tasks };
}
That’s it. We got stage 1 wired up. I will work on getting stage 2 wired up next.
— Sprited Dev 🌱



