From 1a4fc9535a89b58e8b67c8996ade0b833116af3a Mon Sep 17 00:00:00 2001 From: Elizabeth Hunt Date: Sun, 17 Aug 2025 23:50:24 -0700 Subject: Move to pengueno. --- lib/email.ts | 200 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 lib/email.ts (limited to 'lib/email.ts') diff --git a/lib/email.ts b/lib/email.ts new file mode 100644 index 0000000..38beb0c --- /dev/null +++ b/lib/email.ts @@ -0,0 +1,200 @@ +import { ImapFlow, type FetchMessageObject, type FetchQueryObject, type MailboxLockObject } from 'imapflow'; +import { EmailSendInstruction, type Email, EmailRecvInstruction } from '@emprespresso/uptime'; +import { Either, ITraceable, LogLevel, Metric, Optional, ServerTrace, TraceUtil } from '@emprespresso/pengueno'; +import { createTransport } from 'nodemailer'; + +interface IRecv { + connect: () => Promise; + logout: () => Promise; + + getMailboxLock: (mailbox: string) => Promise; + mailboxClose: () => Promise; + + fetchAll: (range: string, options: FetchQueryObject) => Promise; + messageDelete: (uids: number[], opts: Record) => Promise; +} + +interface ISend { + sendMail: (email: Email) => Promise; +} + +export class Outbox { + public constructor(private readonly sender: ISend) {} + + public async send(email: ITraceable) { + return email + .flatMap(TraceUtil.withMetricTrace(Outbox.Metrics.SEND)) + .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Sending email')) + .map((email) => Either.fromFailableAsync(() => this.sender.sendMail(email.get()))) + .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Outbox.Metrics.SEND))) + .get(); + } + + public static async from(instruction: ITraceable) { + return instruction + .flatMap(TraceUtil.withMetricTrace(Outbox.Metrics.INIT)) + .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Initializing outbox')) + .map((instruction) => + createTransport({ + host: instruction.get().send_host, + port: instruction.get().send_port, + auth: { + user: instruction.get().username, + pass: instruction.get().password, + }, + tls: { + rejectUnauthorized: false, + }, + }), + ) + .map( + (transport) => + new Outbox({ + sendMail: (email: Email) => + new Promise((resolve, reject) => + transport.get().sendMail(email, (error) => { + if (error) { + reject(error); + } else { + resolve(email); + } + }), + ), + }), + ) + .peek(TraceUtil.withMetricTrace(Outbox.Metrics.INIT)) + .map((x) => Either.right(x.get())) + .get(); + } + + private static Metrics = { + INIT: Metric.fromName('Outbox.Initialize').asResult(), + SEND: Metric.fromName('Outbox.Send').asResult(), + }; +} + +export class Inbox { + public constructor( + private readonly recv: IRecv, + private lock?: ITraceable, + ) {} + + public async deleteMessages(_uids: Array | number) { + const uids = Array.isArray(_uids) ? _uids : [_uids]; + return this.lock!.flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.DELETE)) + .peek((res) => res.trace.traceScope(LogLevel.INFO).trace(`Deleting messages ${uids}`)) + .map(() => Either.fromFailableAsync(() => this.recv.messageDelete(uids, { uid: true }))) + .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.DELETE))) + .get(); + } + + public async listMessages() { + return this.lock!.flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.LIST)) + .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Listing messages')) + .map(() => + Either.fromFailableAsync>(() => + this.recv.fetchAll('*', { + uid: true, + envelope: true, + headers: true, + bodyParts: ['text'], + }), + ), + ) + .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.LIST))) + .peek( + TraceUtil.promiseify((res) => + res.trace.trace( + res.get().fold( + (err) => err.toString(), + (ok) => `Found ${ok.length} messages`, + ), + ), + ), + ) + .get(); + } + + public async close() { + await this.recv.mailboxClose().then(() => this.recv.logout()); + this.lock!.peek((l) => l.get().release()) + .peek(t => t.trace.trace(Inbox.Metrics.LOCK)) + .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Released lock')) + .get(); + delete this.lock; + } + + public async find(email: ITraceable) { + return email + .flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.FIND)) + .map((e) => + this.listMessages().then((eitherMessages) => + eitherMessages.flatMap((messages) => + Either.fromFailable(() => + Optional.from(messages.find((message) => this.equals(e.get(), message))).get(), + ), + ), + ), + ) + .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.FIND))) + .get(); + } + + public static from(instruction: ITraceable) { + const client = new ImapFlow({ + logger: false, + host: instruction.get().read_host, + port: instruction.get().read_port, + secure: true, + auth: { + user: instruction.get().username, + pass: instruction.get().password, + }, + }); + return instruction + .move(client) + .flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.INIT)) + .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Initializing mailbox')) + .map((tClient) => + Either.fromFailableAsync(() => + tClient + .get() + .connect() + .then(() => tClient.get().getMailboxLock('INBOX')), + ).then((eitherLock) => + eitherLock + .mapRight((lock) => tClient.move(lock).flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.LOCK))) + .mapRight((traceableLock) => new Inbox(tClient.get(), traceableLock)), + ), + ) + .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.INIT))) + .peek( + TraceUtil.promiseify((res) => + res.trace.trace( + res.get().fold( + (err) => `Could not initialize mailbox: ${err.stack}`, + (ok) => `Initialized mailbox`, + ), + ), + ), + ) + .get(); + } + + private equals(email: Email, message: FetchMessageObject) { + const subjectMatches = email.subject === message.envelope?.subject; + const bodyMatches = message.bodyParts?.get('text')?.toString().trim() === email.text.trim(); + const headers = message.headers?.toLocaleString(); + const fromMatches = headers?.includes(`Return-Path: <${email.from}>`); + const toMatches = headers?.includes(`Delivered-To: ${email.to}`); + return subjectMatches && bodyMatches && fromMatches && toMatches; + } + + private static Metrics = { + INIT: Metric.fromName('Inbox.Initialize').asResult(), + FIND: Metric.fromName('Inbox.FindMessage').asResult(), + LOCK: Metric.fromName('Inbox.Lock').asResult(), + LIST: Metric.fromName('Inbox.List').asResult(), + DELETE: Metric.fromName('Inbox.Delete').asResult(), + }; +} -- cgit v1.2.3-70-g09d2