diff options
author | Elizabeth <me@liz.coffee> | 2025-06-02 16:52:52 -0700 |
---|---|---|
committer | Elizabeth <me@liz.coffee> | 2025-06-02 16:52:52 -0700 |
commit | 98f5c21aa65bbbca01a186a754249335b4afef57 (patch) | |
tree | 0fc8e01a73f0a3be4534c11724ad2ff634b4fd2f /server/job.ts | |
parent | 373d9ec700c0097a22cf665a8e33cf48998d1dc2 (diff) | |
download | ci-98f5c21aa65bbbca01a186a754249335b4afef57.tar.gz ci-98f5c21aa65bbbca01a186a754249335b4afef57.zip |
fixup the Either monad a bit for type safetyp
Diffstat (limited to 'server/job.ts')
-rw-r--r-- | server/job.ts | 325 |
1 files changed, 168 insertions, 157 deletions
diff --git a/server/job.ts b/server/job.ts index 62582d6..4e12b45 100644 --- a/server/job.ts +++ b/server/job.ts @@ -1,21 +1,21 @@ import { - getStdout, - type Mapper, - memoize, - Either, - ErrorSource, - type IActivity, - type IEither, - type ITraceable, - jsonModel, - JsonResponse, - LogLevel, - Metric, - PenguenoError, - type PenguenoRequest, - type ServerTrace, - TraceUtil, - validateExecutionEntries, + getStdout, + type Mapper, + memoize, + Either, + ErrorSource, + type IActivity, + type IEither, + type ITraceable, + jsonModel, + JsonResponse, + LogLevel, + Metric, + PenguenoError, + type PenguenoRequest, + type ServerTrace, + TraceUtil, + validateExecutionEntries, } from "@emprespresso/pengueno"; import { isJob, type Job } from "@emprespresso/ci_model"; @@ -23,93 +23,98 @@ import { isJob, type Job } from "@emprespresso/ci_model"; const wellFormedJobMetric = Metric.fromName("Job.WellFormed"); const jobJsonTransformer = ( - j: ITraceable<unknown, ServerTrace>, + j: ITraceable<unknown, ServerTrace>, ): IEither<PenguenoError, Job> => - j - .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric)) - .map((tJson) => { - if (!isJob(tJson) || !validateExecutionEntries(tJson)) { - const err = "seems like a pwetty mawfomed job \\(-.-)/"; - tJson.trace.addTrace(LogLevel.WARN).trace(err); - return Either.left<PenguenoError, Job>(new PenguenoError(err, 400)); - } - return Either.right<PenguenoError, Job>(tJson); - }) - .peek((tJob) => - tJob.trace.trace( - tJob - .get() - .fold((err) => - err ? wellFormedJobMetric.failure : wellFormedJobMetric.success, - ), - ), - ) - .get(); + j + .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric)) + .map((tJson) => { + if (!isJob(tJson) || !validateExecutionEntries(tJson)) { + const err = "seems like a pwetty mawfomed job \\(-.-)/"; + tJson.trace.addTrace(LogLevel.WARN).trace(err); + return Either.left<PenguenoError, Job>( + new PenguenoError(err, 400), + ); + } + return Either.right<PenguenoError, Job>(tJson); + }) + .peek((tJob) => + tJob.trace.trace( + tJob + .get() + .fold(({ isLeft }) => + isLeft + ? wellFormedJobMetric.failure + : wellFormedJobMetric.success, + ), + ), + ) + .get(); export interface IJobHookActivity { - processHook: IActivity; + processHook: IActivity; } const jobHookRequestMetric = Metric.fromName("JobHook.process"); export class JobHookActivityImpl implements IJobHookActivity { - constructor( - private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>, - ) {} + constructor( + private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>, + ) {} - private trace(r: ITraceable<PenguenoRequest, ServerTrace>) { - return r - .bimap(TraceUtil.withClassTrace(this)) - .bimap(TraceUtil.withMetricTrace(jobHookRequestMetric)); - } + private trace(r: ITraceable<PenguenoRequest, ServerTrace>) { + return r + .bimap(TraceUtil.withClassTrace(this)) + .bimap(TraceUtil.withMetricTrace(jobHookRequestMetric)); + } - public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) { - return this.trace(r) - .map(jsonModel(jobJsonTransformer)) - .map(async (tEitherJobJson) => { - const eitherJob = await tEitherJobJson.get(); - return eitherJob.flatMapAsync(async (job) => { - const eitherQueued = await tEitherJobJson - .move(job) - .map(this.queuer.queue) + public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) { + return this.trace(r) + .map(jsonModel(jobJsonTransformer)) + .map(async (tEitherJobJson) => { + const eitherJob = await tEitherJobJson.get(); + return eitherJob.flatMapAsync(async (job) => { + const eitherQueued = await tEitherJobJson + .move(job) + .map(this.queuer.queue) + .get(); + return eitherQueued.mapLeft( + (e) => new PenguenoError(e.message, 500), + ); + }); + }) + .peek( + TraceUtil.promiseify((tJob) => + tJob.get().fold(({ isRight, value }) => { + if (isRight) { + tJob.trace.trace(jobHookRequestMetric.success); + tJob.trace.trace( + `all queued up and weady to go :D !! ${value}`, + ); + return; + } + + tJob.trace.trace( + value.source === ErrorSource.SYSTEM + ? jobHookRequestMetric.failure + : jobHookRequestMetric.warn, + ); + tJob.trace.addTrace(value.source).trace(`${value}`); + }), + ), + ) + .map( + TraceUtil.promiseify( + (tEitherQueuedJob) => + new JsonResponse(r, tEitherQueuedJob.get(), { + status: tEitherQueuedJob + .get() + .fold(({ isRight, value }) => + isRight ? 200 : value.status, + ), + }), + ), + ) .get(); - return eitherQueued.mapLeft((e) => new PenguenoError(e.message, 500)); - }); - }) - .peek( - TraceUtil.promiseify((tJob) => - tJob - .get() - .fold( - (err: PenguenoError | undefined, _val: string | undefined) => { - if (!err) { - tJob.trace.trace(jobHookRequestMetric.success); - tJob.trace.trace( - `all queued up and weady to go :D !! ${_val}`, - ); - return; - } - tJob.trace.trace( - err.source === ErrorSource.SYSTEM - ? jobHookRequestMetric.failure - : jobHookRequestMetric.warn, - ); - tJob.trace.addTrace(err.source).trace(`${err}`); - }, - ), - ), - ) - .map( - TraceUtil.promiseify( - (tEitherQueuedJob) => - new JsonResponse(r, tEitherQueuedJob.get(), { - status: tEitherQueuedJob - .get() - .fold((err, _val) => (_val ? 200 : err?.status ?? 500)), - }), - ), - ) - .get(); - } + } } // -- </job.hook> -- @@ -118,76 +123,82 @@ export class JobHookActivityImpl implements IJobHookActivity { type QueuePosition = string; export class QueueError extends Error {} export interface IJobQueuer<TJob> { - queue: Mapper<TJob, Promise<IEither<QueueError, QueuePosition>>>; + queue: Mapper<TJob, Promise<IEither<QueueError, QueuePosition>>>; } export class LaminarJobQueuer - implements IJobQueuer<ITraceable<Job, ServerTrace>> + implements IJobQueuer<ITraceable<Job, ServerTrace>> { - constructor(private readonly queuePositionPrefix: string) {} - - private static GetJobTypeTrace = (jobType: string) => - `LaminarJobQueue.Queue.${jobType}`; - private static JobTypeMetrics = memoize((jobType: string) => - Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)), - ); + constructor(private readonly queuePositionPrefix: string) {} - public queue(j: ITraceable<Job, ServerTrace>) { - const { type: jobType } = j.get(); - const trace = LaminarJobQueuer.GetJobTypeTrace(jobType); - const metric = LaminarJobQueuer.JobTypeMetrics(trace); + private static GetJobTypeTrace = (jobType: string) => + `LaminarJobQueue.Queue.${jobType}`; + private static JobTypeMetrics = memoize((jobType: string) => + Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)), + ); - return j - .bimap(TraceUtil.withTrace(trace)) - .bimap(TraceUtil.withMetricTrace(metric)) - .map((j) => { - const { type: jobType, arguments: args } = j.get(); - const laminarCommand = [ - "laminarc", - "queue", - jobType, - ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`), - ]; - return laminarCommand; - }) - .peek((c) => - c.trace.trace( - `im so excited to see how this queue job will end!! (>ᴗ<): ${c - .get() - .toString()}`, - ), - ) - .map(getStdout) - .peek( - TraceUtil.promiseify((q) => - q.trace.trace( - q - .get() - .fold((err, _val) => (err ? metric.failure : metric.success)), - ), - ), - ) - .map( - TraceUtil.promiseify((q) => - q - .get() - .mapRight((stdout) => { - q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${stdout}`); - const [jobName, jobId] = stdout.split(":"); - const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`; + public queue(j: ITraceable<Job, ServerTrace>) { + const { type: jobType } = j.get(); + const trace = LaminarJobQueuer.GetJobTypeTrace(jobType); + const metric = LaminarJobQueuer.JobTypeMetrics(trace); - q.trace.trace( - `all queued up and weady to go~ (˘ω˘) => ${jobUrl}`, - ); - return jobUrl; + return j + .bimap(TraceUtil.withTrace(trace)) + .bimap(TraceUtil.withMetricTrace(metric)) + .map((j) => { + const { type: jobType, arguments: args } = j.get(); + const laminarCommand = [ + "laminarc", + "queue", + jobType, + ...Object.entries(args).map( + ([key, val]) => `"${key}"="${val}"`, + ), + ]; + return laminarCommand; }) - .mapLeft((err) => { - q.trace.addTrace(LogLevel.ERROR).trace(err.toString()); - return err; - }), - ), - ) - .get(); - } + .peek((c) => + c.trace.trace( + `im so excited to see how this queue job will end!! (>ᴗ<): ${c + .get() + .toString()}`, + ), + ) + .map(getStdout) + .peek( + TraceUtil.promiseify((q) => + q.trace.trace( + q + .get() + .fold(({ isLeft }) => + isLeft ? metric.failure : metric.success, + ), + ), + ), + ) + .map( + TraceUtil.promiseify((q) => + q.get().fold(({ isLeft, value }) => { + if (isLeft) { + q.trace + .addTrace(LogLevel.ERROR) + .trace(value.toString()); + return Either.left<Error, string>(value); + } + q.trace + .addTrace(LogLevel.DEBUG) + .trace(`stdout ${value}`); + const [jobName, jobId] = value.split(":"); + const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`; + + q.trace.trace( + `all queued up and weady to go~ (˘ω˘) => ${jobUrl}`, + ); + return Either.right<Error, string>(jobUrl); + }), + ), + ) + .get(); + } } // -- </job.queuer> -- |