refactor: introduce domain events for feed side effects (Track E — point 5)

Light "collect + dispatch" variant: the Feed aggregate records FeedEvents
(FeedCreated, EmailIngested) on the mutations that have consequences, exposed via
pullEvents(). A new application dispatcher (feed-events.applyFeedEvents) maps
those events to their side effects — counters (awaited) plus WebSub pings and
favicon fetches handed to a BackgroundScheduler. This removes the inline,
scattered side effects from the ingest hot path (email-processor) and from
createFeedRecord; the aggregate is now the source of truth for "what happened".

Side effects with no aggregate mutation (rejected email, feed deletion bypassing
the aggregate, bulk admin ops, the cron, unsubscribes-sent) stay imperative by
design — there is no aggregate event for them to ride on.

BackgroundScheduler type moved to infrastructure/worker.ts (shared). CLAUDE.md
updated. 355 tests pass (+4 event tests); tsc --noEmit clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Julien Herr
2026-05-24 13:12:42 +02:00
parent 46af982c40
commit b3d42f6c50
9 changed files with 146 additions and 27 deletions
+11 -15
View File
@@ -1,14 +1,12 @@
import { EmailParser } from "../domain/email-parser";
import { AttachmentData, EmailMetadata, Env } from "../types";
import { notifySubscribers } from "../infrastructure/websub";
import { bumpCounters } from "../application/stats";
import {
cacheFaviconForDomain,
extractEmailDomain,
} from "../infrastructure/favicon-fetcher";
import { applyFeedEvents } from "../application/feed-events";
import { extractEmailDomain } from "../infrastructure/favicon-fetcher";
import { parseOneClickUnsubscribe } from "../infrastructure/unsubscribe";
import { getAttachmentBucket } from "../infrastructure/attachments";
import { FeedRepository } from "../infrastructure/feed-repository";
import { BackgroundScheduler } from "../infrastructure/worker";
import { Feed } from "../domain/feed.aggregate";
import { logger } from "../infrastructure/logger";
import { FEED_MAX_BYTES } from "../config/constants";
@@ -183,12 +181,14 @@ async function storeEmail(
]);
logger.info("Email processed", { feedId: feed.id.value });
if (ctx) {
ctx.waitUntil(notifySubscribers(feed.id.value, env));
if (iconDomain) {
ctx.waitUntil(cacheFaviconForDomain(iconDomain, env));
}
}
// The aggregate recorded an EmailIngested event; the dispatcher applies its
// side effects (received counter, WebSub ping, favicon fetch). Background work
// rides on ctx.waitUntil when present, and is skipped in its absence (tests).
const schedule: BackgroundScheduler = ctx
? (p) => ctx.waitUntil(p)
: () => {};
await applyFeedEvents(feed.id, feed.pullEvents(), env, schedule);
}
export async function processEmail(
@@ -203,9 +203,5 @@ export async function processEmail(
}
await storeEmail(validation.feed, input, env, ctx);
await bumpCounters(env.EMAIL_STORAGE, {
emails_received: 1,
last_email_at: new Date().toISOString(),
});
return { ok: true, feedId: validation.feed.id.value };
}