summaryrefslogtreecommitdiff
path: root/worker/executor.ts
diff options
context:
space:
mode:
authorElizabeth <me@liz.coffee>2025-06-02 11:14:52 -0700
committerElizabeth <me@liz.coffee>2025-06-02 11:14:52 -0700
commit373d9ec700c0097a22cf665a8e33cf48998d1dc2 (patch)
tree71297ac69177037929e1bfb00b8c71038058acd5 /worker/executor.ts
parent646c5eb11d3b9240f8434163d103a117d30c88c7 (diff)
downloadci-373d9ec700c0097a22cf665a8e33cf48998d1dc2.tar.gz
ci-373d9ec700c0097a22cf665a8e33cf48998d1dc2.zip
Minor things
Diffstat (limited to 'worker/executor.ts')
-rw-r--r--worker/executor.ts99
1 files changed, 99 insertions, 0 deletions
diff --git a/worker/executor.ts b/worker/executor.ts
new file mode 100644
index 0000000..faa40a6
--- /dev/null
+++ b/worker/executor.ts
@@ -0,0 +1,99 @@
+import {
+ getStdout,
+ type ITraceable,
+ LogLevel,
+ type LogMetricTraceSupplier,
+ memoize,
+ Metric,
+ TraceUtil,
+ validateExecutionEntries,
+ Either,
+ type IEither,
+} from "@emprespresso/pengueno";
+import type { Job, JobArgT, Pipeline } from "@emprespresso/ci_model";
+
+// -- <job.exectuor> --
+const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`));
+export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) =>
+ tJob
+ .bimap(TraceUtil.withMetricTrace(jobTypeMetric(tJob.get().type)))
+ .peek((tJob) =>
+ tJob.trace.trace(`let's do this little job ok!! ${tJob.get()}`),
+ )
+ .map((tJob) =>
+ validateExecutionEntries(tJob.get().arguments)
+ .mapLeft((badEntries) => {
+ tJob.trace.addTrace(LogLevel.ERROR).trace(badEntries.toString());
+ return new Error("invalid job arguments");
+ })
+ .flatMapAsync((args) =>
+ getStdout(tJob.move(tJob.get().type), { env: args }),
+ ),
+ )
+ .peek(
+ TraceUtil.promiseify((q) =>
+ q.trace.trace(
+ q
+ .get()
+ .fold(
+ (err, _val) =>
+ jobTypeMetric(tJob.get().type)[err ? "failure" : "success"],
+ ),
+ ),
+ ),
+ )
+ .get();
+// -- </job.exectuor> --
+
+// -- <pipeline.executor> --
+const pipelinesMetric = Metric.fromName("pipelines");
+export const executePipeline = (
+ tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>,
+ baseEnv?: JobArgT,
+): Promise<IEither<Error, void>> =>
+ tPipeline
+ .bimap(TraceUtil.withFunctionTrace(executePipeline))
+ .bimap(TraceUtil.withMetricTrace(pipelinesMetric))
+ .map(async (tJobs): Promise<IEither<Error, void>> => {
+ for (const [i, serialStage] of tJobs.get().serialJobs.entries()) {
+ tJobs.trace.trace(
+ `executing stage ${i}. do your best little stage :>\n${serialStage}`,
+ );
+ const jobResults = await Promise.all(
+ serialStage.parallelJobs.map((job) =>
+ tJobs
+ .bimap((_) => [job, `stage ${i}`])
+ .map(
+ (tJob) =>
+ <Job>{
+ ...tJob.get(),
+ arguments: {
+ ...baseEnv,
+ ...tJob.get().arguments,
+ },
+ },
+ )
+ .map(executeJob)
+ .peek(
+ TraceUtil.promiseify((tEitherJobOutput) =>
+ tEitherJobOutput
+ .get()
+ .mapRight((stdout) =>
+ tEitherJobOutput.trace.addTrace("STDOUT").trace(stdout),
+ ),
+ ),
+ )
+ .get(),
+ ),
+ );
+ const failures = jobResults.filter((e) => e.fold((err) => !!err));
+ if (failures.length > 0) {
+ tJobs.trace.trace(pipelinesMetric.failure);
+ return Either.left(new Error(failures.toString()));
+ }
+ }
+ tJobs.trace.trace(pipelinesMetric.success);
+ return Either.right(undefined);
+ })
+ .get();
+// -- </pipeline.executor> --