summaryrefslogtreecommitdiff
path: root/lib/email_activity.ts
diff options
context:
space:
mode:
authorElizabeth Hunt <me@liz.coffee>2025-08-17 23:50:24 -0700
committerElizabeth Hunt <me@liz.coffee>2025-08-17 23:53:38 -0700
commit1a4fc9535a89b58e8b67c8996ade0b833116af3a (patch)
treed16f3129d7bb69f204bba8422e909354195a0042 /lib/email_activity.ts
parent157dc327e8fe63541b517cfbeeaf202a3e8553a5 (diff)
downloaduptime-1a4fc9535a89b58e8b67c8996ade0b833116af3a.tar.gz
uptime-1a4fc9535a89b58e8b67c8996ade0b833116af3a.zip
Move to pengueno.
Diffstat (limited to 'lib/email_activity.ts')
-rw-r--r--lib/email_activity.ts130
1 files changed, 130 insertions, 0 deletions
diff --git a/lib/email_activity.ts b/lib/email_activity.ts
new file mode 100644
index 0000000..9abdb1f
--- /dev/null
+++ b/lib/email_activity.ts
@@ -0,0 +1,130 @@
+import {
+ Either,
+ type IActivity,
+ type IEither,
+ type ITraceable,
+ jsonModel,
+ JsonResponse,
+ LogLevel,
+ Metric,
+ PenguenoError,
+ type PenguenoRequest,
+ type ServerTrace,
+ TraceUtil,
+} from '@emprespresso/pengueno';
+import {
+ Email,
+ EmailJob,
+ isEmailJob,
+ Inbox,
+ Outbox,
+ EmailSendInstruction,
+ EmailRecvInstruction,
+} from '@emprespresso/uptime';
+
+const wellFormedJobMetric = Metric.fromName('Email.WellFormed').asResult();
+
+const jobJsonTransformer = (j: ITraceable<unknown, ServerTrace>): IEither<PenguenoError, EmailJob> =>
+ j
+ .flatMap(TraceUtil.withMetricTrace(wellFormedJobMetric))
+ .map((tJson): IEither<PenguenoError, EmailJob> => {
+ const tJob = tJson.get();
+ if (!isEmailJob(tJob)) {
+ const err = 'seems like a pwetty mawfomed job (-.-)';
+ tJson.trace.traceScope(LogLevel.WARN).trace(err);
+ return Either.left(new PenguenoError(err, 400));
+ }
+ return Either.right(tJob);
+ })
+ .peek(TraceUtil.traceResultingEither(wellFormedJobMetric))
+ .get();
+
+export interface IEmailActivity {
+ runMail: IActivity;
+}
+
+const emailJobRequestMetric = Metric.fromName('JobHook.process').asResult();
+export class EmailActivityImpl implements IEmailActivity {
+ constructor(
+ private readonly outboxFactory: (
+ instruction: ITraceable<EmailSendInstruction, ServerTrace>,
+ ) => Promise<IEither<Error, Outbox>>,
+ private readonly inboxFactory: (
+ instruction: ITraceable<EmailRecvInstruction, ServerTrace>,
+ ) => Promise<IEither<Error, Inbox>>,
+ ) {}
+
+ private trace(r: ITraceable<PenguenoRequest, ServerTrace>) {
+ return r.flatMap(TraceUtil.withClassTrace(this)).flatMap(TraceUtil.withMetricTrace(emailJobRequestMetric));
+ }
+
+ private async runJob(_tJob: ITraceable<EmailJob, ServerTrace>) {
+ const { from, to, readRetry } = _tJob.get();
+ const emailToTest: Email = {
+ from: from.email,
+ to: to.email,
+ subject: [new Date().toISOString(), crypto.randomUUID()].join(' | '),
+ text: crypto.randomUUID(),
+ };
+ const tJob = _tJob.traceScope(() => `Subject = ${emailToTest.subject}`);
+ const tEmail = tJob.move(emailToTest);
+
+ const inNOut = await (
+ await this.outboxFactory(tJob.move(from))
+ ).joinRightAsync(
+ () => this.inboxFactory(tJob.move(to)),
+ (inbox, outbox) => ({ inbox, outbox }),
+ );
+ const sent = await inNOut.joinRightAsync(
+ () => inNOut.flatMapAsync(({ outbox }) => outbox.send(tEmail)),
+ (prev, n) => prev,
+ );
+ const recvd = await Either.retrying(
+ async () =>
+ sent.joinRightAsync(
+ () =>
+ inNOut.flatMapAsync(({ inbox }) =>
+ inbox.find(tEmail.peek((t) => t.trace.trace('Looking for message'))),
+ ),
+ (prev, n) => prev,
+ ),
+ readRetry.attempts,
+ (_attempt) => new Promise((res) => setTimeout(res, readRetry.interval * 1000)),
+ );
+ const deleted = await recvd
+ .joinRight(inNOut, ({ inbox }, recvd) => inbox.deleteMessages(recvd.uid))
+ .flatMapAsync((x) => x);
+ return inNOut.flatMapAsync(async ({ inbox }) => {
+ await inbox.close();
+ return deleted;
+ });
+ }
+
+ public runMail(r: ITraceable<PenguenoRequest, ServerTrace>) {
+ return this.trace(r)
+ .map(jsonModel(jobJsonTransformer))
+ .map(async (tJob) => {
+ const eitherJob = await tJob.get();
+ return eitherJob.flatMapAsync((job) =>
+ this.runJob(tJob.move(job)).then((res) =>
+ res.mapLeft((err) => {
+ tJob.trace.traceScope(LogLevel.ERROR).trace(err);
+ return new PenguenoError('Failed running job ' + err.toString(), 500);
+ }),
+ ),
+ );
+ })
+ .map(
+ TraceUtil.promiseify(
+ (tEitherEmail) =>
+ new JsonResponse(r, tEitherEmail.get(), {
+ status: tEitherEmail.get().fold(
+ ({ status }) => status,
+ () => 200,
+ ),
+ }),
+ ),
+ )
+ .get();
+ }
+}