summaryrefslogtreecommitdiff
path: root/worker/executor/pipeline.ts
diff options
context:
space:
mode:
Diffstat (limited to 'worker/executor/pipeline.ts')
-rw-r--r--worker/executor/pipeline.ts53
1 files changed, 53 insertions, 0 deletions
diff --git a/worker/executor/pipeline.ts b/worker/executor/pipeline.ts
new file mode 100644
index 0000000..a1aa7c3
--- /dev/null
+++ b/worker/executor/pipeline.ts
@@ -0,0 +1,53 @@
+import {
+ Either,
+ type IEither,
+ type ITraceable,
+ type LogMetricTraceSupplier,
+ Metric,
+ TraceUtil,
+} from "@emprespresso/pengueno";
+import type { Job, JobArgT, Pipeline } from "@emprespresso/ci-model";
+import { executeJob } from "./mod.ts";
+
+const pipelinesMetric = Metric.fromName("pipelines");
+export const executePipeline = (
+ tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>,
+ baseEnv?: JobArgT,
+): Promise<IEither<Error, void>> =>
+ tPipeline.bimap(TraceUtil.withFunctionTrace(executePipeline))
+ .bimap(TraceUtil.withMetricTrace(pipelinesMetric))
+ .map(async (tJobs): Promise<IEither<Error, void>> => {
+ for (const [i, serialStage] of tJobs.get().serialJobs.entries()) {
+ tJobs.trace.trace(
+ `executing stage ${i}. do your best little stage :>\n${serialStage}`,
+ );
+ const jobResults = await Promise.all(
+ serialStage.parallelJobs.map((job) =>
+ tJobs.bimap((_) => [job, `stage ${i}`])
+ .map((tJob) =>
+ <Job> ({
+ ...tJob.get(),
+ arguments: { ...baseEnv, ...tJob.get().arguments },
+ })
+ )
+ .map(executeJob)
+ .peek(
+ TraceUtil.promiseify((tEitherJobOutput) =>
+ tEitherJobOutput.get().mapRight((stdout) =>
+ tEitherJobOutput.trace.addTrace("STDOUT").trace(stdout)
+ )
+ ),
+ )
+ .get()
+ ),
+ );
+ const failures = jobResults.filter((e) => e.fold((err) => !!err));
+ if (failures.length > 0) {
+ tJobs.trace.trace(pipelinesMetric.failure);
+ return Either.left(new Error(failures.toString()));
+ }
+ }
+ tJobs.trace.trace(pipelinesMetric.success);
+ return Either.right(undefined);
+ })
+ .get();