summaryrefslogtreecommitdiff
path: root/worker/executor.ts
blob: faa40a6e430479c91f6feacf1dbd4ca1c8749338 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import {
  getStdout,
  type ITraceable,
  LogLevel,
  type LogMetricTraceSupplier,
  memoize,
  Metric,
  TraceUtil,
  validateExecutionEntries,
  Either,
  type IEither,
} from "@emprespresso/pengueno";
import type { Job, JobArgT, Pipeline } from "@emprespresso/ci_model";

// -- <job.executor> --
// One metric per job type, named `run.<type>`. Wrapped in `memoize`, presumably
// so repeated lookups for the same type reuse a single Metric — confirm against
// pengueno's `memoize`.
const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`));

/**
 * Executes a single job.
 *
 * Steps, in order:
 *  1. attaches the per-job-type metric trace and logs the job;
 *  2. validates `job.arguments` with `validateExecutionEntries` — invalid
 *     entries are logged at ERROR level and replaced by a generic
 *     `Error("invalid job arguments")`;
 *  3. on valid arguments, hands the job type to `getStdout` with the validated
 *     arguments as `env` (presumably the type is the command to run — confirm
 *     against `getStdout`);
 *  4. once the result settles, traces the metric's `failure` or `success`
 *     marker.
 *
 * @param tJob the job wrapped in its trace context.
 * @returns whatever the traceable holds after the async step: a promise of an
 *   either that is left with an `Error` on failure (callers treat the right
 *   side as the command's stdout).
 */
export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) => {
  // Resolve the metric once; it is needed both for the metric trace and for the
  // success/failure marker emitted at the end.
  const metric = jobTypeMetric(tJob.get().type);

  return (
    tJob
      .bimap(TraceUtil.withMetricTrace(metric))
      .peek((tJob) =>
        // A Job is a plain object, so interpolating it directly would log
        // "[object Object]"; serialize it instead.
        // NOTE(review): this logs the job's arguments verbatim — confirm they
        // never carry secrets, or narrow this to `type` only.
        tJob.trace.trace(
          `let's do this little job ok!! ${JSON.stringify(tJob.get())}`,
        ),
      )
      .map((tJob) =>
        validateExecutionEntries(tJob.get().arguments)
          .mapLeft((badEntries) => {
            // Keep the offending entries in the log only; the returned error
            // stays generic.
            tJob.trace.addTrace(LogLevel.ERROR).trace(badEntries.toString());
            return new Error("invalid job arguments");
          })
          .flatMapAsync((args) =>
            getStdout(tJob.move(tJob.get().type), { env: args }),
          ),
      )
      .peek(
        TraceUtil.promiseify((q) =>
          q.trace.trace(
            q.get().fold((err, _val) => metric[err ? "failure" : "success"]),
          ),
        ),
      )
      .get()
  );
};
// -- </job.executor> --

// -- <pipeline.executor> --
// Metric shared by every pipeline run; its `success` / `failure` markers are
// traced when a pipeline finishes.
const pipelinesMetric = Metric.fromName("pipelines");

/**
 * Executes a pipeline stage by stage.
 *
 * Stages (`serialJobs`) run strictly in order. Within a stage, every entry of
 * `parallelJobs` is started through `executeJob` and awaited together with
 * `Promise.all`. The first stage that contains at least one failed job stops
 * the pipeline: later stages are not started.
 *
 * @param tPipeline the pipeline wrapped in its trace context.
 * @param baseEnv optional arguments merged into every job. They are spread
 *   first, so a job's own arguments win on key collisions.
 * @returns a promise resolving to a right (`undefined`) when every stage
 *   succeeded, or a left `Error` naming the failed stage and the failure
 *   messages of its jobs.
 */
export const executePipeline = (
  tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>,
  baseEnv?: JobArgT,
): Promise<IEither<Error, void>> =>
  tPipeline
    .bimap(TraceUtil.withFunctionTrace(executePipeline))
    .bimap(TraceUtil.withMetricTrace(pipelinesMetric))
    .map(async (tJobs): Promise<IEither<Error, void>> => {
      for (const [i, serialStage] of tJobs.get().serialJobs.entries()) {
        // The stage is a plain object; interpolating it directly would log
        // "[object Object]", so serialize it first.
        const stageDump = JSON.stringify(serialStage);
        tJobs.trace.trace(
          `executing stage ${i}. do your best little stage :>\n${stageDump}`,
        );
        const jobResults = await Promise.all(
          serialStage.parallelJobs.map((job) =>
            tJobs
              .bimap((_) => [job, `stage ${i}`])
              .map(
                (tJob) =>
                  ({
                    ...tJob.get(),
                    arguments: {
                      // baseEnv first so job-specific arguments override it.
                      ...baseEnv,
                      ...tJob.get().arguments,
                    },
                  }) as Job,
              )
              .map(executeJob)
              .peek(
                TraceUtil.promiseify((tEitherJobOutput) =>
                  tEitherJobOutput
                    .get()
                    .mapRight((stdout) =>
                      tEitherJobOutput.trace.addTrace("STDOUT").trace(stdout),
                    ),
                ),
              )
              .get(),
          ),
        );
        const failures = jobResults.filter((e) => e.fold((err) => !!err));
        if (failures.length > 0) {
          tJobs.trace.trace(pipelinesMetric.failure);
          // Report the underlying error messages rather than stringifying the
          // Either wrappers themselves.
          const reasons = failures
            .map((e) =>
              e.fold((err) =>
                err instanceof Error ? err.message : String(err),
              ),
            )
            .join("; ");
          return Either.left(
            new Error(
              `stage ${i} failed (${failures.length} job(s)): ${reasons}`,
            ),
          );
        }
      }
      tJobs.trace.trace(pipelinesMetric.success);
      return Either.right(undefined);
    })
    .get();
// -- </pipeline.executor> --