summaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
authorElizabeth Hunt <me@liz.coffee>2025-06-29 17:31:30 -0700
committerElizabeth Hunt <me@liz.coffee>2025-06-29 17:31:30 -0700
commit58be1809c46cbe517a18d86d0af52179dcc5cbf6 (patch)
tree9ccc678b3fd48c1a52fe501600dd2c2051740a55 /server
parentd4791f3d357634daf506fb8f91cc5332a794c421 (diff)
downloadci-58be1809c46cbe517a18d86d0af52179dcc5cbf6.tar.gz
ci-58be1809c46cbe517a18d86d0af52179dcc5cbf6.zip
Move to nodejs and also lots of significant refactoring that should've been broken up but idgaf
Diffstat (limited to 'server')
-rw-r--r--server/ci.ts30
-rw-r--r--server/health.ts2
-rw-r--r--server/hono_proxy.ts71
-rw-r--r--server/index.ts32
-rw-r--r--server/job/index.ts4
-rw-r--r--server/job/queue.ts40
-rw-r--r--server/job/run_activity.ts55
7 files changed, 144 insertions, 90 deletions
diff --git a/server/ci.ts b/server/ci.ts
index f57c426..c8aa6a1 100644
--- a/server/ci.ts
+++ b/server/ci.ts
@@ -1,34 +1,41 @@
import {
FourOhFourActivityImpl,
- getRequiredEnv,
+ getEnv,
HealthCheckActivityImpl,
type HealthChecker,
type IFourOhFourActivity,
type IHealthCheckActivity,
type ITraceable,
PenguenoRequest,
+ Server,
type ServerTrace,
- TraceUtil,
} from '@emprespresso/pengueno';
import type { Job } from '@emprespresso/ci_model';
-import { type IJobHookActivity, type IJobQueuer, JobHookActivityImpl, LaminarJobQueuer } from './job';
-import { healthCheck as _healthCheck } from '.';
+import {
+ healthCheck as _healthCheck,
+ type IJobHookActivity,
+ type IJobQueuer,
+ JobHookActivityImpl,
+ LaminarJobQueuer,
+} from '@emprespresso/ci_server';
export const DEFAULT_CI_SERVER = 'https://ci.liz.coffee';
-export class CiHookServer {
+export class CiHookServer implements Server {
constructor(
healthCheck: HealthChecker = _healthCheck,
jobQueuer: IJobQueuer<ITraceable<Job, ServerTrace>> = new LaminarJobQueuer(
- getRequiredEnv('LAMINAR_URL').fold(({ isLeft, value }) => (isLeft ? DEFAULT_CI_SERVER : value)),
+ getEnv('LAMINAR_URL')
+ .orSome(() => DEFAULT_CI_SERVER)
+ .get(),
),
private readonly healthCheckActivity: IHealthCheckActivity = new HealthCheckActivityImpl(healthCheck),
private readonly jobHookActivity: IJobHookActivity = new JobHookActivityImpl(jobQueuer),
private readonly fourOhFourActivity: IFourOhFourActivity = new FourOhFourActivityImpl(),
) {}
- private route(req: ITraceable<PenguenoRequest, ServerTrace>) {
- const url = new URL(req.get().url);
+ public serve(req: ITraceable<PenguenoRequest, ServerTrace>) {
+ const url = new URL(req.get().req.url);
if (url.pathname === '/health') {
return this.healthCheckActivity.checkHealth(req);
}
@@ -37,11 +44,4 @@ export class CiHookServer {
}
return this.fourOhFourActivity.fourOhFour(req);
}
-
- public serve(req: Request): Promise<Response> {
- return PenguenoRequest.from(req)
- .bimap(TraceUtil.withClassTrace(this))
- .map((req) => this.route(req))
- .get();
- }
}
diff --git a/server/health.ts b/server/health.ts
index 8435865..6a1f77f 100644
--- a/server/health.ts
+++ b/server/health.ts
@@ -14,7 +14,7 @@ export const healthCheck: HealthChecker = (
input: ITraceable<HealthCheckInput, ServerTrace>,
): Promise<IEither<Error, HealthCheckOutput>> =>
input
- .bimap(TraceUtil.withFunctionTrace(healthCheck))
+ .flatMap(TraceUtil.withFunctionTrace(healthCheck))
.move(getRequiredEnv('LAMINAR_HOST'))
// ensure LAMINAR_HOST is propagated to getStdout for other procedures
.map((tEitherEnv) =>
diff --git a/server/hono_proxy.ts b/server/hono_proxy.ts
new file mode 100644
index 0000000..f729819
--- /dev/null
+++ b/server/hono_proxy.ts
@@ -0,0 +1,71 @@
+import {
+ BaseRequest,
+ Either,
+ IEither,
+ LogMetricTraceable,
+ Metric,
+ PenguenoRequest,
+ Server,
+ Signals,
+ TraceUtil,
+} from '@emprespresso/pengueno';
+
+import { serve, ServerType } from '@hono/node-server';
+import { Hono } from 'hono';
+
+const AppLifetimeMetric = Metric.fromName('HonoAppLifetime').asResult();
+const AppRequestMetric = Metric.fromName('HonoAppRequest');
+
+export class HonoProxy {
+ private readonly app = LogMetricTraceable.of(new Hono())
+ .flatMap(TraceUtil.withTrace(`AppId = ${crypto.randomUUID()}`))
+ .flatMap(TraceUtil.withMetricTrace(AppLifetimeMetric));
+
+ constructor(private readonly server: Server) {}
+
+ public async serve(port: number, hostname: string): Promise<IEither<Error, void>> {
+ return this.app
+ .map((tApp) =>
+ Either.fromFailable<Error, ServerType>(() => {
+ const app = tApp.get();
+ app.all('*', async (c) =>
+ tApp
+ .flatMap(TraceUtil.withMetricTrace(AppRequestMetric))
+ .move(<BaseRequest>c.req)
+ .flatMap((tRequest) => PenguenoRequest.from(tRequest))
+ .map((req) => this.server.serve(req))
+ .map(
+ TraceUtil.promiseify((tResponse) => {
+ tResponse.trace.trace(AppRequestMetric.count.withValue(1.0));
+ return new Response(tResponse.get().body(), tResponse.get());
+ }),
+ )
+ .get(),
+ );
+ return serve({
+ fetch: (_r) => app.fetch(_r),
+ port,
+ hostname,
+ });
+ }),
+ )
+ .peek(TraceUtil.traceResultingEither())
+ .peek((tServe) =>
+ tServe
+ .get()
+ .mapRight(() =>
+ tServe.trace.trace(
+ `haii im still listening at http://${hostname}:${port} ~uwu dont think i forgot`,
+ ),
+ ),
+ )
+ .map((tEitherServer) =>
+ tEitherServer
+ .get()
+ .mapRight((server) => tEitherServer.move(server))
+ .flatMapAsync((tServer) => Signals.awaitClose(tServer)),
+ )
+ .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(AppLifetimeMetric)))
+ .get();
+ }
+}
diff --git a/server/index.ts b/server/index.ts
index c33b43e..d018a4e 100644
--- a/server/index.ts
+++ b/server/index.ts
@@ -1,31 +1,13 @@
#!/usr/bin/env node
-export * from './job';
-export * from './ci';
-export * from './health';
+export * from './job/index.js';
+export * from './ci.js';
+export * from './health.js';
+export * from './hono_proxy.js';
-import { CiHookServer } from '.';
-import { Either, type IEither } from '@emprespresso/pengueno';
-import { serve } from '@hono/node-server';
-import { Hono } from 'hono';
+import { CiHookServer, HonoProxy } from '@emprespresso/ci_server';
const server = new CiHookServer();
+const hono = new HonoProxy(server);
-const neverEndingPromise = new Promise<IEither<Error, 0>>(() => {});
-export const runServer = (port: number, host: string): Promise<IEither<Error, 0>> =>
- Either.fromFailable<Error, void>(() => {
- const app = new Hono();
-
- app.all('*', async (c) => {
- const response = await server.serve(c.req.raw);
- return response;
- });
-
- serve({
- fetch: app.fetch,
- port,
- hostname: host,
- });
-
- console.log(`server running on http://${host}:${port} :D`);
- }).flatMapAsync(() => neverEndingPromise);
+export const runServer = (port: number, hostname: string) => hono.serve(port, hostname);
diff --git a/server/job/index.ts b/server/job/index.ts
index ecf0984..92c4682 100644
--- a/server/job/index.ts
+++ b/server/job/index.ts
@@ -1,2 +1,2 @@
-export * from './queue';
-export * from './run_activity';
+export * from './queue.js';
+export * from './run_activity.js';
diff --git a/server/job/queue.ts b/server/job/queue.ts
index 2392222..4b21186 100644
--- a/server/job/queue.ts
+++ b/server/job/queue.ts
@@ -2,13 +2,13 @@ import {
getStdout,
type Mapper,
memoize,
- Either,
type IEither,
type ITraceable,
LogLevel,
Metric,
type ServerTrace,
TraceUtil,
+ PenguenoError,
} from '@emprespresso/pengueno';
import { type Job } from '@emprespresso/ci_model';
@@ -23,7 +23,7 @@ export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace>
private static GetJobTypeTrace = (jobType: string) => `LaminarJobQueue.Queue.${jobType}`;
private static JobTypeMetrics = memoize((jobType: string) =>
- Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)),
+ Metric.fromName(LaminarJobQueuer.GetJobTypeTrace(jobType)).asResult(),
);
public queue(j: ITraceable<Job, ServerTrace>) {
@@ -32,8 +32,8 @@ export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace>
const metric = LaminarJobQueuer.JobTypeMetrics(jobType);
return j
- .bimap(TraceUtil.withTrace(trace))
- .bimap(TraceUtil.withMetricTrace(metric))
+ .flatMap(TraceUtil.withTrace(trace))
+ .flatMap(TraceUtil.withMetricTrace(metric))
.map((j) => {
const { type: jobType, arguments: args } = j.get();
const laminarCommand = [
@@ -47,26 +47,24 @@ export class LaminarJobQueuer implements IJobQueuer<ITraceable<Job, ServerTrace>
.peek((c) =>
c.trace.trace(`im so excited to see how this queue job will end!! (>ᴗ<): ${c.get().toString()}`),
)
- .map(getStdout)
- .peek(
- TraceUtil.promiseify((q) =>
- q.trace.trace(q.get().fold(({ isLeft }) => (isLeft ? metric.failure : metric.success))),
- ),
- )
+ .map((c) => getStdout(c))
+ .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(metric)))
.map(
TraceUtil.promiseify((q) =>
- q.get().fold(({ isLeft, value }) => {
- if (isLeft) {
- q.trace.addTrace(LogLevel.ERROR).trace(value.toString());
- return Either.left<Error, string>(value);
- }
- q.trace.addTrace(LogLevel.DEBUG).trace(`stdout ${value}`);
- const [jobName, jobId] = value.split(':');
- const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;
+ q.get().mapBoth(
+ (err) => {
+ q.trace.traceScope(LogLevel.ERROR).trace(err);
+ return err;
+ },
+ (ok) => {
+ q.trace.traceScope(LogLevel.DEBUG).trace(`stdout ${ok}`);
+ const [jobName, jobId] = ok.split(':');
+ const jobUrl = `${this.queuePositionPrefix}/jobs/${jobName}/${jobId}`;
- q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`);
- return Either.right<Error, string>(jobUrl);
- }),
+ q.trace.trace(`all queued up and weady to go~ (˘ω˘) => ${jobUrl}`);
+ return jobUrl;
+ },
+ ),
),
)
.get();
diff --git a/server/job/run_activity.ts b/server/job/run_activity.ts
index 9f25cf8..22bc4c7 100644
--- a/server/job/run_activity.ts
+++ b/server/job/run_activity.ts
@@ -7,7 +7,9 @@ import {
jsonModel,
JsonResponse,
LogLevel,
+ LogMetricTraceSupplier,
Metric,
+ MetricsTraceSupplier,
PenguenoError,
type PenguenoRequest,
type ServerTrace,
@@ -15,39 +17,35 @@ import {
validateExecutionEntries,
} from '@emprespresso/pengueno';
import { isJob, type Job } from '@emprespresso/ci_model';
-import { IJobQueuer } from './queue';
+import { IJobQueuer } from './queue.js';
-const wellFormedJobMetric = Metric.fromName('Job.WellFormed');
+const wellFormedJobMetric = Metric.fromName('Job.WellFormed').asResult();
const jobJsonTransformer = (j: ITraceable<unknown, ServerTrace>): IEither<PenguenoError, Job> =>
j
- .bimap(TraceUtil.withMetricTrace(wellFormedJobMetric))
+ .flatMap(TraceUtil.withMetricTrace(wellFormedJobMetric))
.map((tJson): IEither<PenguenoError, Job> => {
const tJob = tJson.get();
if (!isJob(tJob) || !validateExecutionEntries(tJob)) {
const err = 'seems like a pwetty mawfomed job (-.-)';
- tJson.trace.addTrace(LogLevel.WARN).trace(err);
+ tJson.trace.traceScope(LogLevel.WARN).trace(err);
return Either.left(new PenguenoError(err, 400));
}
return Either.right(tJob);
})
- .peek((tJob) =>
- tJob.trace.trace(
- tJob.get().fold(({ isLeft }) => (isLeft ? wellFormedJobMetric.failure : wellFormedJobMetric.success)),
- ),
- )
+ .peek(TraceUtil.traceResultingEither(wellFormedJobMetric))
.get();
export interface IJobHookActivity {
processHook: IActivity;
}
-const jobHookRequestMetric = Metric.fromName('JobHook.process');
+const jobHookRequestMetric = Metric.fromName('JobHook.process').asResult();
export class JobHookActivityImpl implements IJobHookActivity {
constructor(private readonly queuer: IJobQueuer<ITraceable<Job, ServerTrace>>) {}
private trace(r: ITraceable<PenguenoRequest, ServerTrace>) {
- return r.bimap(TraceUtil.withClassTrace(this)).bimap(TraceUtil.withMetricTrace(jobHookRequestMetric));
+ return r.flatMap(TraceUtil.withClassTrace(this)).flatMap(TraceUtil.withMetricTrace(jobHookRequestMetric));
}
public processHook(r: ITraceable<PenguenoRequest, ServerTrace>) {
@@ -63,29 +61,34 @@ export class JobHookActivityImpl implements IJobHookActivity {
return eitherQueued.mapLeft((e) => new PenguenoError(e.message, 500));
});
})
+ .flatMapAsync(
+ TraceUtil.promiseify((tEitherQueued) => {
+ const errorSource = tEitherQueued
+ .get()
+ .left()
+ .map(({ source }) => source)
+ .orSome(() => ErrorSource.SYSTEM)
+ .get();
+ const shouldWarn = errorSource === ErrorSource.USER;
+ return TraceUtil.traceResultingEither<PenguenoError, string, LogMetricTraceSupplier>(
+ jobHookRequestMetric,
+ shouldWarn,
+ )(tEitherQueued);
+ }),
+ )
.peek(
TraceUtil.promiseify((tJob) =>
- tJob.get().fold(({ isRight, value }) => {
- if (isRight) {
- tJob.trace.trace(jobHookRequestMetric.success);
- tJob.trace.trace(`all queued up and weady to go :D !! ${value}`);
- return;
- }
-
- tJob.trace.trace(
- value.source === ErrorSource.SYSTEM
- ? jobHookRequestMetric.failure
- : jobHookRequestMetric.warn,
- );
- tJob.trace.addTrace(value.source).trace(`${value}`);
- }),
+ tJob.get().mapRight((job) => tJob.trace.trace(`all queued up and weady to go :D !! ${job}`)),
),
)
.map(
TraceUtil.promiseify(
(tEitherQueuedJob) =>
new JsonResponse(r, tEitherQueuedJob.get(), {
- status: tEitherQueuedJob.get().fold(({ isRight, value }) => (isRight ? 200 : value.status)),
+ status: tEitherQueuedJob.get().fold(
+ ({ status }) => status,
+ () => 200,
+ ),
}),
),
)