summaryrefslogtreecommitdiff
path: root/server/job.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/job.ts')
-rw-r--r--server/job.ts185
1 files changed, 0 insertions, 185 deletions
diff --git a/server/job.ts b/server/job.ts
deleted file mode 100644
index 620a083..0000000
--- a/server/job.ts
+++ /dev/null
@@ -1,185 +0,0 @@
-import {
- getStdout,
- type Mapper,
- memoize,
- Either,
- ErrorSource,
- type IActivity,
- type IEither,
- type ITraceable,
- jsonModel,
- JsonResponse,
- LogLevel,
- Metric,
- PenguenoError,
- type PenguenoRequest,
- type ServerTrace,
- TraceUtil,
- validateExecutionEntries,
-} from "@emprespresso/pengueno";
-import { isJob, type Job } from "@emprespresso/ci_model";
-
-// -- <job.hook> --
-const wellFormedJobMetric = Metric.fromName("Job.WellFormed");
-
-const jobJsonTransformer = (
- j: ITraceable<unknown, ServerTrace>,
-): IEither<PenguenoError, Job> =>
- j
- .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric))
- .map((tJson): IEither<PenguenoError, Job> => {
- const tJob = tJson.get();
- if (!isJob(tJob) || !validateExecutionEntries(tJob)) {
- const err = "seems like a pwetty mawfomed job (-.-)";
- tJson.trace.addTrace(LogLevel.WARN).trace(err);
- return Either.left(new PenguenoError(err, 400));
- }
- return Either.right(tJob);
- })
- .peek((tJob) =>
- tJob.trace.trace(
- tJob
- .get()
- .fold(({ isLeft }) =>
- isLeft ? wellFormedJobMetric.failure : wellFormedJobMetric.success,
- ),
- ),
- )
- .get();
-
-export interface IJobHookActivity {
- processHook: IActivity;
-}
-
-const jobHookRequestMetric = Metric.fromName("JobHook.process");
-export class JobHookActivityImpl implements IJobHookActivity {
- constructor(
- private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>,
- ) {}
-
- private trace(r: ITraceable<PenguenoRequest, ServerTrace>) {
- return r
- .bimap(TraceUtil.withClassTrace(this))
- .bimap(TraceUtil.withMetricTrace(jobHookRequestMetric));
- }
-
- public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) {
- return this.trace(r)
- .map(jsonModel(jobJsonTransformer))
- .map(async (tEitherJobJson) => {
- const eitherJob = await tEitherJobJson.get();
- return eitherJob.flatMapAsync(async (job) => {
- const eitherQueued = await tEitherJobJson
- .move(job)
- .map((job) => this.queuer.queue(job))
- .get();
- return eitherQueued.mapLeft((e) => new PenguenoError(e.message, 500));
- });
- })
- .peek(
- TraceUtil.promiseify((tJob) =>
- tJob.get().fold(({ isRight, value }) => {
- if (isRight) {
- tJob.trace.trace(jobHookRequestMetric.success);
- tJob.trace.trace(`all queued up and weady to go :D !! ${value}`);
- return;
- }
-
- tJob.trace.trace(
- value.source === ErrorSource.SYSTEM
- ? jobHookRequestMetric.failure
- : jobHookRequestMetric.warn,
- );
- tJob.trace.addTrace(value.source).trace(`${value}`);
- }),
- ),
- )
- .map(
- TraceUtil.promiseify(
- (tEitherQueuedJob) =>
- new JsonResponse(r, tEitherQueuedJob.get(), {
- status: tEitherQueuedJob
- .get()
- .fold(({ isRight, value }) => (isRight ? 200 : value.status)),
- }),
- ),
- )
- .get();
- }
-}
-
-// -- </job.hook> --
-
-// -- <job.queuer> --
-type QueuePosition = string;
-export class QueueError extends Error {}
-export interface IJobQueuer<TJob> {
- queue: Mapper<TJob, Promise<IEither<QueueError, QueuePosition>>>;
-}
-
-export class LaminarJobQueuer
- implements IJobQueuer<ITraceable<Job, ServerTrace>>
-{
- constructor(private readonly queuePositionPrefix: string) {}
-
- private static GetJobTypeTrace = (jobType: string) =>
- `LaminarJobQueue.Queue.${jobType}`;
- private static JobTypeMetrics = memoize((jobType: string) =>
- Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)),
- );
-
- public queue(j: ITraceable<Job, ServerTrace>) {
- const { type: jobType } = j.get();
- const trace = LaminarJobQueuer.GetJobTypeTrace(jobType);
- const metric = LaminarJobQueuer.JobTypeMetrics(jobType);
-
- return j
- .bimap(TraceUtil.withTrace(trace))
- .bimap(TraceUtil.withMetricTrace(metric))
- .map((j) => {
- const { type: jobType, arguments: args } = j.get();
- const laminarCommand = [
- "laminarc",
- "queue",
- jobType,
- ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`),
- ];
- return laminarCommand;
- })
- .peek((c) =>
- c.trace.trace(
- `im so excited to see how this queue job will end!! (>ᴗ<): ${c
- .get()
- .toString()}`,
- ),
- )
- .map(getStdout)
- .peek(
- TraceUtil.promiseify((q) =>
- q.trace.trace(
- q
- .get()
- .fold(({ isLeft }) => (isLeft ? metric.failure : metric.success)),
- ),
- ),
- )
- .map(
- TraceUtil.promiseify((q) =>
- q.get().fold(({ isLeft, value }) => {
- if (isLeft) {
- q.trace.addTrace(LogLevel.ERROR).trace(value.toString());
- return Either.left<Error, string>(value);
- }
- q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${value}`);
- const [jobName, jobId] = value.split(":");
- const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;
-
- q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`);
- return Either.right<Error, string>(jobUrl);
- }),
- ),
- )
- .get();
- }
-}
-// -- </job.queuer> --