summaryrefslogtreecommitdiff
path: root/worker/scripts/run_pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'worker/scripts/run_pipeline')
-rwxr-xr-xworker/scripts/run_pipeline74
1 files changed, 52 insertions, 22 deletions
diff --git a/worker/scripts/run_pipeline b/worker/scripts/run_pipeline
index ad58573..9991001 100755
--- a/worker/scripts/run_pipeline
+++ b/worker/scripts/run_pipeline
@@ -1,28 +1,58 @@
-#!/usr/bin/env -S deno --allow-env --allow-net --allow-read
+#!/usr/bin/env -S deno run --allow-env --allow-net --allow-run --allow-read --allow-write
import { type Job, PipelineImpl } from "@liz-ci/model";
-import { getRequiredEnv, getStdout, validateIdentifier } from "@liz-ci/utils";
-
-const stages = await (Deno.readTextFile(getRequiredEnv("pipeline")))
- .then(PipelineImpl.from)
- .then((pipeline) => pipeline.getStages());
-
-const validateJob = (job: Job) => {
- Object.entries(job.arguments).forEach((e) => {
- if (!e.every(validateIdentifier)) {
- throw new Error(`job of type ${job.type} has invalid entry ${e}`);
- }
- });
+import {
+ getRequiredEnv,
+ getStdout,
+ loggerWithPrefix,
+ validateIdentifier,
+} from "@liz-ci/utils";
+
+const pipelinePath = getRequiredEnv("pipeline");
+const logger = loggerWithPrefix(() =>
+ `[${new Date().toISOString()}] [run_pipeline.${pipelinePath}]`
+);
+
+const jobValidForExecution = (job: Job) => {
+ return Object
+ .entries(job.arguments)
+ .filter((e) => {
+ if (e.every(validateIdentifier)) return true;
+ logger.error(`job of type ${job.type} has invalid args ${e}`);
+ return false;
+ })
+ .length === 0;
};
-for (const stage of stages) {
- await Promise.all(
- stage.parallelJobs.map((job) => {
- validateJob(job);
+const run = async () => {
+ logger.log("starting pipeline execution");
+
+ const stages = await (Deno.readTextFile(pipelinePath))
+ .then(PipelineImpl.from)
+ .then((pipeline) => pipeline.getStages());
+
+ for (const stage of stages) {
+ logger.log("executing stage", stage);
+
+ await Promise.all(
+ stage.parallelJobs.map(async (job, jobIdx) => {
+ logger.log(`executing job ${jobIdx}`, job);
+ if (!jobValidForExecution(job)) throw new Error("invalid job");
+
+ const result = await getStdout(job.type, { env: job.arguments });
+ logger.log(jobIdx, "outputs", { result });
+ }),
+ );
+ }
+
+ logger.log("ok! yay!");
+};
- return getStdout(job.type, {
- env: job.arguments,
- });
- }),
- );
+if (import.meta.main) {
+ try {
+ await run();
+ } catch (e) {
+ logger.error("womp womp D:", e);
+ throw e;
+ }
}