summaryrefslogtreecommitdiff
path: root/server/job/run_activity.ts
blob: 22bc4c71158e6e6f71410ddc2cc3d9c1c8689bf9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import {
    Either,
    ErrorSource,
    type IActivity,
    type IEither,
    type ITraceable,
    jsonModel,
    JsonResponse,
    LogLevel,
    LogMetricTraceSupplier,
    Metric,
    MetricsTraceSupplier,
    PenguenoError,
    type PenguenoRequest,
    type ServerTrace,
    TraceUtil,
    validateExecutionEntries,
} from '@emprespresso/pengueno';
import { isJob, type Job } from '@emprespresso/ci_model';
import { IJobQueuer } from './queue.js';

// Result metric named 'Job.WellFormed'; jobJsonTransformer attaches it to the trace while an
// incoming payload is validated and reports the validation outcome against it.
const wellFormedJobMetric = Metric.fromName('Job.WellFormed').asResult();

/**
 * Validates an untyped, traced JSON payload as a {@link Job}.
 *
 * The payload is accepted only when it passes both `isJob` and `validateExecutionEntries`
 * (the second check is skipped when the first fails). A rejected payload is logged at WARN
 * level on its own trace and reported as a `PenguenoError` with status 400. The outcome is
 * traced against the 'Job.WellFormed' metric either way.
 *
 * @param j - traced payload of unknown shape
 * @returns right(job) for a well-formed job, otherwise left(PenguenoError with status 400)
 */
const jobJsonTransformer = (j: ITraceable<unknown, ServerTrace>): IEither<PenguenoError, Job> => {
    const tMeasured = j.flatMap(TraceUtil.withMetricTrace(wellFormedJobMetric));

    const tValidated = tMeasured.map((tCandidate): IEither<PenguenoError, Job> => {
        const candidate = tCandidate.get();
        if (isJob(candidate) && validateExecutionEntries(candidate)) {
            return Either.right(candidate);
        }

        const complaint = 'seems like a pwetty mawfomed job (-.-)';
        tCandidate.trace.traceScope(LogLevel.WARN).trace(complaint);
        return Either.left(new PenguenoError(complaint, 400));
    });

    return tValidated.peek(TraceUtil.traceResultingEither(wellFormedJobMetric)).get();
};

/**
 * Contract for a job hook handler: exposes a single activity that processes an incoming
 * hook request.
 */
export interface IJobHookActivity {
    /** Activity invoked to process a job hook request. */
    processHook: IActivity;
}

// Result metric named 'JobHook.process'; attached to every hook request trace and used to
// report the final outcome of processing that request.
const jobHookRequestMetric = Metric.fromName('JobHook.process').asResult();
/**
 * Job hook handler that parses a job out of the request body, hands it to the injected
 * queuer and answers with a JSON response describing the outcome.
 */
export class JobHookActivityImpl implements IJobHookActivity {
    /**
     * @param queuer - queue that accepts traced jobs; its result is echoed back to the caller
     */
    constructor(private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>) {}

    /**
     * Decorates the request trace with this class and with the 'JobHook.process' metric.
     */
    private trace(r: ITraceable<PenguenoRequest, ServerTrace>) {
        const tClassScoped = r.flatMap(TraceUtil.withClassTrace(this));
        return tClassScoped.flatMap(TraceUtil.withMetricTrace(jobHookRequestMetric));
    }

    /**
     * Processes one hook request: parse and validate the job, queue it, trace the outcome and
     * build the response.
     *
     * - A queuer failure is re-wrapped as a `PenguenoError` with status 500.
     * - The response status is the error's status on failure and 200 on success.
     *
     * @param r - traced incoming request
     * @returns the (asynchronous) JSON response produced by the pipeline
     */
    public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) {
        // Stage 1: read the body as JSON and validate it as a Job.
        const tParsed = this.trace(r).map(jsonModel(jobJsonTransformer));

        // Stage 2: queue the job when parsing succeeded; parse failures pass through untouched.
        const tQueued = tParsed.map(async (tPending) => {
            const parsed = await tPending.get();
            return parsed.flatMapAsync(async (validJob) => {
                const queueOutcome = await tPending
                    .move(validJob)
                    .map((tValidJob) => this.queuer.queue(tValidJob))
                    .get();
                return queueOutcome.mapLeft((queueError) => new PenguenoError(queueError.message, 500));
            });
        });

        // Stage 3: report the outcome against the request metric. The boolean is true only for
        // USER-sourced errors; a missing error falls back to SYSTEM and therefore yields false.
        // NOTE(review): the flag presumably downgrades the failure trace to a warning - confirm
        // against TraceUtil.traceResultingEither.
        const tMeasured = tQueued.flatMapAsync(
            TraceUtil.promiseify((tOutcome) => {
                const failureSource = tOutcome
                    .get()
                    .left()
                    .map((failure) => failure.source)
                    .orSome(() => ErrorSource.SYSTEM)
                    .get();
                return TraceUtil.traceResultingEither<PenguenoError, string, LogMetricTraceSupplier>(
                    jobHookRequestMetric,
                    failureSource === ErrorSource.USER,
                )(tOutcome);
            }),
        );

        // Stage 4: on success, log the value returned by the queuer.
        const tAnnounced = tMeasured.peek(
            TraceUtil.promiseify((tOutcome) =>
                tOutcome
                    .get()
                    .mapRight((queued) => tOutcome.trace.trace(`all queued up and weady to go :D !! ${queued}`)),
            ),
        );

        // Stage 5: wrap the outcome in a JSON response carrying the matching status code.
        return tAnnounced
            .map(
                TraceUtil.promiseify((tOutcome) => {
                    const payload = tOutcome.get();
                    const status = tOutcome.get().fold(
                        (failure) => failure.status,
                        () => 200,
                    );
                    return new JsonResponse(r, payload, { status });
                }),
            )
            .get();
    }
}