1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
import {
getStdout,
type ITraceable,
LogLevel,
type LogMetricTraceSupplier,
memoize,
Metric,
TraceUtil,
validateExecutionEntries,
Either,
type IEither,
} from '@emprespresso/pengueno';
import type { Job, JobArgT, Pipeline } from '@emprespresso/ci_model';
// -- <job.exectuor> --
const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`).asResult());
export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) => {
const metric = jobTypeMetric(tJob.get().type);
return tJob
.flatMap(TraceUtil.withMetricTrace(metric))
.peek((tJob) => tJob.trace.trace(`let's do this little job ok!! ${JSON.stringify(tJob.get())}`))
.map((tJob) =>
validateExecutionEntries(tJob.get().arguments)
.mapLeft((badEntries) => {
tJob.trace.traceScope(LogLevel.ERROR).trace(JSON.stringify(badEntries));
return new Error('invalid job arguments');
})
.flatMapAsync((args) => getStdout(tJob.move(tJob.get().type), { env: args })),
)
.flatMapAsync(TraceUtil.promiseify(TraceUtil.traceResultingEither(metric)))
.peek(TraceUtil.promiseify((t) => t.traceScope(() => LogLevel.DEBUG).trace.trace(JSON.stringify(t.get()))))
.get();
};
// -- </job.exectuor> --
// -- <pipeline.executor> --
const pipelinesMetric = Metric.fromName('pipelines').asResult();
export const executePipeline = (
tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>,
baseEnv?: JobArgT,
): Promise<IEither<Error, void>> =>
tPipeline
.flatMap(TraceUtil.withFunctionTrace(executePipeline))
.flatMap(TraceUtil.withMetricTrace(pipelinesMetric))
.map(async (_tPipeline): Promise<IEither<Error, void>> => {
for (const [i, serialStage] of tPipeline.get().serialJobs.entries()) {
const tPipeline = _tPipeline.traceScope(() => `Stage = ${i}`);
const parallelJobs = tPipeline
.peek((t) => t.trace.trace(`do your best little stage :> ${JSON.stringify(serialStage)}`))
.move(serialStage.parallelJobs)
.coExtend((jobs) =>
jobs.get().map((job) => <Job>{ ...job, arguments: { ...baseEnv, ...job.arguments } }),
)
.map((job) => {
const metric = jobTypeMetric(job.get().type);
return job
.flatMap(TraceUtil.withMetricTrace(metric))
.map(executeJob)
.flatMapAsync(TraceUtil.promiseify(TraceUtil.traceResultingEither(metric)));
});
const results = await Promise.all(parallelJobs.map((job) => job.get()));
const failures = results.filter((e) => e.left().present());
if (failures.length > 0) {
return Either.left(new Error(JSON.stringify(failures)));
}
}
return Either.right(<void>undefined);
})
.flatMapAsync(TraceUtil.promiseify(TraceUtil.traceResultingEither(pipelinesMetric)))
.get();
// -- </pipeline.executor> --
|