summaryrefslogtreecommitdiff
path: root/server/job
diff options
context:
space:
mode:
authorElizabeth Hunt <me@liz.coffee>2025-06-20 14:53:38 -0700
committerElizabeth Hunt <me@liz.coffee>2025-06-20 14:53:38 -0700
commitd4791f3d357634daf506fb8f91cc5332a794c421 (patch)
tree1bb01d2d4d8fa74d83bb6f99f2c8aa4146ca2d11 /server/job
parentd7e8d31c94cd713a2f4cf799e20e993acc69e361 (diff)
downloadci-d4791f3d357634daf506fb8f91cc5332a794c421.tar.gz
ci-d4791f3d357634daf506fb8f91cc5332a794c421.zip
Move to nodejs
Diffstat (limited to 'server/job')
-rw-r--r--server/job/index.ts2
-rw-r--r--server/job/queue.ts74
-rw-r--r--server/job/run_activity.ts94
3 files changed, 170 insertions, 0 deletions
diff --git a/server/job/index.ts b/server/job/index.ts
new file mode 100644
index 0000000..ecf0984
--- /dev/null
+++ b/server/job/index.ts
@@ -0,0 +1,2 @@
+export * from './queue';
+export * from './run_activity';
diff --git a/server/job/queue.ts b/server/job/queue.ts
new file mode 100644
index 0000000..2392222
--- /dev/null
+++ b/server/job/queue.ts
@@ -0,0 +1,74 @@
+import {
+ getStdout,
+ type Mapper,
+ memoize,
+ Either,
+ type IEither,
+ type ITraceable,
+ LogLevel,
+ Metric,
+ type ServerTrace,
+ TraceUtil,
+} from '@emprespresso/pengueno';
+import { type Job } from '@emprespresso/ci_model';
+
+type QueuePosition = string;
+export class QueueError extends Error {}
+export interface IJobQueuer<TJob> {
+ queue: Mapper<TJob, Promise<IEither<QueueError, QueuePosition>>>;
+}
+
+export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace>> {
+ constructor(private readonly queuePositionPrefix: string) {}
+
+ private static GetJobTypeTrace = (jobType: string) => `LaminarJobQueue.Queue.${jobType}`;
+ private static JobTypeMetrics = memoize((jobType: string) =>
+ Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)),
+ );
+
+ public queue(j: ITraceable<Job, ServerTrace>) {
+ const { type: jobType } = j.get();
+ const trace = LaminarJobQueuer.GetJobTypeTrace(jobType);
+ const metric = LaminarJobQueuer.JobTypeMetrics(jobType);
+
+ return j
+ .bimap(TraceUtil.withTrace(trace))
+ .bimap(TraceUtil.withMetricTrace(metric))
+ .map((j) => {
+ const { type: jobType, arguments: args } = j.get();
+ const laminarCommand = [
+ 'laminarc',
+ 'queue',
+ jobType,
+ ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`),
+ ];
+ return laminarCommand;
+ })
+ .peek((c) =>
+ c.trace.trace(`im so excited to see how this queue job will end!! (>ᴗ<): ${c.get().toString()}`),
+ )
+ .map(getStdout)
+ .peek(
+ TraceUtil.promiseify((q) =>
+ q.trace.trace(q.get().fold(({ isLeft }) => (isLeft ? metric.failure : metric.success))),
+ ),
+ )
+ .map(
+ TraceUtil.promiseify((q) =>
+ q.get().fold(({ isLeft, value }) => {
+ if (isLeft) {
+ q.trace.addTrace(LogLevel.ERROR).trace(value.toString());
+ return Either.left<Error, string>(value);
+ }
+ q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${value}`);
+ const [jobName, jobId] = value.split(':');
+ const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;
+
+ q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`);
+ return Either.right<Error, string>(jobUrl);
+ }),
+ ),
+ )
+ .get();
+ }
+}
diff --git a/server/job/run_activity.ts b/server/job/run_activity.ts
new file mode 100644
index 0000000..9f25cf8
--- /dev/null
+++ b/server/job/run_activity.ts
@@ -0,0 +1,94 @@
+import {
+ Either,
+ ErrorSource,
+ type IActivity,
+ type IEither,
+ type ITraceable,
+ jsonModel,
+ JsonResponse,
+ LogLevel,
+ Metric,
+ PenguenoError,
+ type PenguenoRequest,
+ type ServerTrace,
+ TraceUtil,
+ validateExecutionEntries,
+} from '@emprespresso/pengueno';
+import { isJob, type Job } from '@emprespresso/ci_model';
+import { IJobQueuer } from './queue';
+
+const wellFormedJobMetric = Metric.fromName('Job.WellFormed');
+
+const jobJsonTransformer = (j: ITraceable<unknown, ServerTrace>): IEither<PenguenoError, Job> =>
+ j
+ .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric))
+ .map((tJson): IEither<PenguenoError, Job> => {
+ const tJob = tJson.get();
+ if (!isJob(tJob) || !validateExecutionEntries(tJob)) {
+ const err = 'seems like a pwetty mawfomed job (-.-)';
+ tJson.trace.addTrace(LogLevel.WARN).trace(err);
+ return Either.left(new PenguenoError(err, 400));
+ }
+ return Either.right(tJob);
+ })
+ .peek((tJob) =>
+ tJob.trace.trace(
+ tJob.get().fold(({ isLeft }) => (isLeft ? wellFormedJobMetric.failure : wellFormedJobMetric.success)),
+ ),
+ )
+ .get();
+
+export interface IJobHookActivity {
+ processHook: IActivity;
+}
+
+const jobHookRequestMetric = Metric.fromName('JobHook.process');
+export class JobHookActivityImpl implements IJobHookActivity {
+ constructor(private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>) {}
+
+ private trace(r: ITraceable<PenguenoRequest, ServerTrace>) {
+ return r.bimap(TraceUtil.withClassTrace(this)).bimap(TraceUtil.withMetricTrace(jobHookRequestMetric));
+ }
+
+ public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) {
+ return this.trace(r)
+ .map(jsonModel(jobJsonTransformer))
+ .map(async (tEitherJobJson) => {
+ const eitherJob = await tEitherJobJson.get();
+ return eitherJob.flatMapAsync(async (job) => {
+ const eitherQueued = await tEitherJobJson
+ .move(job)
+ .map((job) => this.queuer.queue(job))
+ .get();
+ return eitherQueued.mapLeft((e) => new PenguenoError(e.message, 500));
+ });
+ })
+ .peek(
+ TraceUtil.promiseify((tJob) =>
+ tJob.get().fold(({ isRight, value }) => {
+ if (isRight) {
+ tJob.trace.trace(jobHookRequestMetric.success);
+ tJob.trace.trace(`all queued up and weady to go :D !! ${value}`);
+ return;
+ }
+
+ tJob.trace.trace(
+ value.source === ErrorSource.SYSTEM
+ ? jobHookRequestMetric.failure
+ : jobHookRequestMetric.warn,
+ );
+ tJob.trace.addTrace(value.source).trace(`${value}`);
+ }),
+ ),
+ )
+ .map(
+ TraceUtil.promiseify(
+ (tEitherQueuedJob) =>
+ new JsonResponse(r, tEitherQueuedJob.get(), {
+ status: tEitherQueuedJob.get().fold(({ isRight, value }) => (isRight ? 200 : value.status)),
+ }),
+ ),
+ )
+ .get();
+ }
+}