diff options
Diffstat (limited to 'worker')
-rw-r--r-- | worker/deno.json | 2 | ||||
-rw-r--r-- | worker/jobs/checkout_ci.run.ts | 231 | ||||
-rwxr-xr-x | worker/scripts/ansible_playbook | 12 | ||||
-rwxr-xr-x | worker/scripts/run_pipeline | 51 |
4 files changed, 235 insertions, 61 deletions
diff --git a/worker/deno.json b/worker/deno.json index 5636d0a..90f50c9 100644 --- a/worker/deno.json +++ b/worker/deno.json @@ -1,4 +1,4 @@ { - "name": "@liz-ci/worker", + "name": "@emprespresso/ci-worker", "exports": "./mod.ts" } diff --git a/worker/jobs/checkout_ci.run.ts b/worker/jobs/checkout_ci.run.ts new file mode 100644 index 0000000..948a4eb --- /dev/null +++ b/worker/jobs/checkout_ci.run.ts @@ -0,0 +1,231 @@ +import { + type Command, + Either, + getRequiredEnvVars, + getStdout, + type IEither, + isObject, + type ITraceable, + LogLevel, + LogMetricTraceable, + type LogMetricTraceSupplier, + memoize, + Metric, + prependWith, + TraceUtil, + validateExecutionEntries, +} from "@emprespresso/pengueno"; +import { + type CheckoutCiJob, + type Job, + type Pipeline, + PipelineImpl, + type PipelineStage, +} from "@emprespresso/ci-model"; + +export interface CiWorkflow { + workflow: string; +} +export const isCiWorkflow = (t: unknown): t is CiWorkflow => + isObject(t) && "workflow" in t && typeof t.workflow === "string" && + !t.workflow.includes(".."); + +const run = Date.now().toString(); +const trace = `checkout_ci.${run}`; +const eitherJob = getRequiredEnvVars(["remote", "refname", "rev"]).mapRight(( + baseArgs, +) => ({ + type: "checkout_ci", + arguments: { + ...baseArgs, + run, + return: Deno.cwd(), + }, +} as unknown as CheckoutCiJob)); + +const ciRunMetric = Metric.fromName("checkout_ci.run"); +// TODO: FIGURE OUT HOW TO SETUP CLEANUP JOB! Maybe a .onfinally()? +await LogMetricTraceable.from(eitherJob) + .bimap(TraceUtil.withTrace(trace)) + .bimap(TraceUtil.withMetricTrace(ciRunMetric)) + .peek((tEitherJob) => + tEitherJob.trace.trace( + `hewwo~ starting checkout job for ${tEitherJob.get()}`, + ) + ) + .map((tEitherJob) => + tEitherJob.get().flatMapAsync((job) => { + const wd = getWorkingDirectoryForCheckoutJob(job); + return Either.fromFailableAsync<Error, CheckoutCiJob>( + Deno.mkdir(wd).then(() => Deno.chdir(wd)).then(() => job), + ); + }) + ) + .map((tEitherJob) => + tEitherJob.get().then((eitherJob) => + eitherJob.flatMapAsync((job) => + getStdout(tEitherJob.move("fetch_code"), { + env: { + remote: job.arguments.remote, + checkout: job.arguments.rev, + path: getSrcDirectoryForCheckoutJob(job), + }, + }).then((e) => e.moveRight(job)) + ) + ) + ) + .map((tEitherJob) => + tEitherJob.get().then((eitherJob) => + eitherJob.flatMapAsync<Command>((job) => + Either.fromFailableAsync<Error, string>( + Deno.readTextFile( + `${getSrcDirectoryForCheckoutJob(job)}/.ci/ci.json`, + ), + ).then((eitherWorkflowJson) => + eitherWorkflowJson.flatMap( + (json) => Either.fromFailable<Error, unknown>(JSON.parse(json)), + ).flatMap((eitherWorkflowParse) => { + if (isCiWorkflow(eitherWorkflowParse)) { + return Either.right( + getPipelineGenerationCommand(job, eitherWorkflowParse.workflow), + ); + } + return Either.left( + new Error( + "couldn't find any valid ci configuration (。•́︿•̀。), that's okay~", + ), + ); + }) + ) + ) + ) + ) + .map((tEitherPipelineGenerationCommand) => + tEitherPipelineGenerationCommand.get().then(( + eitherPipelineGenerationCommand, + ) => + eitherPipelineGenerationCommand.flatMapAsync((command) => + tEitherPipelineGenerationCommand.move(command).map(getStdout).get() + ) + ) + ) + .map( + TraceUtil.promiseify((tEitherPipelineString) => + tEitherPipelineString.get().flatMap(PipelineImpl.from) + ), + ) + .peek( + TraceUtil.promiseify((tEitherPipeline) => + tEitherPipeline.get().mapRight((val) => + tEitherPipeline.trace.trace( + "built the pipeline~ (◕ᴗ◕✿) let's make something amazing! " + + val.serialize(), + ) + ) + ), + ) + .map( + TraceUtil.promiseify((tEitherPipeline) => + tEitherPipeline.get() + .mapRight((pipeline) => tEitherPipeline.move(pipeline)) + .mapRight(executePipeline) + ), + ) + .get(); + +const jobTypeMetric = memoize((type: string) => + Metric.fromName(`checkout_ci.run.${type}`) +); +const executeJob = (tJob: ITraceable<Job, LogMetricTraceSupplier>) => { + const jobType = tJob.get().type; + const metric = jobTypeMetric(jobType); + return tJob.bimap(TraceUtil.withMetricTrace(metric)) + .peek((tJob) => + tJob.trace.trace( + `let's do this little job ok!! ${tJob.get()}`, + ) + ) + .map((tJob) => + validateExecutionEntries(tJob.get().arguments) + .mapLeft((badEntries) => { + tJob.trace.addTrace(LogLevel.ERROR).trace( + badEntries.toString(), + ); + return new Error("invalid job arguments"); + }) + .flatMapAsync((args) => + getStdout(tJob.move(tJob.get().type), { env: args }) + ) + ) + .peek( + TraceUtil.promiseify((q) => + q.trace.trace( + q.get().fold((err, _val) => + err + ? jobTypeMetric(tJob.get().type).failure + : jobTypeMetric(tJob.get().type).success + ), + ) + ), + ) + .get(); +}; + +const pipelinesMetric = Metric.fromName("checkout_ci.pipelines"); +const executePipeline = ( + tPipeline: ITraceable<Pipeline, LogMetricTraceSupplier>, +): Promise<IEither<Error, Array<PipelineStage>>> => + tPipeline.bimap(TraceUtil.withFunctionTrace(executePipeline)) + .bimap(TraceUtil.withMetricTrace(pipelinesMetric)) + .map((pipeline) => pipeline.get().serialJobs) + .map(async (tJobs) => { + for (const stage of tJobs.get()) { + tJobs.trace.trace( + `executing stage. do your best little stage :> ${stage}`, + ); + const results = await Promise.all( + stage.parallelJobs.map((job) => + tJobs.move(job).map(executeJob).get() + ), + ); + const failures = results.filter((e) => e.fold((_err, val) => !!val)); + if (failures.length > 0) { + tJobs.trace.trace(pipelinesMetric.failure); + return Either.left<Error, Array<PipelineStage>>( + new Error(failures.join(",")), + ); + } + } + tJobs.trace.trace(pipelinesMetric.success); + return Either.right<Error, Array<PipelineStage>>(tJobs.get()); + }) + .get(); + +const getWorkingDirectoryForCheckoutJob = (job: CheckoutCiJob) => + `${job.arguments.returnPath}/${job.arguments.run}`; + +const getSrcDirectoryForCheckoutJob = (job: CheckoutCiJob) => + `${job.arguments.returnPath}/${job.arguments.run}`; + +const getPipelineGenerationCommand = ( + job: CheckoutCiJob, + pipelineGeneratorPath: string, + runFlags = + "--rm --network none --cap-drop ALL --security-opt no-new-privileges".split( + " ", + ), +) => [ + "docker", + "run", + ...runFlags, + ...prependWith( + Object.entries(job.arguments).map(([key, val]) => `"${key}"="${val}"`), + "-e", + ), + "-v", + `${ + getSrcDirectoryForCheckoutJob(job) + }/${pipelineGeneratorPath}:/pipeline_generator`, + "oci.liz.coffee/img/liz-ci:release", + "/pipeline_generator", +]; diff --git a/worker/scripts/ansible_playbook b/worker/scripts/ansible_playbook index d24cbb6..096bb7b 100755 --- a/worker/scripts/ansible_playbook +++ b/worker/scripts/ansible_playbook @@ -1,14 +1,8 @@ #!/usr/bin/env -S deno run --allow-env --allow-net --allow-run --allow-read --allow-write -import { - BitwardenSession, - getRequiredEnv, - getStdout, - loggerWithPrefix, - prependWith, - type SecureNote, -} from "@liz-ci/utils"; -import type { AnsiblePlaybookJobProps } from "@liz-ci/model"; +import { getRequiredEnv, getStdout, prependWith } from "@emprespresso/pengueno"; +import { BitwardenSession, type SecureNote } from "@emprespresso/ci-utils"; +import type { AnsiblePlaybookJobProps } from "@emprespresso/ci-model"; const args: AnsiblePlaybookJobProps = { path: getRequiredEnv("path"), diff --git a/worker/scripts/run_pipeline b/worker/scripts/run_pipeline deleted file mode 100755 index abb13b3..0000000 --- a/worker/scripts/run_pipeline +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env -S deno run --allow-env --allow-net --allow-run --allow-read --allow-write - -import { PipelineImpl } from "@liz-ci/model"; -import { - getRequiredEnv, - getStdout, - invalidExecutionEntriesOf, - loggerWithPrefix, -} from "@liz-ci/utils"; - -const pipelinePath = getRequiredEnv("pipeline"); -const logger = loggerWithPrefix(() => - `[${new Date().toISOString()}] [run_pipeline.${pipelinePath}]` -); - -const run = async () => { - logger.log("starting pipeline execution~ time to work hard!"); - - const stages = await (Deno.readTextFile(pipelinePath)) - .then(PipelineImpl.from) - .then((pipeline) => pipeline.getStages()); - - for (const stage of stages) { - logger.log("executing stage. do your best little stage :>", stage); - - await Promise.all( - stage.parallelJobs.map(async (job, jobIdx) => { - logger.log(`let's do this little job ok!! ${jobIdx}`, job); - const invalidArgs = invalidExecutionEntriesOf(job.arguments); - if (invalidArgs.length) { - logger.error(`oh nooes`, invalidArgs); - throw new Error("invalid job arguments"); - } - - const result = await getStdout(job.type, { env: job.arguments }); - logger.log(jobIdx, "brought something to you! look :D", { result }); - }), - ); - } - - logger.log("all done! everything worked! yay~ (⑅˘꒳˘)"); -}; - -if (import.meta.main) { - try { - await run(); - } catch (e) { - logger.error("womp womp D:", e); - throw e; - } -} |