diff options
author | Elizabeth Hunt <me@liz.coffee> | 2025-06-20 14:53:38 -0700 |
---|---|---|
committer | Elizabeth Hunt <me@liz.coffee> | 2025-06-20 14:53:38 -0700 |
commit | d4791f3d357634daf506fb8f91cc5332a794c421 (patch) | |
tree | 1bb01d2d4d8fa74d83bb6f99f2c8aa4146ca2d11 /server/job | |
parent | d7e8d31c94cd713a2f4cf799e20e993acc69e361 (diff) | |
download | ci-d4791f3d357634daf506fb8f91cc5332a794c421.tar.gz ci-d4791f3d357634daf506fb8f91cc5332a794c421.zip |
Move to nodejs
Diffstat (limited to 'server/job')
-rw-r--r-- | server/job/index.ts | 2 | ||||
-rw-r--r-- | server/job/queue.ts | 74 | ||||
-rw-r--r-- | server/job/run_activity.ts | 94 |
3 files changed, 170 insertions, 0 deletions
diff --git a/server/job/index.ts b/server/job/index.ts new file mode 100644 index 0000000..ecf0984 --- /dev/null +++ b/server/job/index.ts @@ -0,0 +1,2 @@ +export * from './queue'; +export * from './run_activity'; diff --git a/server/job/queue.ts b/server/job/queue.ts new file mode 100644 index 0000000..2392222 --- /dev/null +++ b/server/job/queue.ts @@ -0,0 +1,74 @@ +import { + getStdout, + type Mapper, + memoize, + Either, + type IEither, + type ITraceable, + LogLevel, + Metric, + type ServerTrace, + TraceUtil, +} from '@emprespresso/pengueno'; +import { type Job } from '@emprespresso/ci_model'; + +type QueuePosition = string; +export class QueueError extends Error {} +export interface IJobQueuer<TJob> { + queue: Mapper<TJob, Promise<IEither<QueueError, QueuePosition>>>; +} + +export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace>> { + constructor(private readonly queuePositionPrefix: string) {} + + private static GetJobTypeTrace = (jobType: string) => `LaminarJobQueue.Queue.${jobType}`; + private static JobTypeMetrics = memoize((jobType: string) => + Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)), + ); + + public queue(j: ITraceable<Job, ServerTrace>) { + const { type: jobType } = j.get(); + const trace = LaminarJobQueuer.GetJobTypeTrace(jobType); + const metric = LaminarJobQueuer.JobTypeMetrics(jobType); + + return j + .bimap(TraceUtil.withTrace(trace)) + .bimap(TraceUtil.withMetricTrace(metric)) + .map((j) => { + const { type: jobType, arguments: args } = j.get(); + const laminarCommand = [ + 'laminarc', + 'queue', + jobType, + ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`), + ]; + return laminarCommand; + }) + .peek((c) => + c.trace.trace(`im so excited to see how this queue job will end!! (>ᴗ<): ${c.get().toString()}`), + ) + .map(getStdout) + .peek( + TraceUtil.promiseify((q) => + q.trace.trace(q.get().fold(({ isLeft }) => (isLeft ? metric.failure : metric.success))), + ), + ) + .map( + TraceUtil.promiseify((q) => + q.get().fold(({ isLeft, value }) => { + if (isLeft) { + q.trace.addTrace(LogLevel.ERROR).trace(value.toString()); + return Either.left<Error, string>(value); + } + q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${value}`); + const [jobName, jobId] = value.split(':'); + const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`; + + q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`); + return Either.right<Error, string>(jobUrl); + }), + ), + ) + .get(); + } +} diff --git a/server/job/run_activity.ts b/server/job/run_activity.ts new file mode 100644 index 0000000..9f25cf8 --- /dev/null +++ b/server/job/run_activity.ts @@ -0,0 +1,94 @@ +import { + Either, + ErrorSource, + type IActivity, + type IEither, + type ITraceable, + jsonModel, + JsonResponse, + LogLevel, + Metric, + PenguenoError, + type PenguenoRequest, + type ServerTrace, + TraceUtil, + validateExecutionEntries, +} from '@emprespresso/pengueno'; +import { isJob, type Job } from '@emprespresso/ci_model'; +import { IJobQueuer } from './queue'; + +const wellFormedJobMetric = Metric.fromName('Job.WellFormed'); + +const jobJsonTransformer = (j: ITraceable<unknown, ServerTrace>): IEither<PenguenoError, Job> => + j + .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric)) + .map((tJson): IEither<PenguenoError, Job> => { + const tJob = tJson.get(); + if (!isJob(tJob) || !validateExecutionEntries(tJob)) { + const err = 'seems like a pwetty mawfomed job (-.-)'; + tJson.trace.addTrace(LogLevel.WARN).trace(err); + return Either.left(new PenguenoError(err, 400)); + } + return Either.right(tJob); + }) + .peek((tJob) => + tJob.trace.trace( + tJob.get().fold(({ isLeft }) => (isLeft ? wellFormedJobMetric.failure : wellFormedJobMetric.success)), + ), + ) + .get(); + +export interface IJobHookActivity { + processHook: IActivity; +} + +const jobHookRequestMetric = Metric.fromName('JobHook.process'); +export class JobHookActivityImpl implements IJobHookActivity { + constructor(private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>) {} + + private trace(r: ITraceable<PenguenoRequest, ServerTrace>) { + return r.bimap(TraceUtil.withClassTrace(this)).bimap(TraceUtil.withMetricTrace(jobHookRequestMetric)); + } + + public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) { + return this.trace(r) + .map(jsonModel(jobJsonTransformer)) + .map(async (tEitherJobJson) => { + const eitherJob = await tEitherJobJson.get(); + return eitherJob.flatMapAsync(async (job) => { + const eitherQueued = await tEitherJobJson + .move(job) + .map((job) => this.queuer.queue(job)) + .get(); + return eitherQueued.mapLeft((e) => new PenguenoError(e.message, 500)); + }); + }) + .peek( + TraceUtil.promiseify((tJob) => + tJob.get().fold(({ isRight, value }) => { + if (isRight) { + tJob.trace.trace(jobHookRequestMetric.success); + tJob.trace.trace(`all queued up and weady to go :D !! ${value}`); + return; + } + + tJob.trace.trace( + value.source === ErrorSource.SYSTEM + ? jobHookRequestMetric.failure + : jobHookRequestMetric.warn, + ); + tJob.trace.addTrace(value.source).trace(`${value}`); + }), + ), + ) + .map( + TraceUtil.promiseify( + (tEitherQueuedJob) => + new JsonResponse(r, tEitherQueuedJob.get(), { + status: tEitherQueuedJob.get().fold(({ isRight, value }) => (isRight ? 200 : value.status)), + }), + ), + ) + .get(); + } +} |