summaryrefslogtreecommitdiff
path: root/server/job/queue.ts
blob: 2392222f7a9e72c276d103bc4f4fb311954b6821 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import {
    getStdout,
    type Mapper,
    memoize,
    Either,
    type IEither,
    type ITraceable,
    LogLevel,
    Metric,
    type ServerTrace,
    TraceUtil,
} from '@emprespresso/pengueno';
import { type Job } from '@emprespresso/ci_model';

/** Where a queued job can be found, as a plain string (`LaminarJobQueuer` below reports it as a job URL). */
type QueuePosition = string;
/**
 * Error type carried on the failure (left) side of `IJobQueuer.queue` results.
 *
 * `name` is set explicitly so that `toString()` and log output identify the
 * error as a `QueueError` instead of a generic `Error`. It is intentionally
 * left as a plain (widened) `string` so the class stays structurally
 * compatible with `Error`.
 */
export class QueueError extends Error {
    public override name = 'QueueError';
}
/**
 * Contract for anything that can enqueue a job of type `TJob`.
 */
export interface IJobQueuer<TJob> {
    /**
     * Enqueues the given job.
     *
     * The returned promise resolves to an `IEither` holding a `QueueError` on
     * the left or the job's `QueuePosition` on the right.
     */
    queue: Mapper<TJob, Promise<IEither<QueueError, QueuePosition>>>;
}

/**
 * Queues jobs by running `laminarc queue <jobType> ...` and turning the
 * command's stdout into a URL pointing at the queued job.
 */
export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace>> {
    /**
     * @param queuePositionPrefix Prepended to `/jobs/<jobName>/<jobId>` when
     *   building the URL returned for a queued job.
     */
    constructor(private readonly queuePositionPrefix: string) {}

    /** Builds the trace (and metric) name used for a given job type. */
    private static GetJobTypeTrace = (jobType: string) => `LaminarJobQueue.Queue.${jobType}`;

    /**
     * Maps a job type to its `Metric`. Wrapped in `memoize` so repeated calls
     * for the same job type can reuse the metric (assuming `memoize` caches by
     * argument — confirm against its implementation).
     */
    private static JobTypeMetrics = memoize((jobType: string) =>
        Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)),
    );

    /**
     * Queues the traced job via `laminarc queue`.
     *
     * Steps: tag the trace with the job-type trace and metric, build the
     * command line, run it through `getStdout`, log the metric outcome, then
     * parse stdout (expected to look like `<jobName>:<jobId>`) into a job URL.
     *
     * @param j The traced job to queue; its `type` and `arguments` are read.
     * @returns A promise resolving to a left `Error` when the command fails or
     *   its output cannot be parsed, otherwise a right holding
     *   `<queuePositionPrefix>/jobs/<jobName>/<jobId>`.
     */
    public queue(j: ITraceable<Job, ServerTrace>) {
        const { type: jobType } = j.get();
        const trace = LaminarJobQueuer.GetJobTypeTrace(jobType);
        const metric = LaminarJobQueuer.JobTypeMetrics(jobType);

        return j
            .bimap(TraceUtil.withTrace(trace))
            .bimap(TraceUtil.withMetricTrace(metric))
            .map((tracedJob) => {
                const { type, arguments: args } = tracedJob.get();
                // NOTE(review): the literal double quotes around key and value are kept
                // exactly as before; whether they are required (or sufficient as escaping)
                // depends on how `getStdout` executes the command — confirm.
                const laminarCommand = [
                    'laminarc',
                    'queue',
                    type,
                    ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`),
                ];
                return laminarCommand;
            })
            .peek((c) =>
                c.trace.trace(`im so excited to see how this queue job will end!! (>ᴗ<): ${c.get().toString()}`),
            )
            .map(getStdout)
            .peek(
                // Report the metric outcome of running the command itself.
                TraceUtil.promiseify((q) =>
                    q.trace.trace(q.get().fold(({ isLeft }) => (isLeft ? metric.failure : metric.success))),
                ),
            )
            .map(
                TraceUtil.promiseify((q) =>
                    q.get().fold(({ isLeft, value }) => {
                        if (isLeft) {
                            q.trace.addTrace(LogLevel.ERROR).trace(value.toString());
                            return Either.left<Error, string>(value);
                        }
                        q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${value}`);

                        // Trim first: command output may end with a newline, which would
                        // otherwise end up inside the job id and therefore inside the URL.
                        const [jobName, jobId] = value.trim().split(':');
                        if (!jobName || !jobId) {
                            // Without both parts the URL would contain "undefined" or be empty;
                            // surface that as a failure instead of a bogus queue position.
                            const malformed = new Error(
                                `unexpected laminarc queue output: ${JSON.stringify(value)}`,
                            );
                            q.trace.addTrace(LogLevel.ERROR).trace(malformed.toString());
                            return Either.left<Error, string>(malformed);
                        }
                        const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;

                        q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`);
                        return Either.right<Error, string>(jobUrl);
                    }),
                ),
            )
            .get();
    }
}