summaryrefslogtreecommitdiff
path: root/worker/executor/job.ts
diff options
context:
space:
mode:
Diffstat (limited to 'worker/executor/job.ts')
-rw-r--r--worker/executor/job.ts42
1 files changed, 42 insertions, 0 deletions
diff --git a/worker/executor/job.ts b/worker/executor/job.ts
new file mode 100644
index 0000000..76f0e0c
--- /dev/null
+++ b/worker/executor/job.ts
@@ -0,0 +1,42 @@
+import {
+ getStdout,
+ type ITraceable,
+ LogLevel,
+ type LogMetricTraceSupplier,
+ memoize,
+ Metric,
+ TraceUtil,
+ validateExecutionEntries,
+} from "@emprespresso/pengueno";
+import type { Job } from "@emprespresso/ci-model";
+
+const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`));
+export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) =>
+ tJob.bimap(TraceUtil.withMetricTrace(jobTypeMetric(tJob.get().type)))
+ .peek((tJob) =>
+ tJob.trace.trace(
+ `let's do this little job ok!! ${tJob.get()}`,
+ )
+ )
+ .map((tJob) =>
+ validateExecutionEntries(tJob.get().arguments)
+ .mapLeft((badEntries) => {
+ tJob.trace.addTrace(LogLevel.ERROR).trace(
+ badEntries.toString(),
+ );
+ return new Error("invalid job arguments");
+ })
+ .flatMapAsync((args) =>
+ getStdout(tJob.move(tJob.get().type), { env: args })
+ )
+ )
+ .peek(
+ TraceUtil.promiseify((q) =>
+ q.trace.trace(
+ q.get().fold((err, _val) =>
+ jobTypeMetric(tJob.get().type)[err ? "failure" : "success"]
+ ),
+ )
+ ),
+ )
+ .get();