summaryrefslogtreecommitdiff
path: root/worker/scripts/run_pipeline
blob: 99910015fcb108b3c35f33bfcbfcb76c72e500f1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#!/usr/bin/env -S deno run --allow-env --allow-net --allow-run --allow-read --allow-write

import { type Job, PipelineImpl } from "@liz-ci/model";
import {
  getRequiredEnv,
  getStdout,
  loggerWithPrefix,
  validateIdentifier,
} from "@liz-ci/utils";

// Path to the pipeline definition file, read from the "pipeline" env var.
// NOTE(review): presumably getRequiredEnv throws/exits when the var is
// missing — confirm against @liz-ci/utils.
const pipelinePath = getRequiredEnv("pipeline");
// Logger whose prefix is recomputed on every message: a fresh ISO-8601
// timestamp plus a tag identifying this script and the pipeline file.
const logger = loggerWithPrefix(() =>
  `[${new Date().toISOString()}] [run_pipeline.${pipelinePath}]`
);

/**
 * A job is valid for execution when every [key, value] entry of
 * `job.arguments` passes `validateIdentifier`. Each offending entry is
 * logged before the job is rejected.
 *
 * Bug fix: the previous version kept the *valid* entries in the filter and
 * then required the filtered list to be empty — so any job with at least
 * one valid argument was rejected, and a job whose arguments were all
 * invalid was accepted. The filter now collects the invalid entries.
 *
 * @returns true when the job has no invalid argument entries.
 */
const jobValidForExecution = (job: Job) => {
  return Object
    .entries(job.arguments)
    .filter((e) => {
      // Keep only the entries that FAIL validation.
      if (e.every(validateIdentifier)) return false;
      logger.error(`job of type ${job.type} has invalid args ${e}`);
      return true;
    })
    .length === 0;
};

const run = async () => {
  logger.log("starting pipeline execution");

  const stages = await (Deno.readTextFile(pipelinePath))
    .then(PipelineImpl.from)
    .then((pipeline) => pipeline.getStages());

  for (const stage of stages) {
    logger.log("executing stage", stage);

    await Promise.all(
      stage.parallelJobs.map(async (job, jobIdx) => {
        logger.log(`executing job ${jobIdx}`, job);
        if (!jobValidForExecution(job)) throw new Error("invalid job");

        const result = await getStdout(job.type, { env: job.arguments });
        logger.log(jobIdx, "outputs", { result });
      }),
    );
  }

  logger.log("ok! yay!");
};

// Entry point: run the pipeline when executed directly (not imported).
// Failures are logged with context and rethrown so the process exits
// with a non-zero status.
if (import.meta.main) {
  await run().catch((e) => {
    logger.error("womp womp D:", e);
    throw e;
  });
}