summaryrefslogtreecommitdiff
path: root/worker/executor.ts
blob: f4b79063826ea5f57791919775272a424f802437 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import {
    getStdout,
    type ITraceable,
    LogLevel,
    type LogMetricTraceSupplier,
    memoize,
    Metric,
    TraceUtil,
    validateExecutionEntries,
    Either,
    type IEither,
} from '@emprespresso/pengueno';
import type { Job, JobArgT, Pipeline } from '@emprespresso/ci_model';

// -- <job.executor> --
/**
 * Looks up the metric for a job type, named `run.<type>`.
 * Wrapped in `memoize`, presumably so repeated lookups for the same type
 * reuse a single Metric — confirm against pengueno's `memoize` contract.
 */
const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`));

/**
 * Renders a job as a short, human-readable string for trace output.
 *
 * Interpolating the job object directly into a template literal yields
 * `[object Object]`, so the fields are spelled out by hand. Only argument
 * *names* are included: the values are handed to the child process as its
 * environment and may carry secrets that should not land in logs.
 */
const describeJob = ({ type, arguments: jobArgs }: Job): string =>
    `${type} (arguments: ${Object.keys(jobArgs ?? {}).join(', ')})`;

/**
 * Runs a single job and reports its outcome on the job's trace.
 *
 * Steps, in order:
 *  1. attaches the per-type metric (`run.<type>`) to the trace;
 *  2. traces a start message describing the job;
 *  3. validates the job arguments with `validateExecutionEntries`; invalid
 *     entries are traced at ERROR level and replaced by an
 *     `Error('invalid job arguments')` on the left side;
 *  4. otherwise calls `getStdout` with the job type as the traced value and
 *     the validated arguments as `env`;
 *  5. once the result settles, traces the metric's `failure` or `success`
 *     value depending on whether the result is a left.
 *
 * @param tJob the job to run, wrapped with its trace.
 * @returns the value held by the final traceable: the promise produced by
 *          `flatMapAsync` (an either whose right side is presumably the
 *          job's stdout — see how `executePipeline` consumes it).
 */
export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) =>
    tJob
        .bimap(TraceUtil.withMetricTrace(jobTypeMetric(tJob.get().type)))
        .peek((tTracedJob) =>
            tTracedJob.trace.trace(`let's do this little job ok!! ${describeJob(tTracedJob.get())}`),
        )
        .map((tTracedJob) =>
            validateExecutionEntries(tTracedJob.get().arguments)
                .mapLeft((badEntries) => {
                    // Keep the detailed entries in the log; callers only see a generic error.
                    tTracedJob.trace.addTrace(LogLevel.ERROR).trace(badEntries.toString());
                    return new Error('invalid job arguments');
                })
                .flatMapAsync((args) => getStdout(tTracedJob.move(tTracedJob.get().type), { env: args })),
        )
        .peek(
            TraceUtil.promiseify((tResult) =>
                tResult.trace.trace(
                    tResult
                        .get()
                        .fold(({ isLeft }) => jobTypeMetric(tJob.get().type)[isLeft ? 'failure' : 'success']),
                ),
            ),
        )
        .get();
// -- </job.executor> --

// -- <pipeline.executor> --
/** Metric traced with `.success` / `.failure` for every pipeline run. */
const pipelinesMetric = Metric.fromName('pipelines');

/**
 * Runs every stage of a pipeline and reports the overall outcome.
 *
 * Stages in `serialJobs` run one after another. Within a stage, all
 * `parallelJobs` are started together and awaited with `Promise.all`.
 * Each job's arguments are `baseEnv` overlaid with the job's own
 * arguments, so the job's values win on key conflicts. The stdout of every
 * successful job is traced under a `STDOUT` trace.
 *
 * As soon as a stage contains at least one failed job, the pipeline failure
 * metric is traced and a left `Error` is returned; later stages are not run.
 * If all stages finish without failures, the success metric is traced.
 *
 * @param tPipeline the pipeline to run, wrapped with its trace.
 * @param baseEnv optional arguments shared by every job in the pipeline.
 * @returns a promise of `Either.right(undefined)` on success, or
 *          `Either.left(Error)` describing the failed jobs of the first
 *          failing stage.
 */
export const executePipeline = (
    tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>,
    baseEnv?: JobArgT,
): Promise<IEither<Error, void>> =>
    tPipeline
        .bimap(TraceUtil.withFunctionTrace(executePipeline))
        .bimap(TraceUtil.withMetricTrace(pipelinesMetric))
        .map(async (tJobs): Promise<IEither<Error, void>> => {
            for (const [i, serialStage] of tJobs.get().serialJobs.entries()) {
                // Interpolating the stage object itself would print "[object Object]";
                // list the job types it contains instead.
                const stageJobTypes = serialStage.parallelJobs.map((job) => job.type).join(', ');
                tJobs.trace.trace(`executing stage ${i}. do your best little stage :>\n${stageJobTypes}`);
                const jobResults = await Promise.all(
                    serialStage.parallelJobs.map((job) =>
                        tJobs
                            .bimap((_) => [job, `stage ${i}`])
                            .map(
                                (tJob) =>
                                    <Job>{
                                        ...tJob.get(),
                                        // Job-specific arguments take precedence over the shared base env.
                                        arguments: {
                                            ...baseEnv,
                                            ...tJob.get().arguments,
                                        },
                                    },
                            )
                            .map(executeJob)
                            .peek(
                                TraceUtil.promiseify((tEitherJobOutput) =>
                                    tEitherJobOutput
                                        .get()
                                        .mapRight((stdout) => tEitherJobOutput.trace.addTrace('STDOUT').trace(stdout)),
                                ),
                            )
                            .get(),
                    ),
                );
                const failures = jobResults.filter((e) => e.fold(({ isLeft }) => isLeft));
                if (failures.length > 0) {
                    tJobs.trace.trace(pipelinesMetric.failure);
                    // NOTE(review): the message relies on IEither's string form; if it has no
                    // custom toString this reads "[object Object]" — confirm and extract the
                    // underlying errors if so.
                    return Either.left(new Error(failures.toString()));
                }
            }
            tJobs.trace.trace(pipelinesMetric.success);
            return Either.right(undefined);
        })
        .get();
// -- </pipeline.executor> --