summaryrefslogtreecommitdiff
path: root/hooks/server/job/queuer.ts
diff options
context:
space:
mode:
Diffstat (limited to 'hooks/server/job/queuer.ts')
-rw-r--r--hooks/server/job/queuer.ts78
1 files changed, 78 insertions, 0 deletions
diff --git a/hooks/server/job/queuer.ts b/hooks/server/job/queuer.ts
new file mode 100644
index 0000000..6094183
--- /dev/null
+++ b/hooks/server/job/queuer.ts
@@ -0,0 +1,78 @@
+import {
+ getStdout,
+ type IEither,
+ type ITraceable,
+ LogLevel,
+ type Mapper,
+ memoize,
+ Metric,
+ type ServerTrace,
+ TraceUtil,
+} from "@emprespresso/pengueno";
+import type { Job } from "@emprespresso/ci-model";
+
+type QueuePosition = string;
+export class QueueError extends Error {}
+export interface IJobQueuer<TJob> {
+ queue: Mapper<TJob, Promise<IEither<QueueError, QueuePosition>>>;
+}
+
+export class LaminarJobQueuer
+ implements IJobQueuer<ITraceable<Job, ServerTrace>> {
+ constructor(
+ private readonly queuePositionPrefix: string,
+ ) {}
+
+ private static GetJobTypeTrace = (jobType: string) =>
+ `LaminarJobQueue.Queue.${jobType}`;
+ private static JobTypeMetrics = memoize((jobType: string) =>
+ Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType))
+ );
+
+ public queue(j: ITraceable<Job, ServerTrace>) {
+ const { type: jobType } = j.get();
+ const trace = LaminarJobQueuer.GetJobTypeTrace(jobType);
+ const metric = LaminarJobQueuer.JobTypeMetrics(trace);
+
+ return j
+ .bimap(TraceUtil.withTrace(trace))
+ .bimap(TraceUtil.withMetricTrace(metric))
+ .map((j) => {
+ const { type: jobType, arguments: args } = j.get();
+ const laminarCommand = [
+ "laminarc",
+ "queue",
+ jobType,
+ ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`),
+ ];
+ return laminarCommand;
+ })
+ .peek((c) =>
+ c.trace.trace(
+ `im so excited to see how this queue job will end!! (>ᴗ<): ${c.get().toString()}`,
+ )
+ )
+ .map(getStdout)
+ .peek(
+ TraceUtil.promiseify((q) =>
+ q.trace.trace(
+ q.get().fold((err, _val) => err ? metric.failure : metric.success),
+ )
+ ),
+ )
+ .map(TraceUtil.promiseify((q) =>
+ q.get().mapRight((stdout) => {
+ q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${stdout}`);
+ const [jobName, jobId] = stdout.split(":");
+ const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;
+
+ q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`);
+ return jobUrl;
+ }).mapLeft((err) => {
+ q.trace.addTrace(LogLevel.ERROR).trace(err.toString());
+ return err;
+ })
+ ))
+ .get();
+ }
+}