summaryrefslogtreecommitdiff
path: root/hooks/server/job/queuer.ts
blob: 069cca4f5ecfbc0882d48d5ef90ae71c463e4539 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import {
  getStdout,
  type IEither,
  type ITraceable,
  LogLevel,
  type Mapper,
  memoize,
  Metric,
  type ServerTrace,
  TraceUtil,
} from "@emprespresso/pengueno";
import type { Job } from "@emprespresso/ci-model";

type QueuePosition = string;
export class QueueError extends Error {}
export interface IJobQueuer<TJob> {
  queue: Mapper<TJob, Promise<IEither<QueueError, QueuePosition>>>;
}

export class LaminarJobQueuer
  implements IJobQueuer<ITraceable<Job, ServerTrace>>
{
  constructor(private readonly queuePositionPrefix: string) {}

  private static GetJobTypeTrace = (jobType: string) =>
    `LaminarJobQueue.Queue.${jobType}`;
  private static JobTypeMetrics = memoize((jobType: string) =>
    Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)),
  );

  public queue(j: ITraceable<Job, ServerTrace>) {
    const { type: jobType } = j.get();
    const trace = LaminarJobQueuer.GetJobTypeTrace(jobType);
    const metric = LaminarJobQueuer.JobTypeMetrics(trace);

    return j
      .bimap(TraceUtil.withTrace(trace))
      .bimap(TraceUtil.withMetricTrace(metric))
      .map((j) => {
        const { type: jobType, arguments: args } = j.get();
        const laminarCommand = [
          "laminarc",
          "queue",
          jobType,
          ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`),
        ];
        return laminarCommand;
      })
      .peek((c) =>
        c.trace.trace(
          `im so excited to see how this queue job will end!! (>ᴗ<): ${c.get().toString()}`,
        ),
      )
      .map(getStdout)
      .peek(
        TraceUtil.promiseify((q) =>
          q.trace.trace(
            q
              .get()
              .fold((err, _val) => (err ? metric.failure : metric.success)),
          ),
        ),
      )
      .map(
        TraceUtil.promiseify((q) =>
          q
            .get()
            .mapRight((stdout) => {
              q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${stdout}`);
              const [jobName, jobId] = stdout.split(":");
              const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;

              q.trace.trace(
                `all queued up and weady to go~ (˘ω˘) => ${jobUrl}`,
              );
              return jobUrl;
            })
            .mapLeft((err) => {
              q.trace.addTrace(LogLevel.ERROR).trace(err.toString());
              return err;
            }),
        ),
      )
      .get();
  }
}