diff options
Diffstat (limited to 'worker/executor.ts')
-rw-r--r-- | worker/executor.ts | 76 |
1 files changed, 30 insertions, 46 deletions
diff --git a/worker/executor.ts b/worker/executor.ts index f4b7906..bfcbc37 100644 --- a/worker/executor.ts +++ b/worker/executor.ts @@ -13,74 +13,58 @@ import { import type { Job, JobArgT, Pipeline } from '@emprespresso/ci_model'; // -- <job.exectuor> -- -const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`)); -export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) => - tJob - .bimap(TraceUtil.withMetricTrace(jobTypeMetric(tJob.get().type))) +const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`).asResult()); +export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) => { + const metric = jobTypeMetric(tJob.get().type); + return tJob + .flatMap(TraceUtil.withMetricTrace(metric)) .peek((tJob) => tJob.trace.trace(`let's do this little job ok!! ${tJob.get()}`)) .map((tJob) => validateExecutionEntries(tJob.get().arguments) .mapLeft((badEntries) => { - tJob.trace.addTrace(LogLevel.ERROR).trace(badEntries.toString()); + tJob.trace.traceScope(LogLevel.ERROR).trace(badEntries.toString()); return new Error('invalid job arguments'); }) .flatMapAsync((args) => getStdout(tJob.move(tJob.get().type), { env: args })), ) - .peek( - TraceUtil.promiseify((q) => - q.trace.trace( - q.get().fold(({ isLeft }) => jobTypeMetric(tJob.get().type)[isLeft ? 'failure' : 'success']), - ), - ), - ) + .flatMapAsync(TraceUtil.promiseify(TraceUtil.traceResultingEither(metric))) .get(); +}; // -- </job.exectuor> -- // -- <pipeline.executor> -- -const pipelinesMetric = Metric.fromName('pipelines'); +const pipelinesMetric = Metric.fromName('pipelines').asResult(); export const executePipeline = ( tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>, baseEnv?: JobArgT, ): Promise<IEither<Error, void>> => tPipeline - .bimap(TraceUtil.withFunctionTrace(executePipeline)) - .bimap(TraceUtil.withMetricTrace(pipelinesMetric)) - .map(async (tJobs): Promise<IEither<Error, void>> => { - for (const [i, serialStage] of tJobs.get().serialJobs.entries()) { - tJobs.trace.trace(`executing stage ${i}. do your best little stage :>\n${serialStage}`); - const jobResults = await Promise.all( - serialStage.parallelJobs.map((job) => - tJobs - .bimap((_) => [job, `stage ${i}`]) - .map( - (tJob) => - <Job>{ - ...tJob.get(), - arguments: { - ...baseEnv, - ...tJob.get().arguments, - }, - }, - ) + .flatMap(TraceUtil.withFunctionTrace(executePipeline)) + .flatMap(TraceUtil.withMetricTrace(pipelinesMetric)) + .map(async (_tPipeline): Promise<IEither<Error, void>> => { + for (const [i, serialStage] of tPipeline.get().serialJobs.entries()) { + const tPipeline = _tPipeline.flatMap(TraceUtil.withTrace(`Stage = ${i}`)); + const parallelJobs = tPipeline + .peek((t) => t.trace.trace(`do your best little stage :> ${serialStage}`)) + .move(serialStage.parallelJobs) + .coExtend((jobs) => + jobs.get().map((job) => <Job>{ ...job, arguments: { ...baseEnv, ...job.arguments } }), + ) + .map((job) => { + const metric = jobTypeMetric(job.get().type); + return job + .flatMap(TraceUtil.withMetricTrace(metric)) .map(executeJob) - .peek( - TraceUtil.promiseify((tEitherJobOutput) => - tEitherJobOutput - .get() - .mapRight((stdout) => tEitherJobOutput.trace.addTrace('STDOUT').trace(stdout)), - ), - ) - .get(), - ), - ); - const failures = jobResults.filter((e) => e.fold(({ isLeft }) => isLeft)); + .flatMapAsync(TraceUtil.promiseify(TraceUtil.traceResultingEither(metric))); + }); + const results = await Promise.all(parallelJobs.map((job) => job.get())); + const failures = results.filter((e) => e.left); if (failures.length > 0) { - tJobs.trace.trace(pipelinesMetric.failure); return Either.left(new Error(failures.toString())); } } - tJobs.trace.trace(pipelinesMetric.success); - return Either.right(undefined); + return Either.right(<void>undefined); }) + .flatMapAsync(TraceUtil.promiseify(TraceUtil.traceResultingEither(pipelinesMetric))) .get(); // -- </pipeline.executor> -- |