1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
import {
getStdout,
type ITraceable,
LogLevel,
type LogMetricTraceSupplier,
memoize,
Metric,
TraceUtil,
validateExecutionEntries,
Either,
type IEither,
} from "@emprespresso/pengueno";
import type { Job, JobArgT, Pipeline } from "@emprespresso/ci_model";
// -- <job.executor> --
/**
 * Looks up the metric for a job type. The metric is named `run.<type>` and the
 * factory is wrapped in `memoize`.
 */
const jobTypeMetric = memoize((jobType: string) =>
  Metric.fromName(`run.${jobType}`),
);
/**
 * Executes a single job.
 *
 * Steps, in order:
 *  1. attach the `run.<type>` metric trace for this job's type;
 *  2. trace a start-of-job line;
 *  3. validate the job's arguments with `validateExecutionEntries`;
 *  4. on valid arguments, call `getStdout` with the job's `type` and the
 *     validated arguments as `env` (presumably this runs `type` as a command
 *     with that environment — confirm against pengueno);
 *  5. once the result settles, trace the metric's `failure` or `success`
 *     member depending on which side of the Either came back.
 *
 * @param tJob - traceable wrapper around the job to run.
 * @returns the value produced by `flatMapAsync`; callers in this file await it
 *   and treat it as an Either. Its left side is an `Error` — "invalid job
 *   arguments" when validation rejects the arguments (the offending entries are
 *   traced at ERROR level first).
 */
export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) => {
  // Resolve the per-type metric once; it is needed both for the metric trace
  // and for reporting the outcome at the end.
  const metric = jobTypeMetric(tJob.get().type);

  return tJob
    .bimap(TraceUtil.withMetricTrace(metric))
    .peek((tracedJob) =>
      // Interpolating the job object itself would render as "[object Object]",
      // so spell out the type and the argument names. Argument values are left
      // out on purpose: they become the process environment and may be secrets.
      tracedJob.trace.trace(
        `let's do this little job ok!! ${tracedJob.get().type} (args: ${Object.keys(
          tracedJob.get().arguments ?? {},
        ).join(", ")})`,
      ),
    )
    .map((tracedJob) =>
      validateExecutionEntries(tracedJob.get().arguments)
        .mapLeft((badEntries) => {
          tracedJob.trace.addTrace(LogLevel.ERROR).trace(badEntries.toString());
          return new Error("invalid job arguments");
        })
        .flatMapAsync((env) =>
          getStdout(tracedJob.move(tracedJob.get().type), { env }),
        ),
    )
    .peek(
      TraceUtil.promiseify((tResult) =>
        tResult.trace.trace(
          tResult
            .get()
            .fold(({ isLeft }) => metric[isLeft ? "failure" : "success"]),
        ),
      ),
    )
    .get();
};
// -- </job.executor> --
// -- <pipeline.executor> --
/** Metric named "pipelines"; `executePipeline` traces its `success` / `failure` members. */
const pipelinesMetric = Metric.fromName("pipelines");
/**
 * Executes a pipeline stage by stage.
 *
 * Stages (`serialJobs`) run one after another; the jobs inside a stage
 * (`parallelJobs`) are started together and awaited with `Promise.all`. As soon
 * as a stage contains at least one failed job, the pipeline's `failure` metric
 * is traced and the function returns without running any later stage. If every
 * stage finishes cleanly, the `success` metric is traced.
 *
 * @param tPipeline - traceable wrapper around the pipeline to run.
 * @param baseEnv - optional arguments merged underneath each job's own
 *   `arguments`; on a key collision the job's value wins.
 * @returns a promise of `Either.right(undefined)` on success, or an
 *   `Either.left(Error)` whose message is the string form of the failed job
 *   results of the first failing stage.
 */
export const executePipeline = (
  tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>,
  baseEnv?: JobArgT,
): Promise<IEither<Error, void>> =>
  tPipeline
    .bimap(TraceUtil.withFunctionTrace(executePipeline))
    .bimap(TraceUtil.withMetricTrace(pipelinesMetric))
    .map(async (tJobs): Promise<IEither<Error, void>> => {
      for (const [i, serialStage] of tJobs.get().serialJobs.entries()) {
        // Interpolating the stage object directly would render as
        // "[object Object]"; list the job types it contains instead.
        const stageJobTypes = serialStage.parallelJobs
          .map(({ type }) => type)
          .join(", ");
        tJobs.trace.trace(
          `executing stage ${i}. do your best little stage :>\n${stageJobTypes}`,
        );

        const jobResults = await Promise.all(
          serialStage.parallelJobs.map((job) =>
            tJobs
              // Swap the traced value to this job and tag the trace with the stage.
              .bimap((_) => [job, `stage ${i}`])
              .map(
                (tJob) =>
                  ({
                    ...tJob.get(),
                    // Job-specific arguments override the shared base environment.
                    arguments: {
                      ...baseEnv,
                      ...tJob.get().arguments,
                    },
                  }) as Job,
              )
              .map(executeJob)
              .peek(
                TraceUtil.promiseify((tEitherJobOutput) =>
                  // mapRight is used for its side effect only: trace the
                  // output of jobs that succeeded.
                  tEitherJobOutput
                    .get()
                    .mapRight((stdout) =>
                      tEitherJobOutput.trace.addTrace("STDOUT").trace(stdout),
                    ),
                ),
              )
              .get(),
          ),
        );

        const failures = jobResults.filter((result) =>
          result.fold(({ isLeft }) => isLeft),
        );
        if (failures.length > 0) {
          tJobs.trace.trace(pipelinesMetric.failure);
          return Either.left(new Error(failures.toString()));
        }
      }

      tJobs.trace.trace(pipelinesMetric.success);
      return Either.right(undefined);
    })
    .get();
// -- </pipeline.executor> --
|