diff options
author | Elizabeth <me@liz.coffee> | 2025-06-02 11:14:52 -0700 |
---|---|---|
committer | Elizabeth <me@liz.coffee> | 2025-06-02 11:14:52 -0700 |
commit | 373d9ec700c0097a22cf665a8e33cf48998d1dc2 (patch) | |
tree | 71297ac69177037929e1bfb00b8c71038058acd5 /worker/executor.ts | |
parent | 646c5eb11d3b9240f8434163d103a117d30c88c7 (diff) | |
download | ci-373d9ec700c0097a22cf665a8e33cf48998d1dc2.tar.gz ci-373d9ec700c0097a22cf665a8e33cf48998d1dc2.zip |
Minor things
Diffstat (limited to 'worker/executor.ts')
-rw-r--r-- | worker/executor.ts | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/worker/executor.ts b/worker/executor.ts new file mode 100644 index 0000000..faa40a6 --- /dev/null +++ b/worker/executor.ts @@ -0,0 +1,99 @@ +import { + getStdout, + type ITraceable, + LogLevel, + type LogMetricTraceSupplier, + memoize, + Metric, + TraceUtil, + validateExecutionEntries, + Either, + type IEither, +} from "@emprespresso/pengueno"; +import type { Job, JobArgT, Pipeline } from "@emprespresso/ci_model"; + +// -- <job.exectuor> -- +const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`)); +export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) => + tJob + .bimap(TraceUtil.withMetricTrace(jobTypeMetric(tJob.get().type))) + .peek((tJob) => + tJob.trace.trace(`let's do this little job ok!! ${tJob.get()}`), + ) + .map((tJob) => + validateExecutionEntries(tJob.get().arguments) + .mapLeft((badEntries) => { + tJob.trace.addTrace(LogLevel.ERROR).trace(badEntries.toString()); + return new Error("invalid job arguments"); + }) + .flatMapAsync((args) => + getStdout(tJob.move(tJob.get().type), { env: args }), + ), + ) + .peek( + TraceUtil.promiseify((q) => + q.trace.trace( + q + .get() + .fold( + (err, _val) => + jobTypeMetric(tJob.get().type)[err ? "failure" : "success"], + ), + ), + ), + ) + .get(); +// -- </job.exectuor> -- + +// -- <pipeline.executor> -- +const pipelinesMetric = Metric.fromName("pipelines"); +export const executePipeline = ( + tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>, + baseEnv?: JobArgT, +): Promise<IEither<Error, void>> => + tPipeline + .bimap(TraceUtil.withFunctionTrace(executePipeline)) + .bimap(TraceUtil.withMetricTrace(pipelinesMetric)) + .map(async (tJobs): Promise<IEither<Error, void>> => { + for (const [i, serialStage] of tJobs.get().serialJobs.entries()) { + tJobs.trace.trace( + `executing stage ${i}. do your best little stage :>\n${serialStage}`, + ); + const jobResults = await Promise.all( + serialStage.parallelJobs.map((job) => + tJobs + .bimap((_) => [job, `stage ${i}`]) + .map( + (tJob) => + <Job>{ + ...tJob.get(), + arguments: { + ...baseEnv, + ...tJob.get().arguments, + }, + }, + ) + .map(executeJob) + .peek( + TraceUtil.promiseify((tEitherJobOutput) => + tEitherJobOutput + .get() + .mapRight((stdout) => + tEitherJobOutput.trace.addTrace("STDOUT").trace(stdout), + ), + ), + ) + .get(), + ), + ); + const failures = jobResults.filter((e) => e.fold((err) => !!err)); + if (failures.length > 0) { + tJobs.trace.trace(pipelinesMetric.failure); + return Either.left(new Error(failures.toString())); + } + } + tJobs.trace.trace(pipelinesMetric.success); + return Either.right(undefined); + }) + .get(); +// -- </pipeline.executor> -- |