diff options
Diffstat (limited to 'worker/executor')
-rw-r--r-- | worker/executor/job.ts | 42 | ||||
-rw-r--r-- | worker/executor/mod.ts | 2 | ||||
-rw-r--r-- | worker/executor/pipeline.ts | 53 |
3 files changed, 97 insertions, 0 deletions
diff --git a/worker/executor/job.ts b/worker/executor/job.ts new file mode 100644 index 0000000..76f0e0c --- /dev/null +++ b/worker/executor/job.ts @@ -0,0 +1,42 @@ +import { + getStdout, + type ITraceable, + LogLevel, + type LogMetricTraceSupplier, + memoize, + Metric, + TraceUtil, + validateExecutionEntries, +} from "@emprespresso/pengueno"; +import type { Job } from "@emprespresso/ci-model"; + +const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`)); +export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) => + tJob.bimap(TraceUtil.withMetricTrace(jobTypeMetric(tJob.get().type))) + .peek((tJob) => + tJob.trace.trace( + `let's do this little job ok!! ${tJob.get()}`, + ) + ) + .map((tJob) => + validateExecutionEntries(tJob.get().arguments) + .mapLeft((badEntries) => { + tJob.trace.addTrace(LogLevel.ERROR).trace( + badEntries.toString(), + ); + return new Error("invalid job arguments"); + }) + .flatMapAsync((args) => + getStdout(tJob.move(tJob.get().type), { env: args }) + ) + ) + .peek( + TraceUtil.promiseify((q) => + q.trace.trace( + q.get().fold((err, _val) => + jobTypeMetric(tJob.get().type)[err ? "failure" : "success"] + ), + ) + ), + ) + .get(); diff --git a/worker/executor/mod.ts b/worker/executor/mod.ts new file mode 100644 index 0000000..944ab7d --- /dev/null +++ b/worker/executor/mod.ts @@ -0,0 +1,2 @@ +export * from "./job.ts"; +export * from "./pipeline.ts"; diff --git a/worker/executor/pipeline.ts b/worker/executor/pipeline.ts new file mode 100644 index 0000000..a1aa7c3 --- /dev/null +++ b/worker/executor/pipeline.ts @@ -0,0 +1,53 @@ +import { + Either, + type IEither, + type ITraceable, + type LogMetricTraceSupplier, + Metric, + TraceUtil, +} from "@emprespresso/pengueno"; +import type { Job, JobArgT, Pipeline } from "@emprespresso/ci-model"; +import { executeJob } from "./mod.ts"; + +const pipelinesMetric = Metric.fromName("pipelines"); +export const executePipeline = ( + tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>, + baseEnv?: JobArgT, +): Promise<IEither<Error, void>> => + tPipeline.bimap(TraceUtil.withFunctionTrace(executePipeline)) + .bimap(TraceUtil.withMetricTrace(pipelinesMetric)) + .map(async (tJobs): Promise<IEither<Error, void>> => { + for (const [i, serialStage] of tJobs.get().serialJobs.entries()) { + tJobs.trace.trace( + `executing stage ${i}. do your best little stage :>\n${serialStage}`, + ); + const jobResults = await Promise.all( + serialStage.parallelJobs.map((job) => + tJobs.bimap((_) => [job, `stage ${i}`]) + .map((tJob) => + <Job> ({ + ...tJob.get(), + arguments: { ...baseEnv, ...tJob.get().arguments }, + }) + ) + .map(executeJob) + .peek( + TraceUtil.promiseify((tEitherJobOutput) => + tEitherJobOutput.get().mapRight((stdout) => + tEitherJobOutput.trace.addTrace("STDOUT").trace(stdout) + ) + ), + ) + .get() + ), + ); + const failures = jobResults.filter((e) => e.fold((err) => !!err)); + if (failures.length > 0) { + tJobs.trace.trace(pipelinesMetric.failure); + return Either.left(new Error(failures.toString())); + } + } + tJobs.trace.trace(pipelinesMetric.success); + return Either.right(undefined); + }) + .get(); |