diff options
author | Elizabeth Hunt <me@liz.coffee> | 2025-08-17 23:50:24 -0700 |
---|---|---|
committer | Elizabeth Hunt <me@liz.coffee> | 2025-08-17 23:53:38 -0700 |
commit | 1a4fc9535a89b58e8b67c8996ade0b833116af3a (patch) | |
tree | d16f3129d7bb69f204bba8422e909354195a0042 /lib | |
parent | 157dc327e8fe63541b517cfbeeaf202a3e8553a5 (diff) | |
download | uptime-1a4fc9535a89b58e8b67c8996ade0b833116af3a.tar.gz uptime-1a4fc9535a89b58e8b67c8996ade0b833116af3a.zip |
Move to pengueno.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/email.ts | 200 | ||||
-rw-r--r-- | lib/email_activity.ts | 130 | ||||
-rw-r--r-- | lib/healthcheck.ts | 18 | ||||
-rw-r--r-- | lib/index.ts | 6 | ||||
-rw-r--r-- | lib/model.ts | 87 | ||||
-rw-r--r-- | lib/run.ts | 7 | ||||
-rw-r--r-- | lib/server.ts | 35 |
7 files changed, 483 insertions, 0 deletions
diff --git a/lib/email.ts b/lib/email.ts new file mode 100644 index 0000000..38beb0c --- /dev/null +++ b/lib/email.ts @@ -0,0 +1,200 @@ +import { ImapFlow, type FetchMessageObject, type FetchQueryObject, type MailboxLockObject } from 'imapflow'; +import { EmailSendInstruction, type Email, EmailRecvInstruction } from '@emprespresso/uptime'; +import { Either, ITraceable, LogLevel, Metric, Optional, ServerTrace, TraceUtil } from '@emprespresso/pengueno'; +import { createTransport } from 'nodemailer'; + +interface IRecv { + connect: () => Promise<void>; + logout: () => Promise<void>; + + getMailboxLock: (mailbox: string) => Promise<MailboxLockObject>; + mailboxClose: () => Promise<boolean>; + + fetchAll: (range: string, options: FetchQueryObject) => Promise<FetchMessageObject[]>; + messageDelete: (uids: number[], opts: Record<string, any>) => Promise<boolean>; +} + +interface ISend { + sendMail: (email: Email) => Promise<Email>; +} + +export class Outbox { + public constructor(private readonly sender: ISend) {} + + public async send(email: ITraceable<Email>) { + return email + .flatMap(TraceUtil.withMetricTrace(Outbox.Metrics.SEND)) + .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Sending email')) + .map((email) => Either.fromFailableAsync<Error, Email>(() => this.sender.sendMail(email.get()))) + .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Outbox.Metrics.SEND))) + .get(); + } + + public static async from(instruction: ITraceable<EmailSendInstruction>) { + return instruction + .flatMap(TraceUtil.withMetricTrace(Outbox.Metrics.INIT)) + .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Initializing outbox')) + .map((instruction) => + createTransport({ + host: instruction.get().send_host, + port: instruction.get().send_port, + auth: { + user: instruction.get().username, + pass: instruction.get().password, + }, + tls: { + rejectUnauthorized: false, + }, + }), + ) + .map( + (transport) => + new Outbox({ + sendMail: (email: Email) => + new Promise<Email>((resolve, reject) => + transport.get().sendMail(email, (error) => { + if (error) { + reject(error); + } else { + resolve(email); + } + }), + ), + }), + ) + .peek(TraceUtil.withMetricTrace(Outbox.Metrics.INIT)) + .map((x) => Either.right<Error, Outbox>(x.get())) + .get(); + } + + private static Metrics = { + INIT: Metric.fromName('Outbox.Initialize').asResult(), + SEND: Metric.fromName('Outbox.Send').asResult(), + }; +} + +export class Inbox { + public constructor( + private readonly recv: IRecv, + private lock?: ITraceable<MailboxLockObject, ServerTrace>, + ) {} + + public async deleteMessages(_uids: Array<number> | number) { + const uids = Array.isArray(_uids) ? _uids : [_uids]; + return this.lock!.flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.DELETE)) + .peek((res) => res.trace.traceScope(LogLevel.INFO).trace(`Deleting messages ${uids}`)) + .map(() => Either.fromFailableAsync<Error, boolean>(() => this.recv.messageDelete(uids, { uid: true }))) + .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.DELETE))) + .get(); + } + + public async listMessages() { + return this.lock!.flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.LIST)) + .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Listing messages')) + .map(() => + Either.fromFailableAsync<Error, Array<FetchMessageObject>>(() => + this.recv.fetchAll('*', { + uid: true, + envelope: true, + headers: true, + bodyParts: ['text'], + }), + ), + ) + .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.LIST))) + .peek( + TraceUtil.promiseify((res) => + res.trace.trace( + res.get().fold( + (err) => err.toString(), + (ok) => `Found ${ok.length} messages`, + ), + ), + ), + ) + .get(); + } + + public async close() { + await this.recv.mailboxClose().then(() => this.recv.logout()); + this.lock!.peek((l) => l.get().release()) + .peek(t => t.trace.trace(Inbox.Metrics.LOCK)) + .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Released lock')) + .get(); + delete this.lock; + } + + public async find(email: ITraceable<Email>) { + return email + .flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.FIND)) + .map((e) => + this.listMessages().then((eitherMessages) => + eitherMessages.flatMap((messages) => + Either.fromFailable(() => + Optional.from(messages.find((message) => this.equals(e.get(), message))).get(), + ), + ), + ), + ) + .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.FIND))) + .get(); + } + + public static from(instruction: ITraceable<EmailRecvInstruction, ServerTrace>) { + const client = new ImapFlow({ + logger: false, + host: instruction.get().read_host, + port: instruction.get().read_port, + secure: true, + auth: { + user: instruction.get().username, + pass: instruction.get().password, + }, + }); + return instruction + .move(client) + .flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.INIT)) + .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Initializing mailbox')) + .map((tClient) => + Either.fromFailableAsync<Error, MailboxLockObject>(() => + tClient + .get() + .connect() + .then(() => tClient.get().getMailboxLock('INBOX')), + ).then((eitherLock) => + eitherLock + .mapRight((lock) => tClient.move(lock).flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.LOCK))) + .mapRight((traceableLock) => new Inbox(tClient.get(), traceableLock)), + ), + ) + .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.INIT))) + .peek( + TraceUtil.promiseify((res) => + res.trace.trace( + res.get().fold( + (err) => `Could not initialize mailbox: ${err.stack}`, + (ok) => `Initialized mailbox`, + ), + ), + ), + ) + .get(); + } + + private equals(email: Email, message: FetchMessageObject) { + const subjectMatches = email.subject === message.envelope?.subject; + const bodyMatches = message.bodyParts?.get('text')?.toString().trim() === email.text.trim(); + const headers = message.headers?.toLocaleString(); + const fromMatches = headers?.includes(`Return-Path: <${email.from}>`); + const toMatches = headers?.includes(`Delivered-To: ${email.to}`); + return subjectMatches && bodyMatches && fromMatches && toMatches; + } + + private static Metrics = { + INIT: Metric.fromName('Inbox.Initialize').asResult(), + FIND: Metric.fromName('Inbox.FindMessage').asResult(), + LOCK: Metric.fromName('Inbox.Lock').asResult(), + LIST: Metric.fromName('Inbox.List').asResult(), + DELETE: Metric.fromName('Inbox.Delete').asResult(), + }; +} diff --git a/lib/email_activity.ts b/lib/email_activity.ts new file mode 100644 index 0000000..9abdb1f --- /dev/null +++ b/lib/email_activity.ts @@ -0,0 +1,130 @@ +import { + Either, + type IActivity, + type IEither, + type ITraceable, + jsonModel, + JsonResponse, + LogLevel, + Metric, + PenguenoError, + type PenguenoRequest, + type ServerTrace, + TraceUtil, +} from '@emprespresso/pengueno'; +import { + Email, + EmailJob, + isEmailJob, + Inbox, + Outbox, + EmailSendInstruction, + EmailRecvInstruction, +} from '@emprespresso/uptime'; + +const wellFormedJobMetric = Metric.fromName('Email.WellFormed').asResult(); + +const jobJsonTransformer = (j: ITraceable<unknown, ServerTrace>): IEither<PenguenoError, EmailJob> => + j + .flatMap(TraceUtil.withMetricTrace(wellFormedJobMetric)) + .map((tJson): IEither<PenguenoError, EmailJob> => { + const tJob = tJson.get(); + if (!isEmailJob(tJob)) { + const err = 'seems like a pwetty mawfomed job (-.-)'; + tJson.trace.traceScope(LogLevel.WARN).trace(err); + return Either.left(new PenguenoError(err, 400)); + } + return Either.right(tJob); + }) + .peek(TraceUtil.traceResultingEither(wellFormedJobMetric)) + .get(); + +export interface IEmailActivity { + runMail: IActivity; +} + +const emailJobRequestMetric = Metric.fromName('JobHook.process').asResult(); +export class EmailActivityImpl implements IEmailActivity { + constructor( + private readonly outboxFactory: ( + instruction: ITraceable<EmailSendInstruction, ServerTrace>, + ) => Promise<IEither<Error, Outbox>>, + private readonly inboxFactory: ( + instruction: ITraceable<EmailRecvInstruction, ServerTrace>, + ) => Promise<IEither<Error, Inbox>>, + ) {} + + private trace(r: ITraceable<PenguenoRequest, ServerTrace>) { + return r.flatMap(TraceUtil.withClassTrace(this)).flatMap(TraceUtil.withMetricTrace(emailJobRequestMetric)); + } + + private async runJob(_tJob: ITraceable<EmailJob, ServerTrace>) { + const { from, to, readRetry } = _tJob.get(); + const emailToTest: Email = { + from: from.email, + to: to.email, + subject: [new Date().toISOString(), crypto.randomUUID()].join(' | '), + text: crypto.randomUUID(), + }; + const tJob = _tJob.traceScope(() => `Subject = ${emailToTest.subject}`); + const tEmail = tJob.move(emailToTest); + + const inNOut = await ( + await this.outboxFactory(tJob.move(from)) + ).joinRightAsync( + () => this.inboxFactory(tJob.move(to)), + (inbox, outbox) => ({ inbox, outbox }), + ); + const sent = await inNOut.joinRightAsync( + () => inNOut.flatMapAsync(({ outbox }) => outbox.send(tEmail)), + (prev, n) => prev, + ); + const recvd = await Either.retrying( + async () => + sent.joinRightAsync( + () => + inNOut.flatMapAsync(({ inbox }) => + inbox.find(tEmail.peek((t) => t.trace.trace('Looking for message'))), + ), + (prev, n) => prev, + ), + readRetry.attempts, + (_attempt) => new Promise((res) => setTimeout(res, readRetry.interval * 1000)), + ); + const deleted = await recvd + .joinRight(inNOut, ({ inbox }, recvd) => inbox.deleteMessages(recvd.uid)) + .flatMapAsync((x) => x); + return inNOut.flatMapAsync(async ({ inbox }) => { + await inbox.close(); + return deleted; + }); + } + + public runMail(r: ITraceable<PenguenoRequest, ServerTrace>) { + return this.trace(r) + .map(jsonModel(jobJsonTransformer)) + .map(async (tJob) => { + const eitherJob = await tJob.get(); + return eitherJob.flatMapAsync((job) => + this.runJob(tJob.move(job)).then((res) => + res.mapLeft((err) => { + tJob.trace.traceScope(LogLevel.ERROR).trace(err); + return new PenguenoError('Failed running job ' + err.toString(), 500); + }), + ), + ); + }) + .map( + TraceUtil.promiseify( + (tEitherEmail) => + new JsonResponse(r, tEitherEmail.get(), { + status: tEitherEmail.get().fold( + ({ status }) => status, + () => 200, + ), + }), + ), + ) + .get(); + } +} diff --git a/lib/healthcheck.ts b/lib/healthcheck.ts new file mode 100644 index 0000000..3ecc0eb --- /dev/null +++ b/lib/healthcheck.ts @@ -0,0 +1,18 @@ +import { + TraceUtil, + IEither, + ITraceable, + HealthChecker, + HealthCheckInput, + HealthCheckOutput, + ServerTrace, + Either, +} from '@emprespresso/pengueno'; + +export const healthCheck: HealthChecker = ( + input: ITraceable<HealthCheckInput, ServerTrace>, +): Promise<IEither<Error, HealthCheckOutput>> => + input + .flatMap(TraceUtil.withFunctionTrace(healthCheck)) + .move(Promise.resolve(Either.right<Error, HealthCheckOutput>(HealthCheckOutput.YAASSSLAYQUEEN))) + .get(); diff --git a/lib/index.ts b/lib/index.ts new file mode 100644 index 0000000..9194b03 --- /dev/null +++ b/lib/index.ts @@ -0,0 +1,6 @@ +export * from './model'; +export * from './email'; +export * from './email_activity'; +export * from './healthcheck'; +export * from './server'; +export * from './run'; diff --git a/lib/model.ts b/lib/model.ts new file mode 100644 index 0000000..7896d70 --- /dev/null +++ b/lib/model.ts @@ -0,0 +1,87 @@ +import { isObject } from '@emprespresso/pengueno'; + +export interface Email { + from: string; + to: string; + subject: string; + text: string; +} + +export interface EmailCredentials { + email: string; + username: string; + password: string; +} + +export const isCreds = (u: unknown): u is EmailCredentials => + !!( + isObject(u) && + ['email', 'username', 'password'].every((key) => key in u && typeof u[key as keyof typeof u] === 'string') + ); + +export interface EmailSendInstruction extends EmailCredentials { + send_host: string; + send_port: number; +} + +export const isSend = (u: unknown): u is EmailSendInstruction => + !!( + isCreds(u) && + 'send_host' in u && + typeof u.send_host === 'string' && + 'send_port' in u && + typeof u.send_port === 'number' + ); + +export interface EmailRecvInstruction extends EmailCredentials { + read_host: string; + read_port: number; +} + +export const isRecv = (u: unknown): u is EmailSendInstruction => + !!( + isCreds(u) && + 'read_host' in u && + typeof u.read_host === 'string' && + 'read_port' in u && + typeof u.read_port === 'number' + ); + +export interface EmailJob { + from: EmailSendInstruction; + to: EmailRecvInstruction; + readRetry: Retry; +} + +export const isEmailJob = (u: unknown): u is EmailJob => + !!( + isObject(u) && + 'from' in u && + isSend(u.from) && + 'to' in u && + isRecv(u.to) && + 'readRetry' in u && + isRetry(u.readRetry) + ); + +export interface Retry { + attempts: number; + interval: number; +} + +export const isRetry = (u: unknown): u is Retry => + !!( + isObject(u) && ['attempts', 'interval'].every((key) => key in u && typeof u[key as keyof typeof u] === 'number') + ); + +export const redact = <T extends EmailCredentials>(instruction: T): T => ({ + ...instruction, + password: 'REDACTED', + username: 'REDACTED', +}); + +export const redactJob = (job: EmailJob): EmailJob => ({ + ...job, + from: redact(job.from), + to: redact(job.to), +}); diff --git a/lib/run.ts b/lib/run.ts new file mode 100644 index 0000000..b16fd53 --- /dev/null +++ b/lib/run.ts @@ -0,0 +1,7 @@ +import { HonoProxy } from '@emprespresso/pengueno'; +import { UptimeServer } from '@emprespresso/uptime'; + +const server = new UptimeServer(); +const hono = new HonoProxy(server); + +export const runServer = (port: number, hostname: string) => hono.serve(port, hostname); diff --git a/lib/server.ts b/lib/server.ts new file mode 100644 index 0000000..9885bf3 --- /dev/null +++ b/lib/server.ts @@ -0,0 +1,35 @@ +import { + FourOhFourActivityImpl, + HealthCheckActivityImpl, + type HealthChecker, + type IFourOhFourActivity, + type IHealthCheckActivity, + type ITraceable, + PenguenoRequest, + Server, + type ServerTrace, +} from '@emprespresso/pengueno'; +import { healthCheck as _healthCheck, EmailActivityImpl, IEmailActivity, Inbox, Outbox } from './index'; + +export class UptimeServer implements Server { + constructor( + healthCheck: HealthChecker = _healthCheck, + private readonly healthCheckActivity: IHealthCheckActivity = new HealthCheckActivityImpl(healthCheck), + private readonly emailActivity: IEmailActivity = new EmailActivityImpl( + (a) => Outbox.from(a), + (b) => Inbox.from(b), + ), + private readonly fourOhFourActivity: IFourOhFourActivity = new FourOhFourActivityImpl(), + ) {} + + public serve(req: ITraceable<PenguenoRequest, ServerTrace>) { + const url = new URL(req.get().req.url); + if (url.pathname === '/health') { + return this.healthCheckActivity.checkHealth(req); + } + if (url.pathname === '/email') { + return this.emailActivity.runMail(req); + } + return this.fourOhFourActivity.fourOhFour(req); + } +} |