diff options
author | Elizabeth Alexander Hunt <me@liz.coffee> | 2025-05-12 09:40:12 -0700 |
---|---|---|
committer | Elizabeth <me@liz.coffee> | 2025-05-26 14:15:42 -0700 |
commit | d51c9d74857aca3c2f172609297266968bc7f809 (patch) | |
tree | 64327f9cc4219729aa11af32d7d4c70cddfc2292 /hooks/server/job/queuer.ts | |
parent | 30729a0cf707d9022bae0a7baaba77379dc31fd5 (diff) | |
download | ci-d51c9d74857aca3c2f172609297266968bc7f809.tar.gz ci-d51c9d74857aca3c2f172609297266968bc7f809.zip |
The big refactor TM
Diffstat (limited to 'hooks/server/job/queuer.ts')
-rw-r--r-- | hooks/server/job/queuer.ts | 78 |
1 files changed, 78 insertions, 0 deletions
diff --git a/hooks/server/job/queuer.ts b/hooks/server/job/queuer.ts new file mode 100644 index 0000000..6094183 --- /dev/null +++ b/hooks/server/job/queuer.ts @@ -0,0 +1,78 @@ +import { + getStdout, + type IEither, + type ITraceable, + LogLevel, + type Mapper, + memoize, + Metric, + type ServerTrace, + TraceUtil, +} from "@emprespresso/pengueno"; +import type { Job } from "@emprespresso/ci-model"; + +type QueuePosition = string; +export class QueueError extends Error {} +export interface IJobQueuer<TJob> { + queue: Mapper<TJob, Promise<IEither<QueueError, QueuePosition>>>; +} + +export class LaminarJobQueuer + implements IJobQueuer<ITraceable<Job, ServerTrace>> { + constructor( + private readonly queuePositionPrefix: string, + ) {} + + private static GetJobTypeTrace = (jobType: string) => + `LaminarJobQueue.Queue.${jobType}`; + private static JobTypeMetrics = memoize((jobType: string) => + Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)) + ); + + public queue(j: ITraceable<Job, ServerTrace>) { + const { type: jobType } = j.get(); + const trace = LaminarJobQueuer.GetJobTypeTrace(jobType); + const metric = LaminarJobQueuer.JobTypeMetrics(trace); + + return j + .bimap(TraceUtil.withTrace(trace)) + .bimap(TraceUtil.withMetricTrace(metric)) + .map((j) => { + const { type: jobType, arguments: args } = j.get(); + const laminarCommand = [ + "laminarc", + "queue", + jobType, + ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`), + ]; + return laminarCommand; + }) + .peek((c) => + c.trace.trace( + `im so excited to see how this queue job will end!! (>ᴗ<): ${c.get().toString()}`, + ) + ) + .map(getStdout) + .peek( + TraceUtil.promiseify((q) => + q.trace.trace( + q.get().fold((err, _val) => err ? metric.failure : metric.success), + ) + ), + ) + .map(TraceUtil.promiseify((q) => + q.get().mapRight((stdout) => { + q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${stdout}`); + const [jobName, jobId] = stdout.split(":"); + const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`; + + q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`); + return jobUrl; + }).mapLeft((err) => { + q.trace.addTrace(LogLevel.ERROR).trace(err.toString()); + return err; + }) + )) + .get(); + } +} |