summaryrefslogtreecommitdiff
path: root/worker/executor
diff options
context:
space:
mode:
Diffstat (limited to 'worker/executor')
-rw-r--r--worker/executor/job.ts42
-rw-r--r--worker/executor/mod.ts2
-rw-r--r--worker/executor/pipeline.ts53
3 files changed, 97 insertions, 0 deletions
diff --git a/worker/executor/job.ts b/worker/executor/job.ts
new file mode 100644
index 0000000..76f0e0c
--- /dev/null
+++ b/worker/executor/job.ts
@@ -0,0 +1,42 @@
+import {
+ getStdout,
+ type ITraceable,
+ LogLevel,
+ type LogMetricTraceSupplier,
+ memoize,
+ Metric,
+ TraceUtil,
+ validateExecutionEntries,
+} from "@emprespresso/pengueno";
+import type { Job } from "@emprespresso/ci-model";
+
+const jobTypeMetric = memoize((type: string) => Metric.fromName(`run.${type}`));
+export const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) =>
+ tJob.bimap(TraceUtil.withMetricTrace(jobTypeMetric(tJob.get().type)))
+ .peek((tJob) =>
+ tJob.trace.trace(
+ `let's do this little job ok!! ${tJob.get()}`,
+ )
+ )
+ .map((tJob) =>
+ validateExecutionEntries(tJob.get().arguments)
+ .mapLeft((badEntries) => {
+ tJob.trace.addTrace(LogLevel.ERROR).trace(
+ badEntries.toString(),
+ );
+ return new Error("invalid job arguments");
+ })
+ .flatMapAsync((args) =>
+ getStdout(tJob.move(tJob.get().type), { env: args })
+ )
+ )
+ .peek(
+ TraceUtil.promiseify((q) =>
+ q.trace.trace(
+ q.get().fold((err, _val) =>
+ jobTypeMetric(tJob.get().type)[err ? "failure" : "success"]
+ ),
+ )
+ ),
+ )
+ .get();
diff --git a/worker/executor/mod.ts b/worker/executor/mod.ts
new file mode 100644
index 0000000..944ab7d
--- /dev/null
+++ b/worker/executor/mod.ts
@@ -0,0 +1,2 @@
+export * from "./job.ts";
+export * from "./pipeline.ts";
diff --git a/worker/executor/pipeline.ts b/worker/executor/pipeline.ts
new file mode 100644
index 0000000..a1aa7c3
--- /dev/null
+++ b/worker/executor/pipeline.ts
@@ -0,0 +1,53 @@
+import {
+ Either,
+ type IEither,
+ type ITraceable,
+ type LogMetricTraceSupplier,
+ Metric,
+ TraceUtil,
+} from "@emprespresso/pengueno";
+import type { Job, JobArgT, Pipeline } from "@emprespresso/ci-model";
+import { executeJob } from "./mod.ts";
+
+const pipelinesMetric = Metric.fromName("pipelines");
+export const executePipeline = (
+ tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>,
+ baseEnv?: JobArgT,
+): Promise<IEither<Error, void>> =>
+ tPipeline.bimap(TraceUtil.withFunctionTrace(executePipeline))
+ .bimap(TraceUtil.withMetricTrace(pipelinesMetric))
+ .map(async (tJobs): Promise<IEither<Error, void>> => {
+ for (const [i, serialStage] of tJobs.get().serialJobs.entries()) {
+ tJobs.trace.trace(
+ `executing stage ${i}. do your best little stage :>\n${serialStage}`,
+ );
+ const jobResults = await Promise.all(
+ serialStage.parallelJobs.map((job) =>
+ tJobs.bimap((_) => [job, `stage ${i}`])
+ .map((tJob) =>
+ <Job> ({
+ ...tJob.get(),
+ arguments: { ...baseEnv, ...tJob.get().arguments },
+ })
+ )
+ .map(executeJob)
+ .peek(
+ TraceUtil.promiseify((tEitherJobOutput) =>
+ tEitherJobOutput.get().mapRight((stdout) =>
+ tEitherJobOutput.trace.addTrace("STDOUT").trace(stdout)
+ )
+ ),
+ )
+ .get()
+ ),
+ );
+ const failures = jobResults.filter((e) => e.fold((err) => !!err));
+ if (failures.length > 0) {
+ tJobs.trace.trace(pipelinesMetric.failure);
+ return Either.left(new Error(failures.toString()));
+ }
+ }
+ tJobs.trace.trace(pipelinesMetric.success);
+ return Either.right(undefined);
+ })
+ .get();