Diffstat (limited to 'worker/executor.ts')
-rw-r--r--  worker/executor.ts  161
1 file changed, 73 insertions, 88 deletions
diff --git a/worker/executor.ts b/worker/executor.ts
index ea79995..f4b7906 100644
--- a/worker/executor.ts
+++ b/worker/executor.ts
@@ -1,101 +1,86 @@
import {
- getStdout,
- type ITraceable,
- LogLevel,
- type LogMetricTraceSupplier,
- memoize,
- Metric,
- TraceUtil,
- validateExecutionEntries,
- Either,
- type IEither,
-} from "@emprespresso/pengueno";
-import type { Job, JobArgT, Pipeline } from "@emprespresso/ci_model";
+ getStdout,
+ type ITraceable,
+ LogLevel,
+ type LogMetricTraceSupplier,
+ memoize,
+ Metric,
+ TraceUtil,
+ validateExecutionEntries,
+ Either,
+ type IEither,
+} from '@emprespresso/pengueno';
+import type { Job, JobArgT, Pipeline } from '@emprespresso/ci_model';
// -- <job.executor> --
const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`));
export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) =>
- tJob
- .bimap(TraceUtil.withMetricTrace(jobTypeMetric(tJob.get().type)))
- .peek((tJob) =>
- tJob.trace.trace(`let's do this little job ok!! ${tJob.get()}`),
- )
- .map((tJob) =>
- validateExecutionEntries(tJob.get().arguments)
- .mapLeft((badEntries) => {
- tJob.trace.addTrace(LogLevel.ERROR).trace(badEntries.toString());
- return new Error("invalid job arguments");
- })
- .flatMapAsync((args) =>
- getStdout(tJob.move(tJob.get().type), { env: args }),
- ),
- )
- .peek(
- TraceUtil.promiseify((q) =>
- q.trace.trace(
- q
- .get()
- .fold(
- ({ isLeft }) =>
- jobTypeMetric(tJob.get().type)[isLeft ? "failure" : "success"],
+ tJob
+ .bimap(TraceUtil.withMetricTrace(jobTypeMetric(tJob.get().type)))
+ .peek((tJob) => tJob.trace.trace(`let's do this little job ok!! ${tJob.get()}`))
+ .map((tJob) =>
+ validateExecutionEntries(tJob.get().arguments)
+ .mapLeft((badEntries) => {
+ tJob.trace.addTrace(LogLevel.ERROR).trace(badEntries.toString());
+ return new Error('invalid job arguments');
+ })
+ .flatMapAsync((args) => getStdout(tJob.move(tJob.get().type), { env: args })),
+ )
+ .peek(
+ TraceUtil.promiseify((q) =>
+ q.trace.trace(
+ q.get().fold(({ isLeft }) => jobTypeMetric(tJob.get().type)[isLeft ? 'failure' : 'success']),
+ ),
),
- ),
- ),
- )
- .get();
+ )
+ .get();
// -- </job.executor> --
// -- <pipeline.executor> --
-const pipelinesMetric = Metric.fromName("pipelines");
+const pipelinesMetric = Metric.fromName('pipelines');
export const executePipeline = (
- tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>,
- baseEnv?: JobArgT,
+ tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>,
+ baseEnv?: JobArgT,
): Promise<IEither<Error, void>> =>
- tPipeline
- .bimap(TraceUtil.withFunctionTrace(executePipeline))
- .bimap(TraceUtil.withMetricTrace(pipelinesMetric))
- .map(async (tJobs): Promise<IEither<Error, void>> => {
- for (const [i, serialStage] of tJobs.get().serialJobs.entries()) {
- tJobs.trace.trace(
- `executing stage ${i}. do your best little stage :>\n${serialStage}`,
- );
- const jobResults = await Promise.all(
- serialStage.parallelJobs.map((job) =>
- tJobs
- .bimap((_) => [job, `stage ${i}`])
- .map(
- (tJob) =>
- <Job>{
- ...tJob.get(),
- arguments: {
- ...baseEnv,
- ...tJob.get().arguments,
- },
- },
- )
- .map(executeJob)
- .peek(
- TraceUtil.promiseify((tEitherJobOutput) =>
- tEitherJobOutput
- .get()
- .mapRight((stdout) =>
- tEitherJobOutput.trace.addTrace("STDOUT").trace(stdout),
+ tPipeline
+ .bimap(TraceUtil.withFunctionTrace(executePipeline))
+ .bimap(TraceUtil.withMetricTrace(pipelinesMetric))
+ .map(async (tJobs): Promise<IEither<Error, void>> => {
+ for (const [i, serialStage] of tJobs.get().serialJobs.entries()) {
+ tJobs.trace.trace(`executing stage ${i}. do your best little stage :>\n${serialStage}`);
+ const jobResults = await Promise.all(
+ serialStage.parallelJobs.map((job) =>
+ tJobs
+ .bimap((_) => [job, `stage ${i}`])
+ .map(
+ (tJob) =>
+ <Job>{
+ ...tJob.get(),
+ arguments: {
+ ...baseEnv,
+ ...tJob.get().arguments,
+ },
+ },
+ )
+ .map(executeJob)
+ .peek(
+ TraceUtil.promiseify((tEitherJobOutput) =>
+ tEitherJobOutput
+ .get()
+ .mapRight((stdout) => tEitherJobOutput.trace.addTrace('STDOUT').trace(stdout)),
+ ),
+ )
+ .get(),
),
- ),
- )
- .get(),
- ),
- );
- const failures = jobResults.filter((e) =>
- e.fold(({ isLeft }) => isLeft),
- );
- if (failures.length > 0) {
- tJobs.trace.trace(pipelinesMetric.failure);
- return Either.left(new Error(failures.toString()));
- }
- }
- tJobs.trace.trace(pipelinesMetric.success);
- return Either.right(undefined);
- })
- .get();
+ );
+ const failures = jobResults.filter((e) => e.fold(({ isLeft }) => isLeft));
+ if (failures.length > 0) {
+ tJobs.trace.trace(pipelinesMetric.failure);
+ return Either.left(new Error(failures.toString()));
+ }
+ }
+ tJobs.trace.trace(pipelinesMetric.success);
+ return Either.right(undefined);
+ })
+ .get();
// -- </pipeline.executor> --
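
For context (not part of the change), a minimal usage sketch of the two exported entry points. The Pipeline/Job literals below are invented, the import path is assumed, and tracePipeline is a hypothetical stand-in: the diff never shows how @emprespresso/pengueno wraps a value into an ITraceable, so only the shapes visible above (serialJobs, parallelJobs, type, arguments, fold's isLeft callback) are relied on.

import type { ITraceable, LogMetricTraceSupplier } from '@emprespresso/pengueno';
import type { Job, Pipeline } from '@emprespresso/ci_model';
import { executePipeline } from './worker/executor';

// Hypothetical stand-in: some pengueno factory presumably produces the traceable wrapper.
declare function tracePipeline(pipeline: Pipeline): ITraceable<Pipeline, LogMetricTraceSupplier>;

// Invented example data; only the serialJobs/parallelJobs/type/arguments shape
// comes from the code in the diff above.
const pipeline = <Pipeline>{
    serialJobs: [
        { parallelJobs: [<Job>{ type: 'build', arguments: {} }] },
        { parallelJobs: [<Job>{ type: 'deploy', arguments: { TARGET: 'staging' } }] },
    ],
};

const run = async () => {
    // baseEnv is merged under each job's own arguments before executeJob runs it.
    const result = await executePipeline(tracePipeline(pipeline), { CI: 'true' });
    result.fold(({ isLeft }) =>
        isLeft ? console.error('pipeline failed') : console.log('pipeline ok'),
    );
};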