diff options
author | Elizabeth Hunt <me@liz.coffee> | 2025-08-17 23:50:24 -0700 |
---|---|---|
committer | Elizabeth Hunt <me@liz.coffee> | 2025-08-17 23:53:38 -0700 |
commit | 1a4fc9535a89b58e8b67c8996ade0b833116af3a (patch) | |
tree | d16f3129d7bb69f204bba8422e909354195a0042 /lib/email_activity.ts | |
parent | 157dc327e8fe63541b517cfbeeaf202a3e8553a5 (diff) | |
download | uptime-1a4fc9535a89b58e8b67c8996ade0b833116af3a.tar.gz uptime-1a4fc9535a89b58e8b67c8996ade0b833116af3a.zip |
Move to pengueno.
Diffstat (limited to 'lib/email_activity.ts')
-rw-r--r-- | lib/email_activity.ts | 130 |
1 files changed, 130 insertions, 0 deletions
diff --git a/lib/email_activity.ts b/lib/email_activity.ts new file mode 100644 index 0000000..9abdb1f --- /dev/null +++ b/lib/email_activity.ts @@ -0,0 +1,130 @@ +import { + Either, + type IActivity, + type IEither, + type ITraceable, + jsonModel, + JsonResponse, + LogLevel, + Metric, + PenguenoError, + type PenguenoRequest, + type ServerTrace, + TraceUtil, +} from '@emprespresso/pengueno'; +import { + Email, + EmailJob, + isEmailJob, + Inbox, + Outbox, + EmailSendInstruction, + EmailRecvInstruction, +} from '@emprespresso/uptime'; + +const wellFormedJobMetric = Metric.fromName('Email.WellFormed').asResult(); + +const jobJsonTransformer = (j: ITraceable<unknown, ServerTrace>): IEither<PenguenoError, EmailJob> => + j + .flatMap(TraceUtil.withMetricTrace(wellFormedJobMetric)) + .map((tJson): IEither<PenguenoError, EmailJob> => { + const tJob = tJson.get(); + if (!isEmailJob(tJob)) { + const err = 'seems like a pwetty mawfomed job (-.-)'; + tJson.trace.traceScope(LogLevel.WARN).trace(err); + return Either.left(new PenguenoError(err, 400)); + } + return Either.right(tJob); + }) + .peek(TraceUtil.traceResultingEither(wellFormedJobMetric)) + .get(); + +export interface IEmailActivity { + runMail: IActivity; +} + +const emailJobRequestMetric = Metric.fromName('JobHook.process').asResult(); +export class EmailActivityImpl implements IEmailActivity { + constructor( + private readonly outboxFactory: ( + instruction: ITraceable<EmailSendInstruction, ServerTrace>, + ) => Promise<IEither<Error, Outbox>>, + private readonly inboxFactory: ( + instruction: ITraceable<EmailRecvInstruction, ServerTrace>, + ) => Promise<IEither<Error, Inbox>>, + ) {} + + private trace(r: ITraceable<PenguenoRequest, ServerTrace>) { + return r.flatMap(TraceUtil.withClassTrace(this)).flatMap(TraceUtil.withMetricTrace(emailJobRequestMetric)); + } + + private async runJob(_tJob: ITraceable<EmailJob, ServerTrace>) { + const { from, to, readRetry } = _tJob.get(); + const emailToTest: Email = { + from: from.email, + to: to.email, + subject: [new Date().toISOString(), crypto.randomUUID()].join(' | '), + text: crypto.randomUUID(), + }; + const tJob = _tJob.traceScope(() => `Subject = ${emailToTest.subject}`); + const tEmail = tJob.move(emailToTest); + + const inNOut = await ( + await this.outboxFactory(tJob.move(from)) + ).joinRightAsync( + () => this.inboxFactory(tJob.move(to)), + (inbox, outbox) => ({ inbox, outbox }), + ); + const sent = await inNOut.joinRightAsync( + () => inNOut.flatMapAsync(({ outbox }) => outbox.send(tEmail)), + (prev, n) => prev, + ); + const recvd = await Either.retrying( + async () => + sent.joinRightAsync( + () => + inNOut.flatMapAsync(({ inbox }) => + inbox.find(tEmail.peek((t) => t.trace.trace('Looking for message'))), + ), + (prev, n) => prev, + ), + readRetry.attempts, + (_attempt) => new Promise((res) => setTimeout(res, readRetry.interval * 1000)), + ); + const deleted = await recvd + .joinRight(inNOut, ({ inbox }, recvd) => inbox.deleteMessages(recvd.uid)) + .flatMapAsync((x) => x); + return inNOut.flatMapAsync(async ({ inbox }) => { + await inbox.close(); + return deleted; + }); + } + + public runMail(r: ITraceable<PenguenoRequest, ServerTrace>) { + return this.trace(r) + .map(jsonModel(jobJsonTransformer)) + .map(async (tJob) => { + const eitherJob = await tJob.get(); + return eitherJob.flatMapAsync((job) => + this.runJob(tJob.move(job)).then((res) => + res.mapLeft((err) => { + tJob.trace.traceScope(LogLevel.ERROR).trace(err); + return new PenguenoError('Failed running job ' + err.toString(), 500); + }), + ), + ); + }) + .map( + TraceUtil.promiseify( + (tEitherEmail) => + new JsonResponse(r, tEitherEmail.get(), { + status: tEitherEmail.get().fold( + ({ status }) => status, + () => 200, + ), + }), + ), + ) + .get(); + } +} |