summaryrefslogtreecommitdiff
path: root/server/job
diff options
context:
space:
mode:
Diffstat (limited to 'server/job')
-rw-r--r--server/job/index.ts4
-rw-r--r--server/job/queue.ts40
-rw-r--r--server/job/run_activity.ts55
3 files changed, 50 insertions, 49 deletions
diff --git a/server/job/index.ts b/server/job/index.ts
index ecf0984..92c4682 100644
--- a/server/job/index.ts
+++ b/server/job/index.ts
@@ -1,2 +1,2 @@
-export * from './queue';
-export * from './run_activity';
+export * from './queue.js';
+export * from './run_activity.js';
diff --git a/server/job/queue.ts b/server/job/queue.ts
index 2392222..4b21186 100644
--- a/server/job/queue.ts
+++ b/server/job/queue.ts
@@ -2,13 +2,13 @@ import {
getStdout,
type Mapper,
memoize,
- Either,
type IEither,
type ITraceable,
LogLevel,
Metric,
type ServerTrace,
TraceUtil,
+ PenguenoError,
} from '@emprespresso/pengueno';
import { type Job } from '@emprespresso/ci_model';
@@ -23,7 +23,7 @@ export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace>
private static GetJobTypeTrace = (jobType: string) => `LaminarJobQueue.Queue.${jobType}`;
private static JobTypeMetrics = memoize((jobType: string) =>
- Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)),
+ Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)).asResult(),
);
public queue(j: ITraceable<Job, ServerTrace>) {
@@ -32,8 +32,8 @@ export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace>
const metric = LaminarJobQueuer.JobTypeMetrics(jobType);
return j
- .bimap(TraceUtil.withTrace(trace))
- .bimap(TraceUtil.withMetricTrace(metric))
+ .flatMap(TraceUtil.withTrace(trace))
+ .flatMap(TraceUtil.withMetricTrace(metric))
.map((j) => {
const { type: jobType, arguments: args } = j.get();
const laminarCommand = [
@@ -47,26 +47,24 @@ export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace>
.peek((c) =>
c.trace.trace(`im so excited to see how this queue job will end!! (>ᴗ<): ${c.get().toString()}`),
)
- .map(getStdout)
- .peek(
- TraceUtil.promiseify((q) =>
- q.trace.trace(q.get().fold(({ isLeft }) => (isLeft ? metric.failure : metric.success))),
- ),
- )
+ .map((c) => getStdout(c))
+ .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(metric)))
.map(
TraceUtil.promiseify((q) =>
- q.get().fold(({ isLeft, value }) => {
- if (isLeft) {
- q.trace.addTrace(LogLevel.ERROR).trace(value.toString());
- return Either.left<Error, string>(value);
- }
- q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${value}`);
- const [jobName, jobId] = value.split(':');
- const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;
+ q.get().mapBoth(
+ (err) => {
+ q.trace.traceScope(LogLevel.ERROR).trace(err);
+ return err;
+ },
+ (ok) => {
+ q.trace.traceScope(LogLevel.DEBUG).trace(`stdout ${ok}`);
+ const [jobName, jobId] = ok.split(':');
+ const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;
- q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`);
- return Either.right<Error, string>(jobUrl);
- }),
+ q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`);
+ return jobUrl;
+ },
+ ),
),
)
.get();
diff --git a/server/job/run_activity.ts b/server/job/run_activity.ts
index 9f25cf8..22bc4c7 100644
--- a/server/job/run_activity.ts
+++ b/server/job/run_activity.ts
@@ -7,7 +7,9 @@ import {
jsonModel,
JsonResponse,
LogLevel,
+ LogMetricTraceSupplier,
Metric,
+ MetricsTraceSupplier,
PenguenoError,
type PenguenoRequest,
type ServerTrace,
@@ -15,39 +17,35 @@ import {
validateExecutionEntries,
} from '@emprespresso/pengueno';
import { isJob, type Job } from '@emprespresso/ci_model';
-import { IJobQueuer } from './queue';
+import { IJobQueuer } from './queue.js';
-const wellFormedJobMetric = Metric.fromName('Job.WellFormed');
+const wellFormedJobMetric = Metric.fromName('Job.WellFormed').asResult();
const jobJsonTransformer = (j: ITraceable<unknown, ServerTrace>): IEither<PenguenoError, Job> =>
j
- .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric))
+ .flatMap(TraceUtil.withMetricTrace(wellFormedJobMetric))
.map((tJson): IEither<PenguenoError, Job> => {
const tJob = tJson.get();
if (!isJob(tJob) || !validateExecutionEntries(tJob)) {
const err = 'seems like a pwetty mawfomed job (-.-)';
- tJson.trace.addTrace(LogLevel.WARN).trace(err);
+ tJson.trace.traceScope(LogLevel.WARN).trace(err);
return Either.left(new PenguenoError(err, 400));
}
return Either.right(tJob);
})
- .peek((tJob) =>
- tJob.trace.trace(
- tJob.get().fold(({ isLeft }) => (isLeft ? wellFormedJobMetric.failure : wellFormedJobMetric.success)),
- ),
- )
+ .peek(TraceUtil.traceResultingEither(wellFormedJobMetric))
.get();
export interface IJobHookActivity {
processHook: IActivity;
}
-const jobHookRequestMetric = Metric.fromName('JobHook.process');
+const jobHookRequestMetric = Metric.fromName('JobHook.process').asResult();
export class JobHookActivityImpl implements IJobHookActivity {
constructor(private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>) {}
private trace(r: ITraceable<PenguenoRequest, ServerTrace>) {
- return r.bimap(TraceUtil.withClassTrace(this)).bimap(TraceUtil.withMetricTrace(jobHookRequestMetric));
+ return r.flatMap(TraceUtil.withClassTrace(this)).flatMap(TraceUtil.withMetricTrace(jobHookRequestMetric));
}
public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) {
@@ -63,29 +61,34 @@ export class JobHookActivityImpl implements IJobHookActivity {
return eitherQueued.mapLeft((e) => new PenguenoError(e.message, 500));
});
})
+ .flatMapAsync(
+ TraceUtil.promiseify((tEitherQueued) => {
+ const errorSource = tEitherQueued
+ .get()
+ .left()
+ .map(({ source }) => source)
+ .orSome(() => ErrorSource.SYSTEM)
+ .get();
+ const shouldWarn = errorSource === ErrorSource.USER;
+ return TraceUtil.traceResultingEither<PenguenoError, string, LogMetricTraceSupplier>(
+ jobHookRequestMetric,
+ shouldWarn,
+ )(tEitherQueued);
+ }),
+ )
.peek(
TraceUtil.promiseify((tJob) =>
- tJob.get().fold(({ isRight, value }) => {
- if (isRight) {
- tJob.trace.trace(jobHookRequestMetric.success);
- tJob.trace.trace(`all queued up and weady to go :D !! ${value}`);
- return;
- }
-
- tJob.trace.trace(
- value.source === ErrorSource.SYSTEM
- ? jobHookRequestMetric.failure
- : jobHookRequestMetric.warn,
- );
- tJob.trace.addTrace(value.source).trace(`${value}`);
- }),
+ tJob.get().mapRight((job) => tJob.trace.trace(`all queued up and weady to go :D !! ${job}`)),
),
)
.map(
TraceUtil.promiseify(
(tEitherQueuedJob) =>
new JsonResponse(r, tEitherQueuedJob.get(), {
- status: tEitherQueuedJob.get().fold(({ isRight, value }) => (isRight ? 200 : value.status)),
+ status: tEitherQueuedJob.get().fold(
+ ({ status }) => status,
+ () => 200,
+ ),
}),
),
)