diff options
Diffstat (limited to 'worker/executor/pipeline.ts')
-rw-r--r-- | worker/executor/pipeline.ts | 53 |
1 files changed, 53 insertions, 0 deletions
diff --git a/worker/executor/pipeline.ts b/worker/executor/pipeline.ts new file mode 100644 index 0000000..a1aa7c3 --- /dev/null +++ b/worker/executor/pipeline.ts @@ -0,0 +1,53 @@ +import { + Either, + type IEither, + type ITraceable, + type LogMetricTraceSupplier, + Metric, + TraceUtil, +} from "@emprespresso/pengueno"; +import type { Job, JobArgT, Pipeline } from "@emprespresso/ci-model"; +import { executeJob } from "./mod.ts"; + +const pipelinesMetric = Metric.fromName("pipelines"); +export const executePipeline = ( + tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>, + baseEnv?: JobArgT, +): Promise<IEither<Error, void>> => + tPipeline.bimap(TraceUtil.withFunctionTrace(executePipeline)) + .bimap(TraceUtil.withMetricTrace(pipelinesMetric)) + .map(async (tJobs): Promise<IEither<Error, void>> => { + for (const [i, serialStage] of tJobs.get().serialJobs.entries()) { + tJobs.trace.trace( + `executing stage ${i}. do your best little stage :>\n${serialStage}`, + ); + const jobResults = await Promise.all( + serialStage.parallelJobs.map((job) => + tJobs.bimap((_) => [job, `stage ${i}`]) + .map((tJob) => + <Job> ({ + ...tJob.get(), + arguments: { ...baseEnv, ...tJob.get().arguments }, + }) + ) + .map(executeJob) + .peek( + TraceUtil.promiseify((tEitherJobOutput) => + tEitherJobOutput.get().mapRight((stdout) => + tEitherJobOutput.trace.addTrace("STDOUT").trace(stdout) + ) + ), + ) + .get() + ), + ); + const failures = jobResults.filter((e) => e.fold((err) => !!err)); + if (failures.length > 0) { + tJobs.trace.trace(pipelinesMetric.failure); + return Either.left(new Error(failures.toString())); + } + } + tJobs.trace.trace(pipelinesMetric.success); + return Either.right(undefined); + }) + .get(); |