From d4791f3d357634daf506fb8f91cc5332a794c421 Mon Sep 17 00:00:00 2001 From: Elizabeth Hunt Date: Fri, 20 Jun 2025 14:53:38 -0700 Subject: Move to nodejs --- server/job/queue.ts | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 server/job/queue.ts (limited to 'server/job/queue.ts') diff --git a/server/job/queue.ts b/server/job/queue.ts new file mode 100644 index 0000000..2392222 --- /dev/null +++ b/server/job/queue.ts @@ -0,0 +1,74 @@ +import { + getStdout, + type Mapper, + memoize, + Either, + type IEither, + type ITraceable, + LogLevel, + Metric, + type ServerTrace, + TraceUtil, +} from '@emprespresso/pengueno'; +import { type Job } from '@emprespresso/ci_model'; + +type QueuePosition = string; +export class QueueError extends Error {} +export interface IJobQueuer { + queue: Mapper>>; +} + +export class LaminarJobQueuer implements IJobQueuer> { + constructor(private readonly queuePositionPrefix: string) {} + + private static GetJobTypeTrace = (jobType: string) => `LaminarJobQueue.Queue.${jobType}`; + private static JobTypeMetrics = memoize((jobType: string) => + Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)), + ); + + public queue(j: ITraceable) { + const { type: jobType } = j.get(); + const trace = LaminarJobQueuer.GetJobTypeTrace(jobType); + const metric = LaminarJobQueuer.JobTypeMetrics(jobType); + + return j + .bimap(TraceUtil.withTrace(trace)) + .bimap(TraceUtil.withMetricTrace(metric)) + .map((j) => { + const { type: jobType, arguments: args } = j.get(); + const laminarCommand = [ + 'laminarc', + 'queue', + jobType, + ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`), + ]; + return laminarCommand; + }) + .peek((c) => + c.trace.trace(`im so excited to see how this queue job will end!! (>ᴗ<): ${c.get().toString()}`), + ) + .map(getStdout) + .peek( + TraceUtil.promiseify((q) => + q.trace.trace(q.get().fold(({ isLeft }) => (isLeft ? metric.failure : metric.success))), + ), + ) + .map( + TraceUtil.promiseify((q) => + q.get().fold(({ isLeft, value }) => { + if (isLeft) { + q.trace.addTrace(LogLevel.ERROR).trace(value.toString()); + return Either.left(value); + } + q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${value}`); + const [jobName, jobId] = value.split(':'); + const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`; + + q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`); + return Either.right(jobUrl); + }), + ), + ) + .get(); + } +} -- cgit v1.2.3-70-g09d2