summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/email.ts200
-rw-r--r--lib/email_activity.ts130
-rw-r--r--lib/healthcheck.ts18
-rw-r--r--lib/index.ts6
-rw-r--r--lib/model.ts87
-rw-r--r--lib/run.ts7
-rw-r--r--lib/server.ts35
7 files changed, 483 insertions, 0 deletions
diff --git a/lib/email.ts b/lib/email.ts
new file mode 100644
index 0000000..38beb0c
--- /dev/null
+++ b/lib/email.ts
@@ -0,0 +1,200 @@
+import { ImapFlow, type FetchMessageObject, type FetchQueryObject, type MailboxLockObject } from 'imapflow';
+import { EmailSendInstruction, type Email, EmailRecvInstruction } from '@emprespresso/uptime';
+import { Either, ITraceable, LogLevel, Metric, Optional, ServerTrace, TraceUtil } from '@emprespresso/pengueno';
+import { createTransport } from 'nodemailer';
+
+interface IRecv {
+ connect: () => Promise<void>;
+ logout: () => Promise<void>;
+
+ getMailboxLock: (mailbox: string) => Promise<MailboxLockObject>;
+ mailboxClose: () => Promise<boolean>;
+
+ fetchAll: (range: string, options: FetchQueryObject) => Promise<FetchMessageObject[]>;
+ messageDelete: (uids: number[], opts: Record<string, any>) => Promise<boolean>;
+}
+
+interface ISend {
+ sendMail: (email: Email) => Promise<Email>;
+}
+
+export class Outbox {
+ public constructor(private readonly sender: ISend) {}
+
+ public async send(email: ITraceable<Email>) {
+ return email
+ .flatMap(TraceUtil.withMetricTrace(Outbox.Metrics.SEND))
+ .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Sending email'))
+ .map((email) => Either.fromFailableAsync<Error, Email>(() => this.sender.sendMail(email.get())))
+ .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Outbox.Metrics.SEND)))
+ .get();
+ }
+
+ public static async from(instruction: ITraceable<EmailSendInstruction>) {
+ return instruction
+ .flatMap(TraceUtil.withMetricTrace(Outbox.Metrics.INIT))
+ .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Initializing outbox'))
+ .map((instruction) =>
+ createTransport({
+ host: instruction.get().send_host,
+ port: instruction.get().send_port,
+ auth: {
+ user: instruction.get().username,
+ pass: instruction.get().password,
+ },
+ tls: {
+ rejectUnauthorized: false,
+ },
+ }),
+ )
+ .map(
+ (transport) =>
+ new Outbox({
+ sendMail: (email: Email) =>
+ new Promise<Email>((resolve, reject) =>
+ transport.get().sendMail(email, (error) => {
+ if (error) {
+ reject(error);
+ } else {
+ resolve(email);
+ }
+ }),
+ ),
+ }),
+ )
+ .peek(TraceUtil.withMetricTrace(Outbox.Metrics.INIT))
+ .map((x) => Either.right<Error, Outbox>(x.get()))
+ .get();
+ }
+
+ private static Metrics = {
+ INIT: Metric.fromName('Outbox.Initialize').asResult(),
+ SEND: Metric.fromName('Outbox.Send').asResult(),
+ };
+}
+
+export class Inbox {
+ public constructor(
+ private readonly recv: IRecv,
+ private lock?: ITraceable<MailboxLockObject, ServerTrace>,
+ ) {}
+
+ public async deleteMessages(_uids: Array<number> | number) {
+ const uids = Array.isArray(_uids) ? _uids : [_uids];
+ return this.lock!.flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.DELETE))
+ .peek((res) => res.trace.traceScope(LogLevel.INFO).trace(`Deleting messages ${uids}`))
+ .map(() => Either.fromFailableAsync<Error, boolean>(() => this.recv.messageDelete(uids, { uid: true })))
+ .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.DELETE)))
+ .get();
+ }
+
+ public async listMessages() {
+ return this.lock!.flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.LIST))
+ .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Listing messages'))
+ .map(() =>
+ Either.fromFailableAsync<Error, Array<FetchMessageObject>>(() =>
+ this.recv.fetchAll('*', {
+ uid: true,
+ envelope: true,
+ headers: true,
+ bodyParts: ['text'],
+ }),
+ ),
+ )
+ .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.LIST)))
+ .peek(
+ TraceUtil.promiseify((res) =>
+ res.trace.trace(
+ res.get().fold(
+ (err) => err.toString(),
+ (ok) => `Found ${ok.length} messages`,
+ ),
+ ),
+ ),
+ )
+ .get();
+ }
+
+ public async close() {
+ await this.recv.mailboxClose().then(() => this.recv.logout());
+ this.lock!.peek((l) => l.get().release())
+ .peek(t => t.trace.trace(Inbox.Metrics.LOCK))
+ .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Released lock'))
+ .get();
+ delete this.lock;
+ }
+
+ public async find(email: ITraceable<Email>) {
+ return email
+ .flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.FIND))
+ .map((e) =>
+ this.listMessages().then((eitherMessages) =>
+ eitherMessages.flatMap((messages) =>
+ Either.fromFailable(() =>
+ Optional.from(messages.find((message) => this.equals(e.get(), message))).get(),
+ ),
+ ),
+ ),
+ )
+ .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.FIND)))
+ .get();
+ }
+
+ public static from(instruction: ITraceable<EmailRecvInstruction, ServerTrace>) {
+ const client = new ImapFlow({
+ logger: false,
+ host: instruction.get().read_host,
+ port: instruction.get().read_port,
+ secure: true,
+ auth: {
+ user: instruction.get().username,
+ pass: instruction.get().password,
+ },
+ });
+ return instruction
+ .move(client)
+ .flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.INIT))
+ .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Initializing mailbox'))
+ .map((tClient) =>
+ Either.fromFailableAsync<Error, MailboxLockObject>(() =>
+ tClient
+ .get()
+ .connect()
+ .then(() => tClient.get().getMailboxLock('INBOX')),
+ ).then((eitherLock) =>
+ eitherLock
+ .mapRight((lock) => tClient.move(lock).flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.LOCK)))
+ .mapRight((traceableLock) => new Inbox(tClient.get(), traceableLock)),
+ ),
+ )
+ .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.INIT)))
+ .peek(
+ TraceUtil.promiseify((res) =>
+ res.trace.trace(
+ res.get().fold(
+ (err) => `Could not initialize mailbox: ${err.stack}`,
+ (ok) => `Initialized mailbox`,
+ ),
+ ),
+ ),
+ )
+ .get();
+ }
+
+ private equals(email: Email, message: FetchMessageObject) {
+ const subjectMatches = email.subject === message.envelope?.subject;
+ const bodyMatches = message.bodyParts?.get('text')?.toString().trim() === email.text.trim();
+ const headers = message.headers?.toLocaleString();
+ const fromMatches = headers?.includes(`Return-Path: <${email.from}>`);
+ const toMatches = headers?.includes(`Delivered-To: ${email.to}`);
+ return subjectMatches && bodyMatches && fromMatches && toMatches;
+ }
+
+ private static Metrics = {
+ INIT: Metric.fromName('Inbox.Initialize').asResult(),
+ FIND: Metric.fromName('Inbox.FindMessage').asResult(),
+ LOCK: Metric.fromName('Inbox.Lock').asResult(),
+ LIST: Metric.fromName('Inbox.List').asResult(),
+ DELETE: Metric.fromName('Inbox.Delete').asResult(),
+ };
+}
diff --git a/lib/email_activity.ts b/lib/email_activity.ts
new file mode 100644
index 0000000..9abdb1f
--- /dev/null
+++ b/lib/email_activity.ts
@@ -0,0 +1,130 @@
+import {
+ Either,
+ type IActivity,
+ type IEither,
+ type ITraceable,
+ jsonModel,
+ JsonResponse,
+ LogLevel,
+ Metric,
+ PenguenoError,
+ type PenguenoRequest,
+ type ServerTrace,
+ TraceUtil,
+} from '@emprespresso/pengueno';
+import {
+ Email,
+ EmailJob,
+ isEmailJob,
+ Inbox,
+ Outbox,
+ EmailSendInstruction,
+ EmailRecvInstruction,
+} from '@emprespresso/uptime';
+
+const wellFormedJobMetric = Metric.fromName('Email.WellFormed').asResult();
+
+const jobJsonTransformer = (j: ITraceable<unknown, ServerTrace>): IEither<PenguenoError, EmailJob> =>
+ j
+ .flatMap(TraceUtil.withMetricTrace(wellFormedJobMetric))
+ .map((tJson): IEither<PenguenoError, EmailJob> => {
+ const tJob = tJson.get();
+ if (!isEmailJob(tJob)) {
+ const err = 'seems like a pwetty mawfomed job (-.-)';
+ tJson.trace.traceScope(LogLevel.WARN).trace(err);
+ return Either.left(new PenguenoError(err, 400));
+ }
+ return Either.right(tJob);
+ })
+ .peek(TraceUtil.traceResultingEither(wellFormedJobMetric))
+ .get();
+
+export interface IEmailActivity {
+ runMail: IActivity;
+}
+
+const emailJobRequestMetric = Metric.fromName('JobHook.process').asResult();
+export class EmailActivityImpl implements IEmailActivity {
+ constructor(
+ private readonly outboxFactory: (
+ instruction: ITraceable<EmailSendInstruction, ServerTrace>,
+ ) => Promise<IEither<Error, Outbox>>,
+ private readonly inboxFactory: (
+ instruction: ITraceable<EmailRecvInstruction, ServerTrace>,
+ ) => Promise<IEither<Error, Inbox>>,
+ ) {}
+
+ private trace(r: ITraceable<PenguenoRequest, ServerTrace>) {
+ return r.flatMap(TraceUtil.withClassTrace(this)).flatMap(TraceUtil.withMetricTrace(emailJobRequestMetric));
+ }
+
+ private async runJob(_tJob: ITraceable<EmailJob, ServerTrace>) {
+ const { from, to, readRetry } = _tJob.get();
+ const emailToTest: Email = {
+ from: from.email,
+ to: to.email,
+ subject: [new Date().toISOString(), crypto.randomUUID()].join(' | '),
+ text: crypto.randomUUID(),
+ };
+ const tJob = _tJob.traceScope(() => `Subject = ${emailToTest.subject}`);
+ const tEmail = tJob.move(emailToTest);
+
+ const inNOut = await (
+ await this.outboxFactory(tJob.move(from))
+ ).joinRightAsync(
+ () => this.inboxFactory(tJob.move(to)),
+ (inbox, outbox) => ({ inbox, outbox }),
+ );
+ const sent = await inNOut.joinRightAsync(
+ () => inNOut.flatMapAsync(({ outbox }) => outbox.send(tEmail)),
+ (prev, n) => prev,
+ );
+ const recvd = await Either.retrying(
+ async () =>
+ sent.joinRightAsync(
+ () =>
+ inNOut.flatMapAsync(({ inbox }) =>
+ inbox.find(tEmail.peek((t) => t.trace.trace('Looking for message'))),
+ ),
+ (prev, n) => prev,
+ ),
+ readRetry.attempts,
+ (_attempt) => new Promise((res) => setTimeout(res, readRetry.interval * 1000)),
+ );
+ const deleted = await recvd
+ .joinRight(inNOut, ({ inbox }, recvd) => inbox.deleteMessages(recvd.uid))
+ .flatMapAsync((x) => x);
+ return inNOut.flatMapAsync(async ({ inbox }) => {
+ await inbox.close();
+ return deleted;
+ });
+ }
+
+ public runMail(r: ITraceable<PenguenoRequest, ServerTrace>) {
+ return this.trace(r)
+ .map(jsonModel(jobJsonTransformer))
+ .map(async (tJob) => {
+ const eitherJob = await tJob.get();
+ return eitherJob.flatMapAsync((job) =>
+ this.runJob(tJob.move(job)).then((res) =>
+ res.mapLeft((err) => {
+ tJob.trace.traceScope(LogLevel.ERROR).trace(err);
+ return new PenguenoError('Failed running job ' + err.toString(), 500);
+ }),
+ ),
+ );
+ })
+ .map(
+ TraceUtil.promiseify(
+ (tEitherEmail) =>
+ new JsonResponse(r, tEitherEmail.get(), {
+ status: tEitherEmail.get().fold(
+ ({ status }) => status,
+ () => 200,
+ ),
+ }),
+ ),
+ )
+ .get();
+ }
+}
diff --git a/lib/healthcheck.ts b/lib/healthcheck.ts
new file mode 100644
index 0000000..3ecc0eb
--- /dev/null
+++ b/lib/healthcheck.ts
@@ -0,0 +1,18 @@
+import {
+ TraceUtil,
+ IEither,
+ ITraceable,
+ HealthChecker,
+ HealthCheckInput,
+ HealthCheckOutput,
+ ServerTrace,
+ Either,
+} from '@emprespresso/pengueno';
+
+export const healthCheck: HealthChecker = (
+ input: ITraceable<HealthCheckInput, ServerTrace>,
+): Promise<IEither<Error, HealthCheckOutput>> =>
+ input
+ .flatMap(TraceUtil.withFunctionTrace(healthCheck))
+ .move(Promise.resolve(Either.right<Error, HealthCheckOutput>(HealthCheckOutput.YAASSSLAYQUEEN)))
+ .get();
diff --git a/lib/index.ts b/lib/index.ts
new file mode 100644
index 0000000..9194b03
--- /dev/null
+++ b/lib/index.ts
@@ -0,0 +1,6 @@
+export * from './model';
+export * from './email';
+export * from './email_activity';
+export * from './healthcheck';
+export * from './server';
+export * from './run';
diff --git a/lib/model.ts b/lib/model.ts
new file mode 100644
index 0000000..7896d70
--- /dev/null
+++ b/lib/model.ts
@@ -0,0 +1,87 @@
+import { isObject } from '@emprespresso/pengueno';
+
+export interface Email {
+ from: string;
+ to: string;
+ subject: string;
+ text: string;
+}
+
+export interface EmailCredentials {
+ email: string;
+ username: string;
+ password: string;
+}
+
+export const isCreds = (u: unknown): u is EmailCredentials =>
+ !!(
+ isObject(u) &&
+ ['email', 'username', 'password'].every((key) => key in u && typeof u[key as keyof typeof u] === 'string')
+ );
+
+export interface EmailSendInstruction extends EmailCredentials {
+ send_host: string;
+ send_port: number;
+}
+
+export const isSend = (u: unknown): u is EmailSendInstruction =>
+ !!(
+ isCreds(u) &&
+ 'send_host' in u &&
+ typeof u.send_host === 'string' &&
+ 'send_port' in u &&
+ typeof u.send_port === 'number'
+ );
+
+export interface EmailRecvInstruction extends EmailCredentials {
+ read_host: string;
+ read_port: number;
+}
+
+export const isRecv = (u: unknown): u is EmailSendInstruction =>
+ !!(
+ isCreds(u) &&
+ 'read_host' in u &&
+ typeof u.read_host === 'string' &&
+ 'read_port' in u &&
+ typeof u.read_port === 'number'
+ );
+
+export interface EmailJob {
+ from: EmailSendInstruction;
+ to: EmailRecvInstruction;
+ readRetry: Retry;
+}
+
+export const isEmailJob = (u: unknown): u is EmailJob =>
+ !!(
+ isObject(u) &&
+ 'from' in u &&
+ isSend(u.from) &&
+ 'to' in u &&
+ isRecv(u.to) &&
+ 'readRetry' in u &&
+ isRetry(u.readRetry)
+ );
+
+export interface Retry {
+ attempts: number;
+ interval: number;
+}
+
+export const isRetry = (u: unknown): u is Retry =>
+ !!(
+ isObject(u) && ['attempts', 'interval'].every((key) => key in u && typeof u[key as keyof typeof u] === 'number')
+ );
+
+export const redact = <T extends EmailCredentials>(instruction: T): T => ({
+ ...instruction,
+ password: 'REDACTED',
+ username: 'REDACTED',
+});
+
+export const redactJob = (job: EmailJob): EmailJob => ({
+ ...job,
+ from: redact(job.from),
+ to: redact(job.to),
+});
diff --git a/lib/run.ts b/lib/run.ts
new file mode 100644
index 0000000..b16fd53
--- /dev/null
+++ b/lib/run.ts
@@ -0,0 +1,7 @@
+import { HonoProxy } from '@emprespresso/pengueno';
+import { UptimeServer } from '@emprespresso/uptime';
+
+const server = new UptimeServer();
+const hono = new HonoProxy(server);
+
+export const runServer = (port: number, hostname: string) => hono.serve(port, hostname);
diff --git a/lib/server.ts b/lib/server.ts
new file mode 100644
index 0000000..9885bf3
--- /dev/null
+++ b/lib/server.ts
@@ -0,0 +1,35 @@
+import {
+ FourOhFourActivityImpl,
+ HealthCheckActivityImpl,
+ type HealthChecker,
+ type IFourOhFourActivity,
+ type IHealthCheckActivity,
+ type ITraceable,
+ PenguenoRequest,
+ Server,
+ type ServerTrace,
+} from '@emprespresso/pengueno';
+import { healthCheck as _healthCheck, EmailActivityImpl, IEmailActivity, Inbox, Outbox } from './index';
+
+export class UptimeServer implements Server {
+ constructor(
+ healthCheck: HealthChecker = _healthCheck,
+ private readonly healthCheckActivity: IHealthCheckActivity = new HealthCheckActivityImpl(healthCheck),
+ private readonly emailActivity: IEmailActivity = new EmailActivityImpl(
+ (a) => Outbox.from(a),
+ (b) => Inbox.from(b),
+ ),
+ private readonly fourOhFourActivity: IFourOhFourActivity = new FourOhFourActivityImpl(),
+ ) {}
+
+ public serve(req: ITraceable<PenguenoRequest, ServerTrace>) {
+ const url = new URL(req.get().req.url);
+ if (url.pathname === '/health') {
+ return this.healthCheckActivity.checkHealth(req);
+ }
+ if (url.pathname === '/email') {
+ return this.emailActivity.runMail(req);
+ }
+ return this.fourOhFourActivity.fourOhFour(req);
+ }
+}