import {
    getStdout,
    type ITraceable,
    LogLevel,
    type LogMetricTraceSupplier,
    memoize,
    Metric,
    TraceUtil,
    validateExecutionEntries,
    Either,
    type IEither,
} from '@emprespresso/pengueno';
import type { Job, JobArgT, Pipeline } from '@emprespresso/ci_model';

// -- job executor --

/**
 * Builds the result metric named `run.<type>` for a job type.
 *
 * Wrapped in `memoize`, presumably so that every lookup for the same job type
 * reuses one metric instance (confirm against `memoize`'s contract).
 */
const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`).asResult());

/**
 * Validates a job's arguments and, when they are valid, hands the job to `getStdout`.
 *
 * Steps, in order:
 *  1. attach the per-type metric to the trace and log the job as JSON;
 *  2. run `validateExecutionEntries` over `arguments`; on failure the bad entries
 *     are logged at ERROR level and replaced by `Error('invalid job arguments')`;
 *  3. on success call `getStdout` with the job `type` as the traced value and the
 *     validated arguments as `env` (presumably the type names the command to
 *     run - verify against `getStdout`);
 *  4. record the resulting either against the metric and log it under a DEBUG scope.
 *
 * @param tJob traceable wrapper around the job to execute.
 * @returns whatever the final `.get()` of the chain yields; the pipeline executor
 *          awaits it and treats it as an either.
 */
export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) => {
    const metric = jobTypeMetric(tJob.get().type);
    return tJob
        .flatMap(TraceUtil.withMetricTrace(metric))
        .peek((t) => t.trace.trace(`let's do this little job ok!! ${JSON.stringify(t.get())}`))
        .map((t) =>
            validateExecutionEntries(t.get().arguments)
                .mapLeft((badEntries) => {
                    t.trace.traceScope(LogLevel.ERROR).trace(badEntries.toString());
                    return new Error('invalid job arguments');
                })
                .flatMapAsync((args) => getStdout(t.move(t.get().type), { env: args })),
        )
        .flatMapAsync(TraceUtil.promiseify(TraceUtil.traceResultingEither(metric)))
        .peek(
            TraceUtil.promiseify((t) =>
                t.traceScope(() => LogLevel.DEBUG).trace.trace(JSON.stringify(t.get())),
            ),
        )
        .get();
};

// -- pipeline executor --

/** Result metric shared by every pipeline run. */
const pipelinesMetric = Metric.fromName('pipelines').asResult();

/**
 * Runs a pipeline stage by stage.
 *
 * Stages in `serialJobs` run one after another; the jobs inside a stage's
 * `parallelJobs` are started together and awaited with `Promise.all`. The first
 * stage that reports any failure ends the run, so later stages are never started.
 *
 * @param tPipeline traceable wrapper around the pipeline to execute.
 * @param baseEnv   optional arguments merged underneath each job's own
 *                  `arguments`; a job's own keys win on conflict.
 * @returns a left holding an `Error` built from the failed results of the first
 *          failing stage, otherwise a right of `undefined`.
 */
export const executePipeline = (
    tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>,
    baseEnv?: JobArgT,
): Promise<IEither<Error, void>> =>
    tPipeline
        .flatMap(TraceUtil.withFunctionTrace(executePipeline))
        .flatMap(TraceUtil.withMetricTrace(pipelinesMetric))
        .map(async (tTraced): Promise<IEither<Error, void>> => {
            for (const [i, serialStage] of tPipeline.get().serialJobs.entries()) {
                const tStage = tTraced.flatMap(TraceUtil.withTrace(`Stage = ${i}`));
                const parallelJobs = tStage
                    // Serialise explicitly: interpolating the stage object would log "[object Object]".
                    .peek((t) => t.trace.trace(`do your best little stage :> ${JSON.stringify(serialStage)}`))
                    .move(serialStage.parallelJobs)
                    .coExtend((jobs) =>
                        jobs.get().map(
                            // Parenthesised so the braces are an object literal, not a function body.
                            // NOTE(review): the `as Job` assertion keeps the merged literal typed as a Job;
                            // swap for a return annotation if the spread is directly assignable.
                            (job) => ({ ...job, arguments: { ...baseEnv, ...job.arguments } }) as Job,
                        ),
                    )
                    .map((job) => {
                        const metric = jobTypeMetric(job.get().type);
                        return job
                            .flatMap(TraceUtil.withMetricTrace(metric))
                            .map(executeJob)
                            .flatMapAsync(TraceUtil.promiseify(TraceUtil.traceResultingEither(metric)));
                    });
                const results = await Promise.all(parallelJobs.map((job) => job.get()));
                // NOTE(review): this relies on `left` being falsy for successful results; if `left`
                // is an accessor method on the either, every result would count as a failure - confirm.
                const failures = results.filter((e) => e.left);
                if (failures.length > 0) {
                    // NOTE(review): the message depends on the either's `toString()` - confirm it is readable.
                    return Either.left(new Error(failures.toString()));
                }
            }
            return Either.right(undefined);
        })
        .flatMapAsync(TraceUtil.promiseify(TraceUtil.traceResultingEither(pipelinesMetric)))
        .get();