import {
  getStdout,
  type Mapper,
  memoize,
  Either,
  ErrorSource,
  type IActivity,
  type IEither,
  type ITraceable,
  jsonModel,
  JsonResponse,
  LogLevel,
  Metric,
  PenguenoError,
  type PenguenoRequest,
  type ServerTrace,
  TraceUtil,
  validateExecutionEntries,
} from "@emprespresso/pengueno";
import { isJob, type Job } from "@emprespresso/ci_model";

// NOTE(review): the generic type arguments in this file were reconstructed
// from the type-only imports and from how each value is used; confirm them
// against the declarations in @emprespresso/pengueno.

// -- <job.hook> --
const wellFormedJobMetric = Metric.fromName("Job.WellFormed");

/**
 * Validates a decoded JSON payload as a `Job`.
 *
 * Produces a right `Job` when both `isJob` and `validateExecutionEntries`
 * accept the payload. Otherwise a warning is traced and a left
 * `PenguenoError` carrying status 400 is produced. Either way the outcome is
 * reported on the `Job.WellFormed` metric.
 */
const jobJsonTransformer = (
  j: ITraceable<unknown, ServerTrace>,
): IEither<PenguenoError, Job> =>
  j
    .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric))
    .map((tJson): IEither<PenguenoError, Job> => {
      const candidate = tJson.get();
      if (!isJob(candidate) || !validateExecutionEntries(candidate)) {
        const err = "seems like a pwetty mawfomed job (-.-)";
        tJson.trace.addTrace(LogLevel.WARN).trace(err);
        return Either.left(new PenguenoError(err, 400));
      }
      return Either.right(candidate);
    })
    .peek((tJob) =>
      tJob.trace.trace(
        tJob
          .get()
          .fold(({ isLeft }) =>
            isLeft ? wellFormedJobMetric.failure : wellFormedJobMetric.success,
          ),
      ),
    )
    .get();

/** Contract for the activity that handles an incoming job hook request. */
export interface IJobHookActivity {
  processHook: IActivity;
}

const jobHookRequestMetric = Metric.fromName("JobHook.process");

/** Job hook activity that validates the request body and hands it to a queuer. */
export class JobHookActivityImpl implements IJobHookActivity {
  constructor(
    private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>,
  ) {}

  /** Attaches this class's trace and the `JobHook.process` metric trace to `r`. */
  private trace(r: ITraceable<PenguenoRequest, ServerTrace>) {
    return r
      .bimap(TraceUtil.withClassTrace(this))
      .bimap(TraceUtil.withMetricTrace(jobHookRequestMetric));
  }

  /**
   * Decodes the request through `jsonModel(jobJsonTransformer)`, queues the
   * resulting job, and wraps the outcome in a `JsonResponse`.
   *
   * The response status is 200 when queuing succeeded, otherwise the status of
   * the `PenguenoError` on the left. A failure reported by the queuer is
   * re-wrapped as a `PenguenoError` with status 500.
   *
   * @param r the traceable incoming request
   */
  public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) {
    return this.trace(r)
      .map(jsonModel(jobJsonTransformer))
      .map(async (tEitherJobJson) => {
        const eitherJob = await tEitherJobJson.get();
        return eitherJob.flatMapAsync(async (job) => {
          const eitherQueued = await tEitherJobJson
            .move(job)
            .map((tJob) => this.queuer.queue(tJob))
            .get();
          return eitherQueued.mapLeft((e) => new PenguenoError(e.message, 500));
        });
      })
      .peek(
        TraceUtil.promiseify((tJob) =>
          tJob.get().fold(({ isRight, value }) => {
            if (isRight) {
              tJob.trace.trace(jobHookRequestMetric.success);
              tJob.trace.trace(`all queued up and weady to go :D !! ${value}`);
              return;
            }
            // Only SYSTEM-sourced errors count as failures of this activity;
            // anything else is reported on the metric as a warning.
            tJob.trace.trace(
              value.source === ErrorSource.SYSTEM
                ? jobHookRequestMetric.failure
                : jobHookRequestMetric.warn,
            );
            tJob.trace.addTrace(value.source).trace(`${value}`);
          }),
        ),
      )
      .map(
        TraceUtil.promiseify(
          (tEitherQueuedJob) =>
            new JsonResponse(r, tEitherQueuedJob.get(), {
              status: tEitherQueuedJob
                .get()
                .fold(({ isRight, value }) => (isRight ? 200 : value.status)),
            }),
        ),
      )
      .get();
  }
}
// -- </job.hook> --

// -- <job.queuer> --
type QueuePosition = string;

/** Error type carried on the left side of a failed queue attempt. */
export class QueueError extends Error {}

/** Something that can enqueue a job and report where it landed. */
export interface IJobQueuer<TJob> {
  queue: Mapper<TJob, Promise<IEither<QueueError, QueuePosition>>>;
}

/** Queues jobs by invoking `laminarc queue` through `getStdout`. */
export class LaminarJobQueuer
  implements IJobQueuer<ITraceable<Job, ServerTrace>>
{
  /**
   * @param queuePositionPrefix prefix placed before `/jobs/<name>/<id>` when
   *   building the URL returned for a queued job
   */
  constructor(private readonly queuePositionPrefix: string) {}

  private static GetJobTypeTrace = (jobType: string) =>
    `LaminarJobQueue.Queue.${jobType}`;

  // Memoized so each job type maps to a single Metric instance.
  private static JobTypeMetrics = memoize((jobType: string) =>
    Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)),
  );

  /**
   * Runs `laminarc queue <type> "<key>"="<value>"...` for the job and converts
   * its stdout (`<name>:<id>`) into a job URL.
   *
   * @param j the traceable job to enqueue
   * @returns a promise of a right job URL, or a left error when the command
   *   failed or its output did not contain a `<name>:<id>` pair
   */
  public queue(j: ITraceable<Job, ServerTrace>) {
    const { type: jobType } = j.get();
    const trace = LaminarJobQueuer.GetJobTypeTrace(jobType);
    const metric = LaminarJobQueuer.JobTypeMetrics(jobType);

    return j
      .bimap(TraceUtil.withTrace(trace))
      .bimap(TraceUtil.withMetricTrace(metric))
      .map((tJob) => {
        const { arguments: args } = tJob.get();
        // NOTE(review): keys and values come from the hook request and are
        // interpolated without escaping. Whether the literal quotes are needed
        // (shell) or end up inside the parameter (argv) depends on how
        // getStdout spawns the process — confirm and escape accordingly.
        const params = Object.entries(args).map(
          ([key, val]) => `"${key}"="${val}"`,
        );
        return ["laminarc", "queue", jobType, ...params];
      })
      .peek((c) =>
        c.trace.trace(
          `im so excited to see how this queue job will end!! (>ᴗ<): ${c
            .get()
            .toString()}`,
        ),
      )
      .map(getStdout)
      .peek(
        TraceUtil.promiseify((q) =>
          q.trace.trace(
            q
              .get()
              .fold(({ isLeft }) => (isLeft ? metric.failure : metric.success)),
          ),
        ),
      )
      .map(
        TraceUtil.promiseify((q) =>
          q
            .get()
            .fold(({ isLeft, value }): IEither<QueueError, QueuePosition> => {
              if (isLeft) {
                q.trace.addTrace(LogLevel.ERROR).trace(value.toString());
                return Either.left(value);
              }
              q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${value}`);

              // Trim first: a trailing newline on stdout would otherwise end
              // up inside the job id and therefore inside the URL.
              const [jobName, jobId] = value.trim().split(":");
              if (!jobName || !jobId) {
                // Without both parts the URL would contain "undefined";
                // report the unexpected output instead of a bogus position.
                const malformed = new QueueError(
                  `unexpected laminarc output: ${JSON.stringify(value)}`,
                );
                q.trace.addTrace(LogLevel.ERROR).trace(malformed.toString());
                return Either.left(malformed);
              }

              const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;
              q.trace.trace(
                `all queued up and weady to go~ (˘ω˘) => ${jobUrl}`,
              );
              return Either.right(jobUrl);
            }),
        ),
      )
      .get();
  }
}
// -- </job.queuer> --