summaryrefslogtreecommitdiff
path: root/lib/email_activity.ts
blob: 9abdb1f6f40cc2b54f065aaf75ee307867cc09e6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import {
    Either,
    type IActivity,
    type IEither,
    type ITraceable,
    jsonModel,
    JsonResponse,
    LogLevel,
    Metric,
    PenguenoError,
    type PenguenoRequest,
    type ServerTrace,
    TraceUtil,
} from '@emprespresso/pengueno';
import {
    Email,
    EmailJob,
    isEmailJob,
    Inbox,
    Outbox,
    EmailSendInstruction,
    EmailRecvInstruction,
} from '@emprespresso/uptime';

/** Result metric recording whether an incoming payload passed the `isEmailJob` check. */
const wellFormedJobMetric = Metric.fromName('Email.WellFormed').asResult();

/**
 * Checks a traced value of unknown shape against `isEmailJob`.
 *
 * @param j traced payload to validate (presumably the parsed request body — it is
 *          handed to `jsonModel` by the caller; confirm against that helper).
 * @returns a right `EmailJob` when the payload passes `isEmailJob`; otherwise a left
 *          `PenguenoError` with status 400, after logging a WARN-level trace.
 */
const jobJsonTransformer = (j: ITraceable<unknown, ServerTrace>): IEither<PenguenoError, EmailJob> => {
    // Attach the well-formedness metric to the trace before validating.
    const tMetered = j.flatMap(TraceUtil.withMetricTrace(wellFormedJobMetric));

    const tValidated = tMetered.map((tPayload): IEither<PenguenoError, EmailJob> => {
        const candidate = tPayload.get();
        if (isEmailJob(candidate)) {
            return Either.right(candidate);
        }
        const message = 'seems like a pwetty mawfomed job (-.-)';
        tPayload.trace.traceScope(LogLevel.WARN).trace(message);
        return Either.left(new PenguenoError(message, 400));
    });

    // Report the resulting either against the same metric, then unwrap it.
    return tValidated.peek(TraceUtil.traceResultingEither(wellFormedJobMetric)).get();
};

/**
 * Contract for the email activity: exposes a single `runMail` activity.
 */
export interface IEmailActivity {
    /** Activity that handles a mail job request (implemented in this file by `EmailActivityImpl.runMail`). */
    runMail: IActivity;
}

/** Result metric attached to every request trace handled by `EmailActivityImpl`. */
const emailJobRequestMetric = Metric.fromName('JobHook.process').asResult();

/**
 * Email round-trip activity: sends a uniquely identifiable probe email through an
 * outbox, looks for it in an inbox (with retries), deletes it, and reports the
 * outcome as a JSON response.
 */
export class EmailActivityImpl implements IEmailActivity {
    /**
     * @param outboxFactory builds an `Outbox` from the job's traced send instruction.
     * @param inboxFactory builds an `Inbox` from the job's traced receive instruction.
     */
    constructor(
        private readonly outboxFactory: (
            instruction: ITraceable<EmailSendInstruction, ServerTrace>,
        ) => Promise<IEither<Error, Outbox>>,
        private readonly inboxFactory: (
            instruction: ITraceable<EmailRecvInstruction, ServerTrace>,
        ) => Promise<IEither<Error, Inbox>>,
    ) {}

    /** Decorates the request trace with this class's scope and the request metric. */
    private trace(r: ITraceable<PenguenoRequest, ServerTrace>) {
        const tClassScoped = r.flatMap(TraceUtil.withClassTrace(this));
        return tClassScoped.flatMap(TraceUtil.withMetricTrace(emailJobRequestMetric));
    }

    /**
     * Runs one send / find / delete cycle for the given job.
     *
     * Steps, in order: build the probe email, open the outbox and then the inbox,
     * send the probe, look for it in the inbox via `Either.retrying`, delete the
     * found message by uid, and finally close the inbox.
     *
     * @param _tJob traced job providing `from`, `to` and `readRetry`.
     * @returns the either produced by the delete step, wrapped by the mailbox either
     *          (the inbox is closed inside `flatMapAsync` on the mailbox pair).
     */
    private async runJob(_tJob: ITraceable<EmailJob, ServerTrace>) {
        const { from: sender, to: recipient, readRetry } = _tJob.get();

        // Timestamp + UUID subject and a UUID body make every probe distinct.
        const probe: Email = {
            from: sender.email,
            to: recipient.email,
            subject: `${new Date().toISOString()} | ${crypto.randomUUID()}`,
            text: crypto.randomUUID(),
        };
        const tScoped = _tJob.traceScope(() => `Subject = ${probe.subject}`);
        const tProbe = tScoped.move(probe);

        // The outbox is created first; the inbox factory is supplied lazily to the join.
        const eitherOutbox = await this.outboxFactory(tScoped.move(sender));
        const mailboxes = await eitherOutbox.joinRightAsync(
            () => this.inboxFactory(tScoped.move(recipient)),
            (inbox, outbox) => ({ inbox, outbox }),
        );

        // NOTE(review): the joiner keeps its first argument; judging by the other joins
        // in this method that is the value produced by the thunk — confirm against the
        // Either API.
        const sendProbe = () => mailboxes.flatMapAsync(({ outbox }) => outbox.send(tProbe));
        const sent = await mailboxes.joinRightAsync(sendProbe, (kept, _dropped) => kept);

        const findProbe = () =>
            mailboxes.flatMapAsync(({ inbox }) =>
                inbox.find(tProbe.peek((t) => t.trace.trace('Looking for message'))),
            );
        // Presumably retried up to `readRetry.attempts` times; the delay callback waits
        // `readRetry.interval * 1000` milliseconds.
        const received = await Either.retrying(
            async () => sent.joinRightAsync(findProbe, (kept, _dropped) => kept),
            readRetry.attempts,
            (_attempt) => new Promise((res) => setTimeout(res, readRetry.interval * 1000)),
        );

        // Remove the probe from the inbox by uid, flattening the nested async either.
        const pendingDeletion = received.joinRight(mailboxes, ({ inbox }, message) =>
            inbox.deleteMessages(message.uid),
        );
        const deletion = await pendingDeletion.flatMapAsync((result) => result);

        // Close the inbox (only reachable through the mailbox pair) and report the deletion.
        return mailboxes.flatMapAsync(async ({ inbox }) => {
            await inbox.close();
            return deletion;
        });
    }

    /**
     * Activity entry point: validates the request body as an `EmailJob`, runs it, and
     * answers with a `JsonResponse`.
     *
     * A failure from `runJob` is logged at ERROR level and turned into a
     * `PenguenoError` with status 500. The response status is the error's status on
     * failure and 200 on success.
     *
     * @param r traced incoming request.
     */
    public runMail(r: ITraceable<PenguenoRequest, ServerTrace>) {
        return this.trace(r)
            .map(jsonModel(jobJsonTransformer))
            .map(async (tJob) => {
                const eitherJob = await tJob.get();
                return eitherJob.flatMapAsync(async (job) => {
                    const outcome = await this.runJob(tJob.move(job));
                    return outcome.mapLeft((err) => {
                        tJob.trace.traceScope(LogLevel.ERROR).trace(err);
                        return new PenguenoError(`Failed running job ${err.toString()}`, 500);
                    });
                });
            })
            .map(
                TraceUtil.promiseify((tOutcome) => {
                    const outcome = tOutcome.get();
                    const status = outcome.fold(
                        ({ status: errorStatus }) => errorStatus,
                        () => 200,
                    );
                    return new JsonResponse(r, outcome, { status });
                }),
            )
            .get();
    }
}