diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/ci.ts | 30 | ||||
-rw-r--r-- | server/health.ts | 2 | ||||
-rw-r--r-- | server/hono_proxy.ts | 71 | ||||
-rw-r--r-- | server/index.ts | 32 | ||||
-rw-r--r-- | server/job/index.ts | 4 | ||||
-rw-r--r-- | server/job/queue.ts | 40 | ||||
-rw-r--r-- | server/job/run_activity.ts | 55 |
7 files changed, 144 insertions, 90 deletions
diff --git a/server/ci.ts b/server/ci.ts index f57c426..c8aa6a1 100644 --- a/server/ci.ts +++ b/server/ci.ts @@ -1,34 +1,41 @@ import { FourOhFourActivityImpl, - getRequiredEnv, + getEnv, HealthCheckActivityImpl, type HealthChecker, type IFourOhFourActivity, type IHealthCheckActivity, type ITraceable, PenguenoRequest, + Server, type ServerTrace, - TraceUtil, } from '@emprespresso/pengueno'; import type { Job } from '@emprespresso/ci_model'; -import { type IJobHookActivity, type IJobQueuer, JobHookActivityImpl, LaminarJobQueuer } from './job'; -import { healthCheck as _healthCheck } from '.'; +import { + healthCheck as _healthCheck, + type IJobHookActivity, + type IJobQueuer, + JobHookActivityImpl, + LaminarJobQueuer, +} from '@emprespresso/ci_server'; export const DEFAULT_CI_SERVER = 'https://ci.liz.coffee'; -export class CiHookServer { +export class CiHookServer implements Server { constructor( healthCheck: HealthChecker = _healthCheck, jobQueuer: IJobQueuer<ITraceable<Job, ServerTrace>> = new LaminarJobQueuer( - getRequiredEnv('LAMINAR_URL').fold(({ isLeft, value }) => (isLeft ? DEFAULT_CI_SERVER : value)), + getEnv('LAMINAR_URL') + .orSome(() => DEFAULT_CI_SERVER) + .get(), ), private readonly healthCheckActivity: IHealthCheckActivity = new HealthCheckActivityImpl(healthCheck), private readonly jobHookActivity: IJobHookActivity = new JobHookActivityImpl(jobQueuer), private readonly fourOhFourActivity: IFourOhFourActivity = new FourOhFourActivityImpl(), ) {} - private route(req: ITraceable<PenguenoRequest, ServerTrace>) { - const url = new URL(req.get().url); + public serve(req: ITraceable<PenguenoRequest, ServerTrace>) { + const url = new URL(req.get().req.url); if (url.pathname === '/health') { return this.healthCheckActivity.checkHealth(req); } @@ -37,11 +44,4 @@ export class CiHookServer { } return this.fourOhFourActivity.fourOhFour(req); } - - public serve(req: Request): Promise<Response> { - return PenguenoRequest.from(req) - .bimap(TraceUtil.withClassTrace(this)) - .map((req) => this.route(req)) - .get(); - } } diff --git a/server/health.ts b/server/health.ts index 8435865..6a1f77f 100644 --- a/server/health.ts +++ b/server/health.ts @@ -14,7 +14,7 @@ export const healthCheck: HealthChecker = ( input: ITraceable<HealthCheckInput, ServerTrace>, ): Promise<IEither<Error, HealthCheckOutput>> => input - .bimap(TraceUtil.withFunctionTrace(healthCheck)) + .flatMap(TraceUtil.withFunctionTrace(healthCheck)) .move(getRequiredEnv('LAMINAR_HOST')) // ensure LAMINAR_HOST is propagated to getStdout for other procedures .map((tEitherEnv) => diff --git a/server/hono_proxy.ts b/server/hono_proxy.ts new file mode 100644 index 0000000..f729819 --- /dev/null +++ b/server/hono_proxy.ts @@ -0,0 +1,71 @@ +import { + BaseRequest, + Either, + IEither, + LogMetricTraceable, + Metric, + PenguenoRequest, + Server, + Signals, + TraceUtil, +} from '@emprespresso/pengueno'; + +import { serve, ServerType } from '@hono/node-server'; +import { Hono } from 'hono'; + +const AppLifetimeMetric = Metric.fromName('HonoAppLifetime').asResult(); +const AppRequestMetric = Metric.fromName('HonoAppRequest'); + +export class HonoProxy { + private readonly app = LogMetricTraceable.of(new Hono()) + .flatMap(TraceUtil.withTrace(`AppId = ${crypto.randomUUID()}`)) + .flatMap(TraceUtil.withMetricTrace(AppLifetimeMetric)); + + constructor(private readonly server: Server) {} + + public async serve(port: number, hostname: string): Promise<IEither<Error, void>> { + return this.app + .map((tApp) => + Either.fromFailable<Error, ServerType>(() => { + const app = tApp.get(); + app.all('*', async (c) => + tApp + .flatMap(TraceUtil.withMetricTrace(AppRequestMetric)) + .move(<BaseRequest>c.req) + .flatMap((tRequest) => PenguenoRequest.from(tRequest)) + .map((req) => this.server.serve(req)) + .map( + TraceUtil.promiseify((tResponse) => { + tResponse.trace.trace(AppRequestMetric.count.withValue(1.0)); + return new Response(tResponse.get().body(), tResponse.get()); + }), + ) + .get(), + ); + return serve({ + fetch: (_r) => app.fetch(_r), + port, + hostname, + }); + }), + ) + .peek(TraceUtil.traceResultingEither()) + .peek((tServe) => + tServe + .get() + .mapRight(() => + tServe.trace.trace( + `haii im still listening at http://${hostname}:${port} ~uwu dont think i forgot`, + ), + ), + ) + .map((tEitherServer) => + tEitherServer + .get() + .mapRight((server) => tEitherServer.move(server)) + .flatMapAsync((tServer) => Signals.awaitClose(tServer)), + ) + .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(AppLifetimeMetric))) + .get(); + } +} diff --git a/server/index.ts b/server/index.ts index c33b43e..d018a4e 100644 --- a/server/index.ts +++ b/server/index.ts @@ -1,31 +1,13 @@ #!/usr/bin/env node -export * from './job'; -export * from './ci'; -export * from './health'; +export * from './job/index.js'; +export * from './ci.js'; +export * from './health.js'; +export * from './hono_proxy.js'; -import { CiHookServer } from '.'; -import { Either, type IEither } from '@emprespresso/pengueno'; -import { serve } from '@hono/node-server'; -import { Hono } from 'hono'; +import { CiHookServer, HonoProxy } from '@emprespresso/ci_server'; const server = new CiHookServer(); +const hono = new HonoProxy(server); -const neverEndingPromise = new Promise<IEither<Error, 0>>(() => {}); -export const runServer = (port: number, host: string): Promise<IEither<Error, 0>> => - Either.fromFailable<Error, void>(() => { - const app = new Hono(); - - app.all('*', async (c) => { - const response = await server.serve(c.req.raw); - return response; - }); - - serve({ - fetch: app.fetch, - port, - hostname: host, - }); - - console.log(`server running on http://${host}:${port} :D`); - }).flatMapAsync(() => neverEndingPromise); +export const runServer = (port: number, hostname: string) => hono.serve(port, hostname); diff --git a/server/job/index.ts b/server/job/index.ts index ecf0984..92c4682 100644 --- a/server/job/index.ts +++ b/server/job/index.ts @@ -1,2 +1,2 @@ -export * from './queue'; -export * from './run_activity'; +export * from './queue.js'; +export * from './run_activity.js'; diff --git a/server/job/queue.ts b/server/job/queue.ts index 2392222..4b21186 100644 --- a/server/job/queue.ts +++ b/server/job/queue.ts @@ -2,13 +2,13 @@ import { getStdout, type Mapper, memoize, - Either, type IEither, type ITraceable, LogLevel, Metric, type ServerTrace, TraceUtil, + PenguenoError, } from '@emprespresso/pengueno'; import { type Job } from '@emprespresso/ci_model'; @@ -23,7 +23,7 @@ export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace> private static GetJobTypeTrace = (jobType: string) => `LaminarJobQueue.Queue.${jobType}`; private static JobTypeMetrics = memoize((jobType: string) => - Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)), + Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)).asResult(), ); public queue(j: ITraceable<Job, ServerTrace>) { @@ -32,8 +32,8 @@ export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace> const metric = LaminarJobQueuer.JobTypeMetrics(jobType); return j - .bimap(TraceUtil.withTrace(trace)) - .bimap(TraceUtil.withMetricTrace(metric)) + .flatMap(TraceUtil.withTrace(trace)) + .flatMap(TraceUtil.withMetricTrace(metric)) .map((j) => { const { type: jobType, arguments: args } = j.get(); const laminarCommand = [ @@ -47,26 +47,24 @@ export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace> .peek((c) => c.trace.trace(`im so excited to see how this queue job will end!! (>ᴗ<): ${c.get().toString()}`), ) - .map(getStdout) - .peek( - TraceUtil.promiseify((q) => - q.trace.trace(q.get().fold(({ isLeft }) => (isLeft ? metric.failure : metric.success))), - ), - ) + .map((c) => getStdout(c)) + .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(metric))) .map( TraceUtil.promiseify((q) => - q.get().fold(({ isLeft, value }) => { - if (isLeft) { - q.trace.addTrace(LogLevel.ERROR).trace(value.toString()); - return Either.left<Error, string>(value); - } - q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${value}`); - const [jobName, jobId] = value.split(':'); - const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`; + q.get().mapBoth( + (err) => { + q.trace.traceScope(LogLevel.ERROR).trace(err); + return err; + }, + (ok) => { + q.trace.traceScope(LogLevel.DEBUG).trace(`stdout ${ok}`); + const [jobName, jobId] = ok.split(':'); + const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`; - q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`); - return Either.right<Error, string>(jobUrl); - }), + q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`); + return jobUrl; + }, + ), ), ) .get(); diff --git a/server/job/run_activity.ts b/server/job/run_activity.ts index 9f25cf8..22bc4c7 100644 --- a/server/job/run_activity.ts +++ b/server/job/run_activity.ts @@ -7,7 +7,9 @@ import { jsonModel, JsonResponse, LogLevel, + LogMetricTraceSupplier, Metric, + MetricsTraceSupplier, PenguenoError, type PenguenoRequest, type ServerTrace, @@ -15,39 +17,35 @@ import { validateExecutionEntries, } from '@emprespresso/pengueno'; import { isJob, type Job } from '@emprespresso/ci_model'; -import { IJobQueuer } from './queue'; +import { IJobQueuer } from './queue.js'; -const wellFormedJobMetric = Metric.fromName('Job.WellFormed'); +const wellFormedJobMetric = Metric.fromName('Job.WellFormed').asResult(); const jobJsonTransformer = (j: ITraceable<unknown, ServerTrace>): IEither<PenguenoError, Job> => j - .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric)) + .flatMap(TraceUtil.withMetricTrace(wellFormedJobMetric)) .map((tJson): IEither<PenguenoError, Job> => { const tJob = tJson.get(); if (!isJob(tJob) || !validateExecutionEntries(tJob)) { const err = 'seems like a pwetty mawfomed job (-.-)'; - tJson.trace.addTrace(LogLevel.WARN).trace(err); + tJson.trace.traceScope(LogLevel.WARN).trace(err); return Either.left(new PenguenoError(err, 400)); } return Either.right(tJob); }) - .peek((tJob) => - tJob.trace.trace( - tJob.get().fold(({ isLeft }) => (isLeft ? wellFormedJobMetric.failure : wellFormedJobMetric.success)), - ), - ) + .peek(TraceUtil.traceResultingEither(wellFormedJobMetric)) .get(); export interface IJobHookActivity { processHook: IActivity; } -const jobHookRequestMetric = Metric.fromName('JobHook.process'); +const jobHookRequestMetric = Metric.fromName('JobHook.process').asResult(); export class JobHookActivityImpl implements IJobHookActivity { constructor(private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>) {} private trace(r: ITraceable<PenguenoRequest, ServerTrace>) { - return r.bimap(TraceUtil.withClassTrace(this)).bimap(TraceUtil.withMetricTrace(jobHookRequestMetric)); + return r.flatMap(TraceUtil.withClassTrace(this)).flatMap(TraceUtil.withMetricTrace(jobHookRequestMetric)); } public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) { @@ -63,29 +61,34 @@ export class JobHookActivityImpl implements IJobHookActivity { return eitherQueued.mapLeft((e) => new PenguenoError(e.message, 500)); }); }) + .flatMapAsync( + TraceUtil.promiseify((tEitherQueued) => { + const errorSource = tEitherQueued + .get() + .left() + .map(({ source }) => source) + .orSome(() => ErrorSource.SYSTEM) + .get(); + const shouldWarn = errorSource === ErrorSource.USER; + return TraceUtil.traceResultingEither<PenguenoError, string, LogMetricTraceSupplier>( + jobHookRequestMetric, + shouldWarn, + )(tEitherQueued); + }), + ) .peek( TraceUtil.promiseify((tJob) => - tJob.get().fold(({ isRight, value }) => { - if (isRight) { - tJob.trace.trace(jobHookRequestMetric.success); - tJob.trace.trace(`all queued up and weady to go :D !! ${value}`); - return; - } - - tJob.trace.trace( - value.source === ErrorSource.SYSTEM - ? jobHookRequestMetric.failure - : jobHookRequestMetric.warn, - ); - tJob.trace.addTrace(value.source).trace(`${value}`); - }), + tJob.get().mapRight((job) => tJob.trace.trace(`all queued up and weady to go :D !! ${job}`)), ), ) .map( TraceUtil.promiseify( (tEitherQueuedJob) => new JsonResponse(r, tEitherQueuedJob.get(), { - status: tEitherQueuedJob.get().fold(({ isRight, value }) => (isRight ? 200 : value.status)), + status: tEitherQueuedJob.get().fold( + ({ status }) => status, + () => 200, + ), }), ), ) |