import { Either, type IActivity, type IEither, type ITraceable, jsonModel, JsonResponse, LogLevel, Metric, PenguenoError, type PenguenoRequest, type ServerTrace, TraceUtil, } from '@emprespresso/pengueno'; import { Email, EmailJob, isEmailJob, Inbox, Outbox, EmailSendInstruction, EmailRecvInstruction, } from '@emprespresso/uptime'; const wellFormedJobMetric = Metric.fromName('Email.WellFormed').asResult(); const jobJsonTransformer = (j: ITraceable): IEither => j .flatMap(TraceUtil.withMetricTrace(wellFormedJobMetric)) .map((tJson): IEither => { const tJob = tJson.get(); if (!isEmailJob(tJob)) { const err = 'seems like a pwetty mawfomed job (-.-)'; tJson.trace.traceScope(LogLevel.WARN).trace(err); return Either.left(new PenguenoError(err, 400)); } return Either.right(tJob); }) .peek(TraceUtil.traceResultingEither(wellFormedJobMetric)) .get(); export interface IEmailActivity { runMail: IActivity; } const emailJobRequestMetric = Metric.fromName('JobHook.process').asResult(); export class EmailActivityImpl implements IEmailActivity { constructor( private readonly outboxFactory: ( instruction: ITraceable, ) => Promise>, private readonly inboxFactory: ( instruction: ITraceable, ) => Promise>, ) {} private trace(r: ITraceable) { return r.flatMap(TraceUtil.withClassTrace(this)).flatMap(TraceUtil.withMetricTrace(emailJobRequestMetric)); } private async runJob(_tJob: ITraceable) { const { from, to, readRetry } = _tJob.get(); const emailToTest: Email = { from: from.email, to: to.email, subject: [new Date().toISOString(), crypto.randomUUID()].join(' | '), text: crypto.randomUUID(), }; const tJob = _tJob.traceScope(() => `Subject = ${emailToTest.subject}`); const tEmail = tJob.move(emailToTest); const inNOut = await ( await this.outboxFactory(tJob.move(from)) ).joinRightAsync( () => this.inboxFactory(tJob.move(to)), (inbox, outbox) => ({ inbox, outbox }), ); const sent = await inNOut.joinRightAsync( () => inNOut.flatMapAsync(({ outbox }) => outbox.send(tEmail)), (prev, n) => prev, ); const recvd = await Either.retrying( async () => sent.joinRightAsync( () => inNOut.flatMapAsync(({ inbox }) => inbox.find(tEmail.peek((t) => t.trace.trace('Looking for message'))), ), (prev, n) => prev, ), readRetry.attempts, (_attempt) => new Promise((res) => setTimeout(res, readRetry.interval * 1000)), ); const deleted = await recvd .joinRight(inNOut, ({ inbox }, recvd) => inbox.deleteMessages(recvd.uid)) .flatMapAsync((x) => x); return inNOut.flatMapAsync(async ({ inbox }) => { await inbox.close(); return deleted; }); } public runMail(r: ITraceable) { return this.trace(r) .map(jsonModel(jobJsonTransformer)) .map(async (tJob) => { const eitherJob = await tJob.get(); return eitherJob.flatMapAsync((job) => this.runJob(tJob.move(job)).then((res) => res.mapLeft((err) => { tJob.trace.traceScope(LogLevel.ERROR).trace(err); return new PenguenoError('Failed running job ' + err.toString(), 500); }), ), ); }) .map( TraceUtil.promiseify( (tEitherEmail) => new JsonResponse(r, tEitherEmail.get(), { status: tEitherEmail.get().fold( ({ status }) => status, () => 200, ), }), ), ) .get(); } }