summaryrefslogtreecommitdiff
path: root/hooks/server/job/activity.ts
blob: 173cedf68da245b2196a60e03267f6f18fea4828 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import {
  Either,
  ErrorSource,
  type IActivity,
  type IEither,
  type ITraceable,
  jsonModel,
  JsonResponse,
  LogLevel,
  Metric,
  PenguenoError,
  type PenguenoRequest,
  type ServerTrace,
  TraceUtil,
  validateExecutionEntries,
} from "@emprespresso/pengueno";
import { isJob, type Job } from "@emprespresso/ci-model";
import type { IJobQueuer } from "@emprespresso/ci-hooks";

const wellFormedJobMetric = Metric.fromName("Job.WellFormed");

const jobJsonTransformer = (
  j: ITraceable<unknown, ServerTrace>,
): IEither<PenguenoError, Job> =>
  j
    .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric))
    .map((tJson) => {
      if (!isJob(tJson) || !validateExecutionEntries(tJson)) {
        const err = "seems like a pwetty mawfomed job \\(-.-)/";
        tJson.trace.addTrace(LogLevel.WARN).trace(err);
        return Either.left<PenguenoError, Job>(new PenguenoError(err, 400));
      }
      return Either.right<PenguenoError, Job>(tJson);
    })
    .peek((tJob) =>
      tJob.trace.trace(
        tJob
          .get()
          .fold((err) =>
            err ? wellFormedJobMetric.failure : wellFormedJobMetric.success,
          ),
      ),
    )
    .get();

export interface IJobHookActivity {
  processHook: IActivity;
}

const jobHookRequestMetric = Metric.fromName("JobHook.process");
export class JobHookActivityImpl implements IJobHookActivity {
  constructor(
    private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>,
  ) {}

  private trace(r: ITraceable<PenguenoRequest, ServerTrace>) {
    return r
      .bimap(TraceUtil.withClassTrace(this))
      .bimap(TraceUtil.withMetricTrace(jobHookRequestMetric));
  }

  public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) {
    return this.trace(r)
      .map(jsonModel(jobJsonTransformer))
      .map(async (tEitherJobJson) => {
        const eitherJob = await tEitherJobJson.get();
        return eitherJob.flatMapAsync(async (job) => {
          const eitherQueued = await tEitherJobJson
            .move(job)
            .map(this.queuer.queue)
            .get();
          return eitherQueued.mapLeft((e) => new PenguenoError(e.message, 500));
        });
      })
      .peek(
        TraceUtil.promiseify((tJob) =>
          tJob
            .get()
            .fold(
              (err: PenguenoError | undefined, _val: string | undefined) => {
                if (!err) {
                  tJob.trace.trace(jobHookRequestMetric.success);
                  tJob.trace.trace(
                    `all queued up and weady to go :D !! ${_val}`,
                  );
                  return;
                }
                tJob.trace.trace(
                  err.source === ErrorSource.SYSTEM
                    ? jobHookRequestMetric.failure
                    : jobHookRequestMetric.warn,
                );
                tJob.trace.addTrace(err.source).trace(`${err}`);
              },
            ),
        ),
      )
      .map(
        TraceUtil.promiseify(
          (tEitherQueuedJob) =>
            new JsonResponse(r, tEitherQueuedJob.get(), {
              status: tEitherQueuedJob
                .get()
                .fold(({ status }, _val) => (_val ? 200 : status)),
            }),
        ),
      )
      .get();
  }
}