diff options
author | Elizabeth Hunt <me@liz.coffee> | 2025-06-20 14:53:38 -0700 |
---|---|---|
committer | Elizabeth Hunt <me@liz.coffee> | 2025-06-20 14:53:38 -0700 |
commit | d4791f3d357634daf506fb8f91cc5332a794c421 (patch) | |
tree | 1bb01d2d4d8fa74d83bb6f99f2c8aa4146ca2d11 /server/job.ts | |
parent | d7e8d31c94cd713a2f4cf799e20e993acc69e361 (diff) | |
download | ci-d4791f3d357634daf506fb8f91cc5332a794c421.tar.gz ci-d4791f3d357634daf506fb8f91cc5332a794c421.zip |
Move to nodejs
Diffstat (limited to 'server/job.ts')
-rw-r--r-- | server/job.ts | 185 |
1 files changed, 0 insertions, 185 deletions
diff --git a/server/job.ts b/server/job.ts deleted file mode 100644 index 620a083..0000000 --- a/server/job.ts +++ /dev/null @@ -1,185 +0,0 @@ -import { - getStdout, - type Mapper, - memoize, - Either, - ErrorSource, - type IActivity, - type IEither, - type ITraceable, - jsonModel, - JsonResponse, - LogLevel, - Metric, - PenguenoError, - type PenguenoRequest, - type ServerTrace, - TraceUtil, - validateExecutionEntries, -} from "@emprespresso/pengueno"; -import { isJob, type Job } from "@emprespresso/ci_model"; - -// -- <job.hook> -- -const wellFormedJobMetric = Metric.fromName("Job.WellFormed"); - -const jobJsonTransformer = ( - j: ITraceable<unknown, ServerTrace>, -): IEither<PenguenoError, Job> => - j - .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric)) - .map((tJson): IEither<PenguenoError, Job> => { - const tJob = tJson.get(); - if (!isJob(tJob) || !validateExecutionEntries(tJob)) { - const err = "seems like a pwetty mawfomed job (-.-)"; - tJson.trace.addTrace(LogLevel.WARN).trace(err); - return Either.left(new PenguenoError(err, 400)); - } - return Either.right(tJob); - }) - .peek((tJob) => - tJob.trace.trace( - tJob - .get() - .fold(({ isLeft }) => - isLeft ? wellFormedJobMetric.failure : wellFormedJobMetric.success, - ), - ), - ) - .get(); - -export interface IJobHookActivity { - processHook: IActivity; -} - -const jobHookRequestMetric = Metric.fromName("JobHook.process"); -export class JobHookActivityImpl implements IJobHookActivity { - constructor( - private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>, - ) {} - - private trace(r: ITraceable<PenguenoRequest, ServerTrace>) { - return r - .bimap(TraceUtil.withClassTrace(this)) - .bimap(TraceUtil.withMetricTrace(jobHookRequestMetric)); - } - - public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) { - return this.trace(r) - .map(jsonModel(jobJsonTransformer)) - .map(async (tEitherJobJson) => { - const eitherJob = await tEitherJobJson.get(); - return eitherJob.flatMapAsync(async (job) => { - const eitherQueued = await tEitherJobJson - .move(job) - .map((job) => this.queuer.queue(job)) - .get(); - return eitherQueued.mapLeft((e) => new PenguenoError(e.message, 500)); - }); - }) - .peek( - TraceUtil.promiseify((tJob) => - tJob.get().fold(({ isRight, value }) => { - if (isRight) { - tJob.trace.trace(jobHookRequestMetric.success); - tJob.trace.trace(`all queued up and weady to go :D !! ${value}`); - return; - } - - tJob.trace.trace( - value.source === ErrorSource.SYSTEM - ? jobHookRequestMetric.failure - : jobHookRequestMetric.warn, - ); - tJob.trace.addTrace(value.source).trace(`${value}`); - }), - ), - ) - .map( - TraceUtil.promiseify( - (tEitherQueuedJob) => - new JsonResponse(r, tEitherQueuedJob.get(), { - status: tEitherQueuedJob - .get() - .fold(({ isRight, value }) => (isRight ? 200 : value.status)), - }), - ), - ) - .get(); - } -} - -// -- </job.hook> -- - -// -- <job.queuer> -- -type QueuePosition = string; -export class QueueError extends Error {} -export interface IJobQueuer<TJob> { - queue: Mapper<TJob, Promise<IEither<QueueError, QueuePosition>>>; -} - -export class LaminarJobQueuer - implements IJobQueuer<ITraceable<Job, ServerTrace>> -{ - constructor(private readonly queuePositionPrefix: string) {} - - private static GetJobTypeTrace = (jobType: string) => - `LaminarJobQueue.Queue.${jobType}`; - private static JobTypeMetrics = memoize((jobType: string) => - Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)), - ); - - public queue(j: ITraceable<Job, ServerTrace>) { - const { type: jobType } = j.get(); - const trace = LaminarJobQueuer.GetJobTypeTrace(jobType); - const metric = LaminarJobQueuer.JobTypeMetrics(jobType); - - return j - .bimap(TraceUtil.withTrace(trace)) - .bimap(TraceUtil.withMetricTrace(metric)) - .map((j) => { - const { type: jobType, arguments: args } = j.get(); - const laminarCommand = [ - "laminarc", - "queue", - jobType, - ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`), - ]; - return laminarCommand; - }) - .peek((c) => - c.trace.trace( - `im so excited to see how this queue job will end!! (>ᴗ<): ${c - .get() - .toString()}`, - ), - ) - .map(getStdout) - .peek( - TraceUtil.promiseify((q) => - q.trace.trace( - q - .get() - .fold(({ isLeft }) => (isLeft ? metric.failure : metric.success)), - ), - ), - ) - .map( - TraceUtil.promiseify((q) => - q.get().fold(({ isLeft, value }) => { - if (isLeft) { - q.trace.addTrace(LogLevel.ERROR).trace(value.toString()); - return Either.left<Error, string>(value); - } - q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${value}`); - const [jobName, jobId] = value.split(":"); - const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`; - - q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`); - return Either.right<Error, string>(jobUrl); - }), - ), - ) - .get(); - } -} -// -- </job.queuer> -- |