diff options
author | Elizabeth <me@liz.coffee> | 2025-06-02 11:14:52 -0700 |
---|---|---|
committer | Elizabeth <me@liz.coffee> | 2025-06-02 11:14:52 -0700 |
commit | 373d9ec700c0097a22cf665a8e33cf48998d1dc2 (patch) | |
tree | 71297ac69177037929e1bfb00b8c71038058acd5 /server | |
parent | 646c5eb11d3b9240f8434163d103a117d30c88c7 (diff) | |
download | ci-373d9ec700c0097a22cf665a8e33cf48998d1dc2.tar.gz ci-373d9ec700c0097a22cf665a8e33cf48998d1dc2.zip |
Minor things
Diffstat (limited to 'server')
-rw-r--r-- | server/Dockerfile | 8 | ||||
-rw-r--r-- | server/ci.ts | 56 | ||||
-rw-r--r-- | server/deno.json | 4 | ||||
-rw-r--r-- | server/health.ts | 28 | ||||
-rw-r--r-- | server/job.ts | 193 | ||||
-rw-r--r-- | server/mod.ts | 16 |
6 files changed, 305 insertions, 0 deletions
diff --git a/server/Dockerfile b/server/Dockerfile new file mode 100644 index 0000000..c66e1b1 --- /dev/null +++ b/server/Dockerfile @@ -0,0 +1,8 @@ +# -- <ci_server> -- +FROM oci.liz.coffee/emprespresso/ci_base:release AS server + +HEALTHCHECK --interval=10s --retries=3 --start-period=3s \ + CMD [ "curl --fail http://localhost:9000/health" ] + +CMD [ "/app/mod.ts --server --port 9000" ] +# -- </ci_server> -- diff --git a/server/ci.ts b/server/ci.ts new file mode 100644 index 0000000..e1a9ca7 --- /dev/null +++ b/server/ci.ts @@ -0,0 +1,56 @@ +import { + FourOhFourActivityImpl, + getRequiredEnv, + HealthCheckActivityImpl, + type HealthChecker, + type IFourOhFourActivity, + type IHealthCheckActivity, + type ITraceable, + PenguenoRequest, + type ServerTrace, + TraceUtil, +} from "@emprespresso/pengueno"; +import type { Job } from "@emprespresso/ci_model"; +import { + healthCheck as _healthCheck, + type IJobHookActivity, + type IJobQueuer, + JobHookActivityImpl, + LaminarJobQueuer, +} from "@emprespresso/ci_server"; + +export class CiHookServer { + constructor( + healthCheck: HealthChecker = _healthCheck, + jobQueuer: IJobQueuer<ITraceable<Job, ServerTrace>> = new LaminarJobQueuer( + getRequiredEnv("LAMINAR_URL").fold((err, val) => + err ? "https://ci.liz.coffee" : val!, + ), + ), + private readonly healthCheckActivity: IHealthCheckActivity = new HealthCheckActivityImpl( + healthCheck, + ), + private readonly jobHookActivity: IJobHookActivity = new JobHookActivityImpl( + jobQueuer, + ), + private readonly fourOhFourActivity: IFourOhFourActivity = new FourOhFourActivityImpl(), + ) {} + + private route(req: ITraceable<PenguenoRequest, ServerTrace>) { + const url = new URL(req.get().url); + if (url.pathname === "/health") { + return this.healthCheckActivity.checkHealth(req); + } + if (url.pathname === "/job") { + return this.jobHookActivity.processHook(req); + } + return this.fourOhFourActivity.fourOhFour(req); + } + + public serve(req: Request): Promise<Response> { + return PenguenoRequest.from(req) + .bimap(TraceUtil.withClassTrace(this)) + .map((req) => this.route(req)) + .get(); + } +} diff --git a/server/deno.json b/server/deno.json new file mode 100644 index 0000000..c86c9a7 --- /dev/null +++ b/server/deno.json @@ -0,0 +1,4 @@ +{ + "name": "@emprespresso/ci_server", + "exports": "./mod.ts" +} diff --git a/server/health.ts b/server/health.ts new file mode 100644 index 0000000..1acc074 --- /dev/null +++ b/server/health.ts @@ -0,0 +1,28 @@ +import { + getRequiredEnv, + getStdout, + type HealthChecker, + type HealthCheckInput, + HealthCheckOutput, + type IEither, + type ITraceable, + type ServerTrace, + TraceUtil, +} from "@emprespresso/pengueno"; + +export const healthCheck: HealthChecker = ( + input: ITraceable<HealthCheckInput, ServerTrace>, +): Promise<IEither<Error, HealthCheckOutput>> => + input + .bimap(TraceUtil.withFunctionTrace(healthCheck)) + .move(getRequiredEnv("LAMINAR_HOST")) + // ensure LAMINAR_HOST is propagated to getStdout for other procedures + .map((e) => e.get().moveRight(["laminarc", "show-jobs"])) + .map((i) => + i + .get() + .mapRight(i.move.apply) + .flatMapAsync(getStdout.apply) + .then((gotJobs) => gotJobs.moveRight(HealthCheckOutput.YAASSSLAYQUEEN)), + ) + .get(); diff --git a/server/job.ts b/server/job.ts new file mode 100644 index 0000000..62582d6 --- /dev/null +++ b/server/job.ts @@ -0,0 +1,193 @@ +import { + getStdout, + type Mapper, + memoize, + Either, + ErrorSource, + type IActivity, + type IEither, + type ITraceable, + jsonModel, + JsonResponse, + LogLevel, + Metric, + PenguenoError, + type PenguenoRequest, + type ServerTrace, + TraceUtil, + validateExecutionEntries, +} from "@emprespresso/pengueno"; +import { isJob, type Job } from "@emprespresso/ci_model"; + +// -- <job.hook> -- +const wellFormedJobMetric = Metric.fromName("Job.WellFormed"); + +const jobJsonTransformer = ( + j: ITraceable<unknown, ServerTrace>, +): IEither<PenguenoError, Job> => + j + .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric)) + .map((tJson) => { + if (!isJob(tJson) || !validateExecutionEntries(tJson)) { + const err = "seems like a pwetty mawfomed job \\(-.-)/"; + tJson.trace.addTrace(LogLevel.WARN).trace(err); + return Either.left<PenguenoError, Job>(new PenguenoError(err, 400)); + } + return Either.right<PenguenoError, Job>(tJson); + }) + .peek((tJob) => + tJob.trace.trace( + tJob + .get() + .fold((err) => + err ? wellFormedJobMetric.failure : wellFormedJobMetric.success, + ), + ), + ) + .get(); + +export interface IJobHookActivity { + processHook: IActivity; +} + +const jobHookRequestMetric = Metric.fromName("JobHook.process"); +export class JobHookActivityImpl implements IJobHookActivity { + constructor( + private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>, + ) {} + + private trace(r: ITraceable<PenguenoRequest, ServerTrace>) { + return r + .bimap(TraceUtil.withClassTrace(this)) + .bimap(TraceUtil.withMetricTrace(jobHookRequestMetric)); + } + + public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) { + return this.trace(r) + .map(jsonModel(jobJsonTransformer)) + .map(async (tEitherJobJson) => { + const eitherJob = await tEitherJobJson.get(); + return eitherJob.flatMapAsync(async (job) => { + const eitherQueued = await tEitherJobJson + .move(job) + .map(this.queuer.queue) + .get(); + return eitherQueued.mapLeft((e) => new PenguenoError(e.message, 500)); + }); + }) + .peek( + TraceUtil.promiseify((tJob) => + tJob + .get() + .fold( + (err: PenguenoError | undefined, _val: string | undefined) => { + if (!err) { + tJob.trace.trace(jobHookRequestMetric.success); + tJob.trace.trace( + `all queued up and weady to go :D !! ${_val}`, + ); + return; + } + tJob.trace.trace( + err.source === ErrorSource.SYSTEM + ? jobHookRequestMetric.failure + : jobHookRequestMetric.warn, + ); + tJob.trace.addTrace(err.source).trace(`${err}`); + }, + ), + ), + ) + .map( + TraceUtil.promiseify( + (tEitherQueuedJob) => + new JsonResponse(r, tEitherQueuedJob.get(), { + status: tEitherQueuedJob + .get() + .fold((err, _val) => (_val ? 200 : err?.status ?? 500)), + }), + ), + ) + .get(); + } +} + +// -- </job.hook> -- + +// -- <job.queuer> -- +type QueuePosition = string; +export class QueueError extends Error {} +export interface IJobQueuer<TJob> { + queue: Mapper<TJob, Promise<IEither<QueueError, QueuePosition>>>; +} + +export class LaminarJobQueuer + implements IJobQueuer<ITraceable<Job, ServerTrace>> +{ + constructor(private readonly queuePositionPrefix: string) {} + + private static GetJobTypeTrace = (jobType: string) => + `LaminarJobQueue.Queue.${jobType}`; + private static JobTypeMetrics = memoize((jobType: string) => + Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)), + ); + + public queue(j: ITraceable<Job, ServerTrace>) { + const { type: jobType } = j.get(); + const trace = LaminarJobQueuer.GetJobTypeTrace(jobType); + const metric = LaminarJobQueuer.JobTypeMetrics(trace); + + return j + .bimap(TraceUtil.withTrace(trace)) + .bimap(TraceUtil.withMetricTrace(metric)) + .map((j) => { + const { type: jobType, arguments: args } = j.get(); + const laminarCommand = [ + "laminarc", + "queue", + jobType, + ...Object.entries(args).map(([key, val]) => `"${key}"="${val}"`), + ]; + return laminarCommand; + }) + .peek((c) => + c.trace.trace( + `im so excited to see how this queue job will end!! (>ᴗ<): ${c + .get() + .toString()}`, + ), + ) + .map(getStdout) + .peek( + TraceUtil.promiseify((q) => + q.trace.trace( + q + .get() + .fold((err, _val) => (err ? metric.failure : metric.success)), + ), + ), + ) + .map( + TraceUtil.promiseify((q) => + q + .get() + .mapRight((stdout) => { + q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${stdout}`); + const [jobName, jobId] = stdout.split(":"); + const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`; + + q.trace.trace( + `all queued up and weady to go~ (˘ω˘) => ${jobUrl}`, + ); + return jobUrl; + }) + .mapLeft((err) => { + q.trace.addTrace(LogLevel.ERROR).trace(err.toString()); + return err; + }), + ), + ) + .get(); + } +} +// -- </job.queuer> -- diff --git a/server/mod.ts b/server/mod.ts new file mode 100644 index 0000000..9dc57aa --- /dev/null +++ b/server/mod.ts @@ -0,0 +1,16 @@ +#!/usr/bin/env -S deno run --allow-env --allow-net --allow-run + +export * from "./ci.ts"; +export * from "./health.ts"; +export * from "./job.ts"; + +import { CiHookServer } from "./mod.ts"; +const server = new CiHookServer(); + +export const runServer = (port: number, host: string) => { + const serverConfig = { + host, + port, + }; + return Deno.serve(serverConfig, (req) => server.serve(req)).finished; +}; |