summaryrefslogtreecommitdiff
path: root/worker/executor/pipeline.ts
diff options
context:
space:
mode:
Diffstat (limited to 'worker/executor/pipeline.ts')
-rw-r--r--worker/executor/pipeline.ts27
1 files changed, 16 insertions, 11 deletions
diff --git a/worker/executor/pipeline.ts b/worker/executor/pipeline.ts
index a1aa7c3..c8423b1 100644
--- a/worker/executor/pipeline.ts
+++ b/worker/executor/pipeline.ts
@@ -14,7 +14,8 @@ export const executePipeline = (
tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>,
baseEnv?: JobArgT,
): Promise<IEither<Error, void>> =>
- tPipeline.bimap(TraceUtil.withFunctionTrace(executePipeline))
+ tPipeline
+ .bimap(TraceUtil.withFunctionTrace(executePipeline))
.bimap(TraceUtil.withMetricTrace(pipelinesMetric))
.map(async (tJobs): Promise<IEither<Error, void>> => {
for (const [i, serialStage] of tJobs.get().serialJobs.entries()) {
@@ -23,22 +24,26 @@ export const executePipeline = (
);
const jobResults = await Promise.all(
serialStage.parallelJobs.map((job) =>
- tJobs.bimap((_) => [job, `stage ${i}`])
- .map((tJob) =>
- <Job> ({
- ...tJob.get(),
- arguments: { ...baseEnv, ...tJob.get().arguments },
- })
+ tJobs
+ .bimap((_) => [job, `stage ${i}`])
+ .map(
+ (tJob) =>
+ <Job>{
+ ...tJob.get(),
+ arguments: { ...baseEnv, ...tJob.get().arguments },
+ },
)
.map(executeJob)
.peek(
TraceUtil.promiseify((tEitherJobOutput) =>
- tEitherJobOutput.get().mapRight((stdout) =>
- tEitherJobOutput.trace.addTrace("STDOUT").trace(stdout)
- )
+ tEitherJobOutput
+ .get()
+ .mapRight((stdout) =>
+ tEitherJobOutput.trace.addTrace("STDOUT").trace(stdout),
+ ),
),
)
- .get()
+ .get(),
),
);
const failures = jobResults.filter((e) => e.fold((err) => !!err));