import { getStdout, type Mapper, memoize, type IEither, type ITraceable, LogLevel, Metric, type ServerTrace, TraceUtil, PenguenoError, } from '@emprespresso/pengueno'; import { type Job } from '@emprespresso/ci_model'; type QueuePosition = string; export class QueueError extends Error {} export interface IJobQueuer { queue: Mapper>>; } export class LaminarJobQueuer implements IJobQueuer> { constructor(private readonly queuePositionPrefix: string) {} private static GetJobTypeTrace = (jobType: string) => `LaminarJobQueue.Queue.${jobType}`; private static JobTypeMetrics = memoize((jobType: string) => Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)).asResult(), ); public queue(j: ITraceable) { const { type: jobType } = j.get(); const trace = LaminarJobQueuer.GetJobTypeTrace(jobType); const metric = LaminarJobQueuer.JobTypeMetrics(jobType); return j .flatMap(TraceUtil.withTrace(trace)) .flatMap(TraceUtil.withMetricTrace(metric)) .map((j) => { const { type: jobType, arguments: args } = j.get(); const laminarCommand = [ 'laminarc', 'queue', jobType, ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`), ]; return laminarCommand; }) .peek((c) => c.trace.trace(`im so excited to see how this queue job will end!! (>ᴗ<): ${c.get().toString()}`), ) .map((c) => getStdout(c)) .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(metric))) .map( TraceUtil.promiseify((q) => q.get().mapBoth( (err) => { q.trace.traceScope(LogLevel.ERROR).trace(err); return err; }, (ok) => { q.trace.traceScope(LogLevel.DEBUG).trace(`stdout ${ok}`); const [jobName, jobId] = ok.split(':'); const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`.trim(); q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`); return jobUrl; }, ), ), ) .get(); } }