diff options
Diffstat (limited to 'worker/scripts/run_pipeline')
-rwxr-xr-x | worker/scripts/run_pipeline | 74 |
1 files changed, 52 insertions, 22 deletions
diff --git a/worker/scripts/run_pipeline b/worker/scripts/run_pipeline index ad58573..9991001 100755 --- a/worker/scripts/run_pipeline +++ b/worker/scripts/run_pipeline @@ -1,28 +1,58 @@ -#!/usr/bin/env -S deno --allow-env --allow-net --allow-read +#!/usr/bin/env -S deno run --allow-env --allow-net --allow-run --allow-read --allow-write import { type Job, PipelineImpl } from "@liz-ci/model"; -import { getRequiredEnv, getStdout, validateIdentifier } from "@liz-ci/utils"; - -const stages = await (Deno.readTextFile(getRequiredEnv("pipeline"))) - .then(PipelineImpl.from) - .then((pipeline) => pipeline.getStages()); - -const validateJob = (job: Job) => { - Object.entries(job.arguments).forEach((e) => { - if (!e.every(validateIdentifier)) { - throw new Error(`job of type ${job.type} has invalid entry ${e}`); - } - }); +import { + getRequiredEnv, + getStdout, + loggerWithPrefix, + validateIdentifier, +} from "@liz-ci/utils"; + +const pipelinePath = getRequiredEnv("pipeline"); +const logger = loggerWithPrefix(() => + `[${new Date().toISOString()}] [run_pipeline.${pipelinePath}]` +); + +const jobValidForExecution = (job: Job) => { + return Object + .entries(job.arguments) + .filter((e) => { + if (e.every(validateIdentifier)) return true; + logger.error(`job of type ${job.type} has invalid args ${e}`); + return false; + }) + .length === 0; }; -for (const stage of stages) { - await Promise.all( - stage.parallelJobs.map((job) => { - validateJob(job); +const run = async () => { + logger.log("starting pipeline execution"); + + const stages = await (Deno.readTextFile(pipelinePath)) + .then(PipelineImpl.from) + .then((pipeline) => pipeline.getStages()); + + for (const stage of stages) { + logger.log("executing stage", stage); + + await Promise.all( + stage.parallelJobs.map(async (job, jobIdx) => { + logger.log(`executing job ${jobIdx}`, job); + if (!jobValidForExecution(job)) throw new Error("invalid job"); + + const result = await getStdout(job.type, { env: job.arguments }); + logger.log(jobIdx, "outputs", { result }); + }), + ); + } + + logger.log("ok! yay!"); +}; - return getStdout(job.type, { - env: job.arguments, - }); - }), - ); +if (import.meta.main) { + try { + await run(); + } catch (e) { + logger.error("womp womp D:", e); + throw e; + } } |