diff options
Diffstat (limited to 'worker/executor.ts')
-rw-r--r-- | worker/executor.ts | 161 |
1 files changed, 73 insertions, 88 deletions
diff --git a/worker/executor.ts b/worker/executor.ts index ea79995..f4b7906 100644 --- a/worker/executor.ts +++ b/worker/executor.ts @@ -1,101 +1,86 @@ import { - getStdout, - type ITraceable, - LogLevel, - type LogMetricTraceSupplier, - memoize, - Metric, - TraceUtil, - validateExecutionEntries, - Either, - type IEither, -} from "@emprespresso/pengueno"; -import type { Job, JobArgT, Pipeline } from "@emprespresso/ci_model"; + getStdout, + type ITraceable, + LogLevel, + type LogMetricTraceSupplier, + memoize, + Metric, + TraceUtil, + validateExecutionEntries, + Either, + type IEither, +} from '@emprespresso/pengueno'; +import type { Job, JobArgT, Pipeline } from '@emprespresso/ci_model'; // -- <job.exectuor> -- const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`)); export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) => - tJob - .bimap(TraceUtil.withMetricTrace(jobTypeMetric(tJob.get().type))) - .peek((tJob) => - tJob.trace.trace(`let's do this little job ok!! ${tJob.get()}`), - ) - .map((tJob) => - validateExecutionEntries(tJob.get().arguments) - .mapLeft((badEntries) => { - tJob.trace.addTrace(LogLevel.ERROR).trace(badEntries.toString()); - return new Error("invalid job arguments"); - }) - .flatMapAsync((args) => - getStdout(tJob.move(tJob.get().type), { env: args }), - ), - ) - .peek( - TraceUtil.promiseify((q) => - q.trace.trace( - q - .get() - .fold( - ({ isLeft }) => - jobTypeMetric(tJob.get().type)[isLeft ? "failure" : "success"], + tJob + .bimap(TraceUtil.withMetricTrace(jobTypeMetric(tJob.get().type))) + .peek((tJob) => tJob.trace.trace(`let's do this little job ok!! ${tJob.get()}`)) + .map((tJob) => + validateExecutionEntries(tJob.get().arguments) + .mapLeft((badEntries) => { + tJob.trace.addTrace(LogLevel.ERROR).trace(badEntries.toString()); + return new Error('invalid job arguments'); + }) + .flatMapAsync((args) => getStdout(tJob.move(tJob.get().type), { env: args })), + ) + .peek( + TraceUtil.promiseify((q) => + q.trace.trace( + q.get().fold(({ isLeft }) => jobTypeMetric(tJob.get().type)[isLeft ? 'failure' : 'success']), + ), ), - ), - ), - ) - .get(); + ) + .get(); // -- </job.exectuor> -- // -- <pipeline.executor> -- -const pipelinesMetric = Metric.fromName("pipelines"); +const pipelinesMetric = Metric.fromName('pipelines'); export const executePipeline = ( - tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>, - baseEnv?: JobArgT, + tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>, + baseEnv?: JobArgT, ): Promise<IEither<Error, void>> => - tPipeline - .bimap(TraceUtil.withFunctionTrace(executePipeline)) - .bimap(TraceUtil.withMetricTrace(pipelinesMetric)) - .map(async (tJobs): Promise<IEither<Error, void>> => { - for (const [i, serialStage] of tJobs.get().serialJobs.entries()) { - tJobs.trace.trace( - `executing stage ${i}. do your best little stage :>\n${serialStage}`, - ); - const jobResults = await Promise.all( - serialStage.parallelJobs.map((job) => - tJobs - .bimap((_) => [job, `stage ${i}`]) - .map( - (tJob) => - <Job>{ - ...tJob.get(), - arguments: { - ...baseEnv, - ...tJob.get().arguments, - }, - }, - ) - .map(executeJob) - .peek( - TraceUtil.promiseify((tEitherJobOutput) => - tEitherJobOutput - .get() - .mapRight((stdout) => - tEitherJobOutput.trace.addTrace("STDOUT").trace(stdout), + tPipeline + .bimap(TraceUtil.withFunctionTrace(executePipeline)) + .bimap(TraceUtil.withMetricTrace(pipelinesMetric)) + .map(async (tJobs): Promise<IEither<Error, void>> => { + for (const [i, serialStage] of tJobs.get().serialJobs.entries()) { + tJobs.trace.trace(`executing stage ${i}. do your best little stage :>\n${serialStage}`); + const jobResults = await Promise.all( + serialStage.parallelJobs.map((job) => + tJobs + .bimap((_) => [job, `stage ${i}`]) + .map( + (tJob) => + <Job>{ + ...tJob.get(), + arguments: { + ...baseEnv, + ...tJob.get().arguments, + }, + }, + ) + .map(executeJob) + .peek( + TraceUtil.promiseify((tEitherJobOutput) => + tEitherJobOutput + .get() + .mapRight((stdout) => tEitherJobOutput.trace.addTrace('STDOUT').trace(stdout)), + ), + ) + .get(), ), - ), - ) - .get(), - ), - ); - const failures = jobResults.filter((e) => - e.fold(({ isLeft }) => isLeft), - ); - if (failures.length > 0) { - tJobs.trace.trace(pipelinesMetric.failure); - return Either.left(new Error(failures.toString())); - } - } - tJobs.trace.trace(pipelinesMetric.success); - return Either.right(undefined); - }) - .get(); + ); + const failures = jobResults.filter((e) => e.fold(({ isLeft }) => isLeft)); + if (failures.length > 0) { + tJobs.trace.trace(pipelinesMetric.failure); + return Either.left(new Error(failures.toString())); + } + } + tJobs.trace.trace(pipelinesMetric.success); + return Either.right(undefined); + }) + .get(); // -- </pipeline.executor> -- |