summaryrefslogtreecommitdiff
path: root/server/job/run_activity.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/job/run_activity.ts')
-rw-r--r--server/job/run_activity.ts94
1 files changed, 94 insertions, 0 deletions
diff --git a/server/job/run_activity.ts b/server/job/run_activity.ts
new file mode 100644
index 0000000..9f25cf8
--- /dev/null
+++ b/server/job/run_activity.ts
@@ -0,0 +1,94 @@
+import {
+ Either,
+ ErrorSource,
+ type IActivity,
+ type IEither,
+ type ITraceable,
+ jsonModel,
+ JsonResponse,
+ LogLevel,
+ Metric,
+ PenguenoError,
+ type PenguenoRequest,
+ type ServerTrace,
+ TraceUtil,
+ validateExecutionEntries,
+} from '@emprespresso/pengueno';
+import { isJob, type Job } from '@emprespresso/ci_model';
+import { IJobQueuer } from './queue';
+
+const wellFormedJobMetric = Metric.fromName('Job.WellFormed');
+
+const jobJsonTransformer = (j: ITraceable<unknown, ServerTrace>): IEither<PenguenoError, Job> =>
+ j
+ .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric))
+ .map((tJson): IEither<PenguenoError, Job> => {
+ const tJob = tJson.get();
+ if (!isJob(tJob) || !validateExecutionEntries(tJob)) {
+ const err = 'seems like a pwetty mawfomed job (-.-)';
+ tJson.trace.addTrace(LogLevel.WARN).trace(err);
+ return Either.left(new PenguenoError(err, 400));
+ }
+ return Either.right(tJob);
+ })
+ .peek((tJob) =>
+ tJob.trace.trace(
+ tJob.get().fold(({ isLeft }) => (isLeft ? wellFormedJobMetric.failure : wellFormedJobMetric.success)),
+ ),
+ )
+ .get();
+
+export interface IJobHookActivity {
+ processHook: IActivity;
+}
+
+const jobHookRequestMetric = Metric.fromName('JobHook.process');
+export class JobHookActivityImpl implements IJobHookActivity {
+ constructor(private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>) {}
+
+ private trace(r: ITraceable<PenguenoRequest, ServerTrace>) {
+ return r.bimap(TraceUtil.withClassTrace(this)).bimap(TraceUtil.withMetricTrace(jobHookRequestMetric));
+ }
+
+ public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) {
+ return this.trace(r)
+ .map(jsonModel(jobJsonTransformer))
+ .map(async (tEitherJobJson) => {
+ const eitherJob = await tEitherJobJson.get();
+ return eitherJob.flatMapAsync(async (job) => {
+ const eitherQueued = await tEitherJobJson
+ .move(job)
+ .map((job) => this.queuer.queue(job))
+ .get();
+ return eitherQueued.mapLeft((e) => new PenguenoError(e.message, 500));
+ });
+ })
+ .peek(
+ TraceUtil.promiseify((tJob) =>
+ tJob.get().fold(({ isRight, value }) => {
+ if (isRight) {
+ tJob.trace.trace(jobHookRequestMetric.success);
+ tJob.trace.trace(`all queued up and weady to go :D !! ${value}`);
+ return;
+ }
+
+ tJob.trace.trace(
+ value.source === ErrorSource.SYSTEM
+ ? jobHookRequestMetric.failure
+ : jobHookRequestMetric.warn,
+ );
+ tJob.trace.addTrace(value.source).trace(`${value}`);
+ }),
+ ),
+ )
+ .map(
+ TraceUtil.promiseify(
+ (tEitherQueuedJob) =>
+ new JsonResponse(r, tEitherQueuedJob.get(), {
+ status: tEitherQueuedJob.get().fold(({ isRight, value }) => (isRight ? 200 : value.status)),
+ }),
+ ),
+ )
+ .get();
+ }
+}