import {
getStdout,
type Mapper,
memoize,
Either,
ErrorSource,
type IActivity,
type IEither,
type ITraceable,
jsonModel,
JsonResponse,
LogLevel,
Metric,
PenguenoError,
type PenguenoRequest,
type ServerTrace,
TraceUtil,
validateExecutionEntries,
} from "@emprespresso/pengueno";
import { isJob, type Job } from "@emprespresso/ci_model";
// -- --
/** Metric recording whether an incoming JSON payload is a well-formed job. */
const wellFormedJobMetric = Metric.fromName("Job.WellFormed");

/**
 * Validates a parsed JSON payload as a `Job`.
 *
 * The payload is accepted only when it passes both `isJob` and
 * `validateExecutionEntries`; otherwise a WARN trace is written and a
 * `PenguenoError` with status 400 is returned on the left. Either way the
 * matching `Job.WellFormed` success / failure metric is traced.
 *
 * NOTE(review): the type arguments on the signature were restored from the
 * imports of this file — confirm against the pengueno declarations.
 *
 * @param j traceable wrapping the parsed (still untyped) JSON body
 * @returns the validated job on the right, or a 400 error on the left
 */
const jobJsonTransformer = (
  j: ITraceable<unknown, ServerTrace>,
): IEither<PenguenoError, Job> =>
  j
    .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric))
    .map((tJson) => {
      // `tJson` is the traceable wrapper (it carries `.trace`), so the checks
      // must run against the payload it holds, and that payload — not the
      // wrapper — is what callers expect on the right-hand side.
      const json = tJson.get();
      // NOTE(review): `validateExecutionEntries` receives the same value as
      // `isJob`, as before — confirm it expects the whole job.
      if (!isJob(json) || !validateExecutionEntries(json)) {
        const err = "seems like a pwetty mawfomed job \\(-.-)/";
        tJson.trace.addTrace(LogLevel.WARN).trace(err);
        return Either.left(new PenguenoError(err, 400));
      }
      return Either.right(json);
    })
    .peek((tJob) =>
      tJob.trace.trace(
        tJob
          .get()
          .fold((err) =>
            err ? wellFormedJobMetric.failure : wellFormedJobMetric.success,
          ),
      ),
    )
    .get();
/** Contract for the activity that handles incoming job-hook requests. */
export interface IJobHookActivity {
/** Handles a single hook request; implemented in this file by `JobHookActivityImpl.processHook`. */
processHook: IActivity;
}
/** Metric covering the handling of one job-hook request. */
const jobHookRequestMetric = Metric.fromName("JobHook.process");

/**
 * Hook activity that parses a job out of the request body, hands it to the
 * injected queuer, and answers with a JSON response.
 *
 * NOTE(review): the type arguments in this class were restored from the
 * imports of this file — confirm against the pengueno / ci_model declarations.
 */
export class JobHookActivityImpl implements IJobHookActivity {
  /**
   * @param queuer queuer that receives every validated job
   */
  constructor(
    private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>,
  ) {}

  /** Attaches this class's trace and the request metric to the incoming request. */
  private trace(r: ITraceable<PenguenoRequest, ServerTrace>) {
    return r
      .bimap(TraceUtil.withClassTrace(this))
      .bimap(TraceUtil.withMetricTrace(jobHookRequestMetric));
  }

  /**
   * Processes one hook request.
   *
   * Steps: decode and validate the body as a job, queue it, trace the
   * outcome metric (success, warn, or failure when the error source is
   * `ErrorSource.SYSTEM`), then build the `JsonResponse`.
   *
   * @param r the traceable incoming request
   * @returns a promise of the `JsonResponse`; status is 200 on success,
   *          otherwise the error's status (500 when it has none)
   */
  public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) {
    return this.trace(r)
      .map(jsonModel(jobJsonTransformer))
      .map(async (tEitherJobJson) => {
        const eitherJob = await tEitherJobJson.get();
        return eitherJob.flatMapAsync(async (job) => {
          const eitherQueued = await tEitherJobJson
            .move(job)
            // Call through an arrow so `queue` keeps its receiver: passing
            // `this.queuer.queue` bare detaches the method, and a queuer
            // whose `queue` reads `this` would then fail at call time.
            .map((tJob) => this.queuer.queue(tJob))
            .get();
          // Any queueing failure is reported to the client as a 500.
          return eitherQueued.mapLeft((e) => new PenguenoError(e.message, 500));
        });
      })
      .peek(
        TraceUtil.promiseify((tJob) =>
          tJob
            .get()
            .fold(
              (
                err: PenguenoError | undefined,
                queuePosition: string | undefined,
              ) => {
                if (!err) {
                  tJob.trace.trace(jobHookRequestMetric.success);
                  tJob.trace.trace(
                    `all queued up and weady to go :D !! ${queuePosition}`,
                  );
                  return;
                }
                // Only system-sourced errors count as failures; the rest warn.
                tJob.trace.trace(
                  err.source === ErrorSource.SYSTEM
                    ? jobHookRequestMetric.failure
                    : jobHookRequestMetric.warn,
                );
                tJob.trace.addTrace(err.source).trace(`${err}`);
              },
            ),
        ),
      )
      .map(
        TraceUtil.promiseify(
          (tEitherQueuedJob) =>
            new JsonResponse(r, tEitherQueuedJob.get(), {
              status: tEitherQueuedJob
                .get()
                .fold((err, queuePosition) =>
                  queuePosition ? 200 : err?.status ?? 500,
                ),
            }),
        ),
      )
      .get();
  }
}
// -- --
// -- --
/** Where a queued job can be found; `LaminarJobQueuer.queue` in this file yields a job URL string. */
type QueuePosition = string;
/**
 * Error type for failures while queueing a job.
 *
 * `name` is set explicitly so `toString()` and log output read
 * `QueueError: …` instead of the inherited `Error: …`. Declaring it as a
 * field keeps the implicit constructor, so every `Error` constructor
 * argument is still forwarded unchanged.
 */
export class QueueError extends Error {
  override name = "QueueError";
}
/**
 * Something that can enqueue a job.
 *
 * NOTE(review): the generic parameter and the member's type arguments were
 * restored from how this interface is used in this file (one type argument at
 * every use site, an async either result whose error exposes `message`) —
 * confirm the exact error type.
 *
 * @typeParam TJob the (traceable) job representation accepted by `queue`
 */
export interface IJobQueuer<TJob> {
  /** Queues the job; resolves to its queue position, or to the error that prevented queueing. */
  queue: Mapper<TJob, Promise<IEither<Error, QueuePosition>>>;
}
/**
 * Job queuer that shells out to `laminarc queue` and turns its output into a
 * job URL.
 *
 * NOTE(review): the type arguments in this class were restored from the
 * imports of this file — confirm against the pengueno / ci_model declarations.
 */
export class LaminarJobQueuer
  implements IJobQueuer<ITraceable<Job, ServerTrace>>
{
  /**
   * @param queuePositionPrefix prefix placed before `/jobs/<name>/<id>` when
   *        building the URL of a queued job
   */
  constructor(private readonly queuePositionPrefix: string) {}

  /** Builds the trace (and metric) name for a job type. */
  private static GetJobTypeTrace = (jobType: string) =>
    `LaminarJobQueue.Queue.${jobType}`;

  /** Metric for a job type, keyed by the raw job type and wrapped in `memoize`. */
  private static JobTypeMetrics = memoize((jobType: string) =>
    Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)),
  );

  /**
   * Queues the job via `laminarc queue <type> "<key>"="<value>"…`.
   *
   * @param j traceable wrapping the job to queue
   * @returns a promise of an either: the job URL on the right, or the error
   *          produced by `getStdout` on the left (also logged at ERROR level)
   */
  public queue(j: ITraceable<Job, ServerTrace>) {
    const { type: jobType } = j.get();
    const trace = LaminarJobQueuer.GetJobTypeTrace(jobType);
    // `JobTypeMetrics` applies `GetJobTypeTrace` itself, so it takes the raw
    // job type; passing the already-prefixed trace name doubled the prefix.
    const metric = LaminarJobQueuer.JobTypeMetrics(jobType);

    return j
      .bimap(TraceUtil.withTrace(trace))
      .bimap(TraceUtil.withMetricTrace(metric))
      .map((tJob) => {
        const { type, arguments: args } = tJob.get();
        // NOTE(review): the literal double quotes are part of each argument;
        // whether that is right depends on how `getStdout` executes the
        // command (shell vs. argv) — confirm.
        return [
          "laminarc",
          "queue",
          type,
          ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`),
        ];
      })
      .peek((c) =>
        c.trace.trace(
          `im so excited to see how this queue job will end!! (>ᴗ<): ${c
            .get()
            .toString()}`,
        ),
      )
      .map(getStdout)
      .peek(
        TraceUtil.promiseify((q) =>
          q.trace.trace(
            q
              .get()
              .fold((err, _val) => (err ? metric.failure : metric.success)),
          ),
        ),
      )
      .map(
        TraceUtil.promiseify((q) =>
          q
            .get()
            .mapRight((stdout) => {
              q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${stdout}`);
              // The output is split on ":" into job name and id; trim first
              // so a trailing newline cannot leak into the URL.
              const [jobName, jobId] = stdout.trim().split(":");
              const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;
              q.trace.trace(
                `all queued up and weady to go~ (˘ω˘) => ${jobUrl}`,
              );
              return jobUrl;
            })
            .mapLeft((err) => {
              q.trace.addTrace(LogLevel.ERROR).trace(err.toString());
              return err;
            }),
        ),
      )
      .get();
  }
}
// -- --