import { ImapFlow, type FetchMessageObject, type FetchQueryObject, type MailboxLockObject } from 'imapflow';
import { EmailSendInstruction, type Email, EmailRecvInstruction } from '@emprespresso/uptime';
import { Either, ITraceable, LogLevel, Metric, Optional, ServerTrace, TraceUtil } from '@emprespresso/pengueno';
import { createTransport } from 'nodemailer';

/**
 * The subset of IMAP client operations that {@link Inbox} calls.
 * `Inbox.from` passes an `ImapFlow` instance for this; any object with these
 * members (e.g. a test double) can be injected through the constructor.
 */
interface IRecv {
    connect: () => Promise<void>;
    logout: () => Promise<void>;
    getMailboxLock: (mailbox: string) => Promise<MailboxLockObject>;
    mailboxClose: () => Promise<boolean>;
    fetchAll: (range: string, options: FetchQueryObject) => Promise<FetchMessageObject[]>;
    messageDelete: (uids: number[], opts: Record<string, boolean>) => Promise<boolean>;
}

/** The single send operation that {@link Outbox} calls; resolves with the email that was sent. */
interface ISend {
    sendMail: (email: Email) => Promise<Email>;
}

/** Sends emails through an injected {@link ISend}, tracing and recording a metric for each operation. */
export class Outbox {
    public constructor(private readonly sender: ISend) {}

    /**
     * Sends the traced email through the underlying sender.
     *
     * @param email the email to send, carried with its trace.
     * @returns the result of `Either.fromFailableAsync` around `sendMail`, so a
     *          send failure is reported in the returned value.
     */
    public async send(email: ITraceable<Email, ServerTrace>) {
        return email
            .flatMap(TraceUtil.withMetricTrace(Outbox.Metrics.SEND))
            .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Sending email'))
            .map((tEmail) => Either.fromFailableAsync<Error, Email>(() => this.sender.sendMail(tEmail.get())))
            .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Outbox.Metrics.SEND)))
            .get();
    }

    /**
     * Builds an Outbox backed by a nodemailer transport configured from the
     * instruction's `send_host`, `send_port`, `username` and `password`.
     *
     * @param instruction the send instruction, carried with its trace.
     * @returns a right `Either` holding the new Outbox.
     */
    public static async from(instruction: ITraceable<EmailSendInstruction, ServerTrace>) {
        return instruction
            .flatMap(TraceUtil.withMetricTrace(Outbox.Metrics.INIT))
            .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Initializing outbox'))
            .map((tInstruction) =>
                createTransport({
                    host: tInstruction.get().send_host,
                    port: tInstruction.get().send_port,
                    auth: {
                        user: tInstruction.get().username,
                        pass: tInstruction.get().password,
                    },
                    tls: {
                        // NOTE(review): certificate verification is disabled for this
                        // connection; confirm that this is intentional.
                        rejectUnauthorized: false,
                    },
                }),
            )
            .map(
                (transport) =>
                    new Outbox({
                        // Adapt nodemailer's callback-style sendMail to a promise that
                        // resolves with the email that was handed in.
                        sendMail: (email: Email) =>
                            new Promise<Email>((resolve, reject) =>
                                transport.get().sendMail(email, (error) => {
                                    if (error) {
                                        reject(error);
                                    } else {
                                        resolve(email);
                                    }
                                }),
                            ),
                    }),
            )
            .peek(TraceUtil.withMetricTrace(Outbox.Metrics.INIT))
            .map((x) => Either.right<Error, Outbox>(x.get()))
            .get();
    }

    private static readonly Metrics = {
        INIT: Metric.fromName('Outbox.Initialize').asResult(),
        SEND: Metric.fromName('Outbox.Send').asResult(),
    };
}

/**
 * Reads and deletes messages through an injected {@link IRecv} while holding a
 * mailbox lock. The lock is released by {@link Inbox.close}; after that, the
 * other operations reject.
 */
export class Inbox {
    public constructor(
        private readonly recv: IRecv,
        private lock?: ITraceable<MailboxLockObject, ServerTrace>,
    ) {}

    /**
     * Deletes the given message(s), addressed by UID.
     *
     * @param _uids a single UID or an array of UIDs.
     * @returns the result of `Either.fromFailableAsync` around `messageDelete`.
     * @throws Error if the inbox holds no lock (e.g. it was already closed).
     */
    public async deleteMessages(_uids: Array<number> | number) {
        const uids = Array.isArray(_uids) ? _uids : [_uids];
        return this.requireLock()
            .flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.DELETE))
            .peek((res) => res.trace.traceScope(LogLevel.INFO).trace(`Deleting messages ${uids}`))
            .map(() => Either.fromFailableAsync<Error, boolean>(() => this.recv.messageDelete(uids, { uid: true })))
            .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.DELETE)))
            .get();
    }

    /**
     * Fetches messages (uid, envelope, headers and the `text` body part) for
     * the range `'*'` and traces how many were found.
     *
     * @returns the result of `Either.fromFailableAsync` around `fetchAll`.
     * @throws Error if the inbox holds no lock (e.g. it was already closed).
     */
    public async listMessages() {
        return this.requireLock()
            .flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.LIST))
            .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Listing messages'))
            .map(() =>
                Either.fromFailableAsync<Error, Array<FetchMessageObject>>(() =>
                    // NOTE(review): in IMAP sequence-set syntax (RFC 3501) `*` denotes the
                    // highest-numbered message rather than every message; confirm whether
                    // '1:*' was intended here.
                    this.recv.fetchAll('*', {
                        uid: true,
                        envelope: true,
                        headers: true,
                        bodyParts: ['text'],
                    }),
                ),
            )
            .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.LIST)))
            .peek(
                TraceUtil.promiseify((res) =>
                    res.trace.trace(
                        res.get().fold(
                            (err) => err.toString(),
                            (ok) => `Found ${ok.length} messages`,
                        ),
                    ),
                ),
            )
            .get();
    }

    /**
     * Closes the mailbox, logs out and releases the mailbox lock.
     *
     * Logout is attempted even if closing the mailbox fails, and the lock is
     * released even if either of them fails, so a failure here does not leave
     * the lock held. The failure still propagates to the caller. Calling this
     * on an inbox that holds no lock skips the release step.
     */
    public async close() {
        const heldLock = this.lock;
        try {
            try {
                await this.recv.mailboxClose();
            } finally {
                await this.recv.logout();
            }
        } finally {
            heldLock
                ?.peek((l) => l.get().release())
                .peek((t) => t.trace.trace(Inbox.Metrics.LOCK))
                .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Released lock'))
                .get();
            delete this.lock;
        }
    }

    /**
     * Lists messages and looks for the first one that matches the given email
     * according to {@link Inbox.equals}.
     *
     * @param email the email to look for, carried with its trace.
     * @returns the listing result, flat-mapped through
     *          `Either.fromFailable(() => Optional.from(match).get())`.
     *          Presumably `get()` on an empty Optional throws, so "not found"
     *          surfaces as a failure; verify against pengueno.
     */
    public async find(email: ITraceable<Email, ServerTrace>) {
        return email
            .flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.FIND))
            .map((e) =>
                this.listMessages().then((eitherMessages) =>
                    eitherMessages.flatMap((messages) =>
                        Either.fromFailable<Error, FetchMessageObject>(() =>
                            Optional.from(messages.find((message) => this.equals(e.get(), message))).get(),
                        ),
                    ),
                ),
            )
            .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.FIND)))
            .get();
    }

    /**
     * Connects an `ImapFlow` client (TLS, using the instruction's `read_host`,
     * `read_port`, `username` and `password`), takes a lock on `INBOX` and
     * wraps both in an Inbox.
     *
     * @param instruction the receive instruction, carried with its trace.
     * @returns the result of `Either.fromFailableAsync` around connect + lock,
     *          mapped to an Inbox on success.
     */
    public static from(instruction: ITraceable<EmailRecvInstruction, ServerTrace>) {
        const client = new ImapFlow({
            logger: false,
            host: instruction.get().read_host,
            port: instruction.get().read_port,
            secure: true,
            auth: {
                user: instruction.get().username,
                pass: instruction.get().password,
            },
        });
        return instruction
            .move(client)
            .flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.INIT))
            .peek((res) => res.trace.traceScope(LogLevel.INFO).trace('Initializing mailbox'))
            .map((tClient) =>
                Either.fromFailableAsync<Error, MailboxLockObject>(() =>
                    tClient
                        .get()
                        .connect()
                        .then(() => tClient.get().getMailboxLock('INBOX')),
                ).then((eitherLock) =>
                    eitherLock
                        .mapRight((lock) => tClient.move(lock).flatMap(TraceUtil.withMetricTrace(Inbox.Metrics.LOCK)))
                        .mapRight((traceableLock) => new Inbox(tClient.get(), traceableLock)),
                ),
            )
            .peek(TraceUtil.promiseify(TraceUtil.traceResultingEither(Inbox.Metrics.INIT)))
            .peek(
                TraceUtil.promiseify((res) =>
                    res.trace.trace(
                        res.get().fold(
                            (err) => `Could not initialize mailbox: ${err.stack}`,
                            () => 'Initialized mailbox',
                        ),
                    ),
                ),
            )
            .get();
    }

    /**
     * Returns the held lock, or throws a descriptive error when there is none.
     * This replaces a non-null assertion that failed with an opaque TypeError.
     */
    private requireLock(): ITraceable<MailboxLockObject, ServerTrace> {
        if (this.lock === undefined) {
            throw new Error('Inbox holds no mailbox lock (it was never acquired or has already been released)');
        }
        return this.lock;
    }

    /**
     * Decides whether a fetched message corresponds to the given email: the
     * subject must be identical, the trimmed `text` body part must equal the
     * trimmed email text, and the raw headers must contain the literal
     * `From: <from>` and `To: <to>` substrings.
     */
    private equals(email: Email, message: FetchMessageObject): boolean {
        const subjectMatches = email.subject === message.envelope?.subject;
        const bodyMatches = message.bodyParts?.get('text')?.toString().trim() === email.text.trim();
        const headers = message.headers?.toString();
        // Missing headers count as "no match" rather than leaking `undefined`.
        const fromMatches = headers?.includes(`From: ${email.from}`) ?? false;
        const toMatches = headers?.includes(`To: ${email.to}`) ?? false;
        return subjectMatches && bodyMatches && fromMatches && toMatches;
    }

    private static readonly Metrics = {
        INIT: Metric.fromName('Inbox.Initialize').asResult(),
        FIND: Metric.fromName('Inbox.FindMessage').asResult(),
        LOCK: Metric.fromName('Inbox.Lock').asResult(),
        LIST: Metric.fromName('Inbox.List').asResult(),
        DELETE: Metric.fromName('Inbox.Delete').asResult(),
    };
}