summaryrefslogtreecommitdiff
path: root/server/job/queue.ts
diff options
context:
space:
mode:
authorElizabeth Hunt <me@liz.coffee>2025-06-29 17:31:30 -0700
committerElizabeth Hunt <me@liz.coffee>2025-06-29 17:31:30 -0700
commit58be1809c46cbe517a18d86d0af52179dcc5cbf6 (patch)
tree9ccc678b3fd48c1a52fe501600dd2c2051740a55 /server/job/queue.ts
parentd4791f3d357634daf506fb8f91cc5332a794c421 (diff)
downloadci-58be1809c46cbe517a18d86d0af52179dcc5cbf6.tar.gz
ci-58be1809c46cbe517a18d86d0af52179dcc5cbf6.zip
Move to nodejs and also lots of significant refactoring that should've been broken up but idgaf
Diffstat (limited to 'server/job/queue.ts')
-rw-r--r--server/job/queue.ts40
1 files changed, 19 insertions, 21 deletions
diff --git a/server/job/queue.ts b/server/job/queue.ts
index 2392222..4b21186 100644
--- a/server/job/queue.ts
+++ b/server/job/queue.ts
@@ -2,13 +2,13 @@ import {
getStdout,
type Mapper,
memoize,
- Either,
type IEither,
type ITraceable,
LogLevel,
Metric,
type ServerTrace,
TraceUtil,
+ PenguenoError,
} from '@emprespresso/pengueno';
import { type Job } from '@emprespresso/ci_model';
@@ -23,7 +23,7 @@ export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace>
private static GetJobTypeTrace = (jobType: string) => `LaminarJobQueue.Queue.${jobType}`;
private static JobTypeMetrics = memoize((jobType: string) =>
- Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)),
+ Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)).asResult(),
);
public queue(j: ITraceable<Job, ServerTrace>) {
@@ -32,8 +32,8 @@ export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace>
const metric = LaminarJobQueuer.JobTypeMetrics(jobType);
return j
- .bimap(TraceUtil.withTrace(trace))
- .bimap(TraceUtil.withMetricTrace(metric))
+ .flatMap(TraceUtil.withTrace(trace))
+ .flatMap(TraceUtil.withMetricTrace(metric))
.map((j) => {
const { type: jobType, arguments: args } = j.get();
const laminarCommand = [
@@ -47,26 +47,24 @@ export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace>
.peek((c) =>
c.trace.trace(`im so excited to see how this queue job will end!! (>ᴗ<): ${c.get().toString()}`),
)
- .map(getStdout)
- .peek(
- TraceUtil.promiseify((q) =>
- q.trace.trace(q.get().fold(({ isLeft }) => (isLeft ? metric.failure : metric.success))),
- ),
- )
+ .map((c) => getStdout(c))
+ .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(metric)))
.map(
TraceUtil.promiseify((q) =>
- q.get().fold(({ isLeft, value }) => {
- if (isLeft) {
- q.trace.addTrace(LogLevel.ERROR).trace(value.toString());
- return Either.left<Error, string>(value);
- }
- q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${value}`);
- const [jobName, jobId] = value.split(':');
- const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;
+ q.get().mapBoth(
+ (err) => {
+ q.trace.traceScope(LogLevel.ERROR).trace(err);
+ return err;
+ },
+ (ok) => {
+ q.trace.traceScope(LogLevel.DEBUG).trace(`stdout ${ok}`);
+ const [jobName, jobId] = ok.split(':');
+ const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;
- q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`);
- return Either.right<Error, string>(jobUrl);
- }),
+ q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`);
+ return jobUrl;
+ },
+ ),
),
)
.get();