summaryrefslogtreecommitdiff
path: root/server/job/queue.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/job/queue.ts')
-rw-r--r--server/job/queue.ts74
1 files changed, 74 insertions, 0 deletions
diff --git a/server/job/queue.ts b/server/job/queue.ts
new file mode 100644
index 0000000..2392222
--- /dev/null
+++ b/server/job/queue.ts
@@ -0,0 +1,74 @@
+import {
+ getStdout,
+ type Mapper,
+ memoize,
+ Either,
+ type IEither,
+ type ITraceable,
+ LogLevel,
+ Metric,
+ type ServerTrace,
+ TraceUtil,
+} from '@emprespresso/pengueno';
+import { type Job } from '@emprespresso/ci_model';
+
+type QueuePosition = string;
+export class QueueError extends Error {}
+export interface IJobQueuer<TJob> {
+ queue: Mapper<TJob, Promise<IEither<QueueError, QueuePosition>>>;
+}
+
+export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace>> {
+ constructor(private readonly queuePositionPrefix: string) {}
+
+ private static GetJobTypeTrace = (jobType: string) => `LaminarJobQueue.Queue.${jobType}`;
+ private static JobTypeMetrics = memoize((jobType: string) =>
+ Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)),
+ );
+
+ public queue(j: ITraceable<Job, ServerTrace>) {
+ const { type: jobType } = j.get();
+ const trace = LaminarJobQueuer.GetJobTypeTrace(jobType);
+ const metric = LaminarJobQueuer.JobTypeMetrics(jobType);
+
+ return j
+ .bimap(TraceUtil.withTrace(trace))
+ .bimap(TraceUtil.withMetricTrace(metric))
+ .map((j) => {
+ const { type: jobType, arguments: args } = j.get();
+ const laminarCommand = [
+ 'laminarc',
+ 'queue',
+ jobType,
+ ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`),
+ ];
+ return laminarCommand;
+ })
+ .peek((c) =>
+ c.trace.trace(`im so excited to see how this queue job will end!! (>ᴗ<): ${c.get().toString()}`),
+ )
+ .map(getStdout)
+ .peek(
+ TraceUtil.promiseify((q) =>
+ q.trace.trace(q.get().fold(({ isLeft }) => (isLeft ? metric.failure : metric.success))),
+ ),
+ )
+ .map(
+ TraceUtil.promiseify((q) =>
+ q.get().fold(({ isLeft, value }) => {
+ if (isLeft) {
+ q.trace.addTrace(LogLevel.ERROR).trace(value.toString());
+ return Either.left<Error, string>(value);
+ }
+ q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${value}`);
+ const [jobName, jobId] = value.split(':');
+ const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;
+
+ q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`);
+ return Either.right<Error, string>(jobUrl);
+ }),
+ ),
+ )
+ .get();
+ }
+}