summaryrefslogtreecommitdiff
path: root/worker/executor.ts
diff options
context:
space:
mode:
Diffstat (limited to 'worker/executor.ts')
-rw-r--r--worker/executor.ts76
1 files changed, 30 insertions, 46 deletions
diff --git a/worker/executor.ts b/worker/executor.ts
index f4b7906..bfcbc37 100644
--- a/worker/executor.ts
+++ b/worker/executor.ts
@@ -13,74 +13,58 @@ import {
import type { Job, JobArgT, Pipeline } from '@emprespresso/ci_model';
// -- <job.exectuor> --
-const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`));
-export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) =>
- tJob
- .bimap(TraceUtil.withMetricTrace(jobTypeMetric(tJob.get().type)))
+const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`).asResult());
+export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) => {
+ const metric = jobTypeMetric(tJob.get().type);
+ return tJob
+ .flatMap(TraceUtil.withMetricTrace(metric))
.peek((tJob) => tJob.trace.trace(`let's do this little job ok!! ${tJob.get()}`))
.map((tJob) =>
validateExecutionEntries(tJob.get().arguments)
.mapLeft((badEntries) => {
- tJob.trace.addTrace(LogLevel.ERROR).trace(badEntries.toString());
+ tJob.trace.traceScope(LogLevel.ERROR).trace(badEntries.toString());
return new Error('invalid job arguments');
})
.flatMapAsync((args) => getStdout(tJob.move(tJob.get().type), { env: args })),
)
- .peek(
- TraceUtil.promiseify((q) =>
- q.trace.trace(
- q.get().fold(({ isLeft }) => jobTypeMetric(tJob.get().type)[isLeft ? 'failure' : 'success']),
- ),
- ),
- )
+ .flatMapAsync(TraceUtil.promiseify(TraceUtil.traceResultingEither(metric)))
.get();
+};
// -- </job.exectuor> --
// -- <pipeline.executor> --
-const pipelinesMetric = Metric.fromName('pipelines');
+const pipelinesMetric = Metric.fromName('pipelines').asResult();
export const executePipeline = (
tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>,
baseEnv?: JobArgT,
): Promise<IEither<Error, void>> =>
tPipeline
- .bimap(TraceUtil.withFunctionTrace(executePipeline))
- .bimap(TraceUtil.withMetricTrace(pipelinesMetric))
- .map(async (tJobs): Promise<IEither<Error, void>> => {
- for (const [i, serialStage] of tJobs.get().serialJobs.entries()) {
- tJobs.trace.trace(`executing stage ${i}. do your best little stage :>\n${serialStage}`);
- const jobResults = await Promise.all(
- serialStage.parallelJobs.map((job) =>
- tJobs
- .bimap((_) => [job, `stage ${i}`])
- .map(
- (tJob) =>
- <Job>{
- ...tJob.get(),
- arguments: {
- ...baseEnv,
- ...tJob.get().arguments,
- },
- },
- )
+ .flatMap(TraceUtil.withFunctionTrace(executePipeline))
+ .flatMap(TraceUtil.withMetricTrace(pipelinesMetric))
+ .map(async (_tPipeline): Promise<IEither<Error, void>> => {
+ for (const [i, serialStage] of tPipeline.get().serialJobs.entries()) {
+ const tPipeline = _tPipeline.flatMap(TraceUtil.withTrace(`Stage = ${i}`));
+ const parallelJobs = tPipeline
+ .peek((t) => t.trace.trace(`do your best little stage :> ${serialStage}`))
+ .move(serialStage.parallelJobs)
+ .coExtend((jobs) =>
+ jobs.get().map((job) => <Job>{ ...job, arguments: { ...baseEnv, ...job.arguments } }),
+ )
+ .map((job) => {
+ const metric = jobTypeMetric(job.get().type);
+ return job
+ .flatMap(TraceUtil.withMetricTrace(metric))
.map(executeJob)
- .peek(
- TraceUtil.promiseify((tEitherJobOutput) =>
- tEitherJobOutput
- .get()
- .mapRight((stdout) => tEitherJobOutput.trace.addTrace('STDOUT').trace(stdout)),
- ),
- )
- .get(),
- ),
- );
- const failures = jobResults.filter((e) => e.fold(({ isLeft }) => isLeft));
+ .flatMapAsync(TraceUtil.promiseify(TraceUtil.traceResultingEither(metric)));
+ });
+ const results = await Promise.all(parallelJobs.map((job) => job.get()));
+ const failures = results.filter((e) => e.left);
if (failures.length > 0) {
- tJobs.trace.trace(pipelinesMetric.failure);
return Either.left(new Error(failures.toString()));
}
}
- tJobs.trace.trace(pipelinesMetric.success);
- return Either.right(undefined);
+ return Either.right(<void>undefined);
})
+ .flatMapAsync(TraceUtil.promiseify(TraceUtil.traceResultingEither(pipelinesMetric)))
.get();
// -- </pipeline.executor> --