diff --git a/CLAUDE.md b/CLAUDE.md index abeffd6..360f296 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -61,12 +61,14 @@ src/ feed.ts # Pure invariant functions (expiry, sender policy, byte budget) the aggregate delegates to feed-keys.ts # The KV key schema (pure string builders), shared by every repository clock.ts # Clock port (systemClock) — injected into the aggregate; no ambient Date.now() + events.ts # FeedEvent union (FeedCreated, EmailIngested) recorded by the aggregate email-parser.ts # Email parsing (addresses, headers, encoded words) format.ts # Pure formatting helpers (formatBytes) value-objects/ # FeedId, EmailAddress, Domain, SenderPolicy (immutable, self-validating) application/ # Use-cases / orchestration (wires domain + infrastructure) feed-service.ts # createFeedRecord / editFeedDetails / editFeed / deleteFeedRecord (admin UI + REST API) feed-cleanup.ts # Feed/email storage cleanup: purgeFeedKeysStep, collectUnsubscribeUrls, attachment+key deletion + feed-events.ts # Dispatcher: maps aggregate FeedEvents to side effects (counters, WebSub, favicon) email-processor.ts # Core ingestion: load aggregate → accepts? → feed.ingest → persist feed-fetcher.ts # Read model for RSS/Atom rendering (config + email bodies; bypasses the aggregate) stats.ts # Monitoring counters increment policy + storage scans @@ -140,6 +142,7 @@ The KV key schema lives in `src/domain/feed-keys.ts` (pure, framework-agnostic) - **The `Feed` aggregate is the only writer of feed config + the email index.** Load it with `FeedRepository.load(feedId)`, mutate via its methods (`ingest`, `removeEmails`, `editDetails`, `edit`), then persist with `save`/`saveMetadata`/`saveConfig`. No route or service mutates `metadata.emails` directly. Email **bodies** are large blobs outside the aggregate — flush them (`putEmail`/`deleteEmail`) alongside the metadata save. - Read-only RSS/Atom rendering uses the `feed-fetcher` read model, not the aggregate (no invariant to enforce on the hot path). - KV has no multi-key transaction; the aggregate is the seam a future Durable Object would wrap to serialise concurrent ingests (see `email-processor.ts`). + - **Side effects via domain events.** Mutations with consequences record a `FeedEvent` (`FeedCreated`, `EmailIngested`). After persisting, the caller `pullEvents()` and passes them to `application/feed-events.applyFeedEvents`, which runs the counters/WebSub/favicon. Don't inline those side effects at call sites. Side effects with no aggregate mutation (a rejected email, feed deletion that bypasses the aggregate, bulk admin ops, the cron) stay imperative — they have no event to ride on. - **`FeedId`** is the type used by the domain (`Feed.id`) and every single-feed `FeedRepository` method. Wrap a raw id string with `FeedId.fromTrusted(value)` at the call site; keep `.value` (string) for URLs, logs, JSON and the feed-list registry. Mint new ids with `FeedId.generate()`. ### Worker bindings (`Env`) diff --git a/src/application/email-processor.ts b/src/application/email-processor.ts index f5c8759..3c5f63a 100644 --- a/src/application/email-processor.ts +++ b/src/application/email-processor.ts @@ -1,14 +1,12 @@ import { EmailParser } from "../domain/email-parser"; import { AttachmentData, EmailMetadata, Env } from "../types"; -import { notifySubscribers } from "../infrastructure/websub"; import { bumpCounters } from "../application/stats"; -import { - cacheFaviconForDomain, - extractEmailDomain, -} from "../infrastructure/favicon-fetcher"; +import { applyFeedEvents } from "../application/feed-events"; +import { extractEmailDomain } from "../infrastructure/favicon-fetcher"; import { parseOneClickUnsubscribe } from "../infrastructure/unsubscribe"; import { getAttachmentBucket } from "../infrastructure/attachments"; import { FeedRepository } from "../infrastructure/feed-repository"; +import { BackgroundScheduler } from "../infrastructure/worker"; import { Feed } from "../domain/feed.aggregate"; import { logger } from "../infrastructure/logger"; import { FEED_MAX_BYTES } from "../config/constants"; @@ -183,12 +181,14 @@ async function storeEmail( ]); logger.info("Email processed", { feedId: feed.id.value }); - if (ctx) { - ctx.waitUntil(notifySubscribers(feed.id.value, env)); - if (iconDomain) { - ctx.waitUntil(cacheFaviconForDomain(iconDomain, env)); - } - } + + // The aggregate recorded an EmailIngested event; the dispatcher applies its + // side effects (received counter, WebSub ping, favicon fetch). Background work + // rides on ctx.waitUntil when present, and is skipped in its absence (tests). + const schedule: BackgroundScheduler = ctx + ? (p) => ctx.waitUntil(p) + : () => {}; + await applyFeedEvents(feed.id, feed.pullEvents(), env, schedule); } export async function processEmail( @@ -203,9 +203,5 @@ export async function processEmail( } await storeEmail(validation.feed, input, env, ctx); - await bumpCounters(env.EMAIL_STORAGE, { - emails_received: 1, - last_email_at: new Date().toISOString(), - }); return { ok: true, feedId: validation.feed.id.value }; } diff --git a/src/application/feed-events.ts b/src/application/feed-events.ts new file mode 100644 index 0000000..02097c5 --- /dev/null +++ b/src/application/feed-events.ts @@ -0,0 +1,42 @@ +import { Env } from "../types"; +import { FeedEvent } from "../domain/events"; +import { FeedId } from "../domain/value-objects/feed-id"; +import { BackgroundScheduler } from "../infrastructure/worker"; +import { bumpCounters } from "./stats"; +import { notifySubscribers } from "../infrastructure/websub"; +import { cacheFaviconForDomain } from "../infrastructure/favicon-fetcher"; + +/** + * Apply the side effects of a feed's domain events — the single place that maps + * "what happened" (FeedCreated, EmailIngested) to its consequences. Counter + * writes are awaited (they must land); WebSub pings and favicon fetches are + * handed to the caller's background scheduler (`ctx.waitUntil` at the edge, a + * no-op when none is available). + */ +export async function applyFeedEvents( + feedId: FeedId, + events: FeedEvent[], + env: Env, + schedule: BackgroundScheduler, +): Promise { + for (const event of events) { + switch (event.type) { + case "FeedCreated": + await bumpCounters(env.EMAIL_STORAGE, { + feeds_created: 1, + last_feed_created_at: new Date().toISOString(), + }); + break; + case "EmailIngested": + await bumpCounters(env.EMAIL_STORAGE, { + emails_received: 1, + last_email_at: new Date().toISOString(), + }); + schedule(notifySubscribers(feedId.value, env)); + if (event.iconDomain) { + schedule(cacheFaviconForDomain(event.iconDomain, env)); + } + break; + } + } +} diff --git a/src/application/feed-service.test.ts b/src/application/feed-service.test.ts index 28b18ac..dddfed8 100644 --- a/src/application/feed-service.test.ts +++ b/src/application/feed-service.test.ts @@ -1,6 +1,7 @@ import { describe, it, expect } from "vitest"; import { createMockEnv } from "../test/setup"; import { createFeedRecord, editFeed } from "./feed-service"; +import { getCounters } from "./stats"; import type { Env } from "../types"; const mkEnv = (overrides: Partial = {}) => @@ -42,6 +43,14 @@ describe("createFeedRecord — TTL policy", () => { // 1h (server) wins over 9999h (client). expect(config.expires_at!).toBeLessThan(before + TWO_HOURS); }); + + it("bumps the feeds_created counter via the FeedCreated domain event", async () => { + const env = mkEnv(); + await createFeedRecord(env, { ...baseInput }); + const counters = await getCounters(env.EMAIL_STORAGE); + expect(counters.feeds_created).toBe(1); + expect(counters.last_feed_created_at).toBeDefined(); + }); }); describe("editFeed — TTL policy", () => { diff --git a/src/application/feed-service.ts b/src/application/feed-service.ts index e58559b..5ce3a87 100644 --- a/src/application/feed-service.ts +++ b/src/application/feed-service.ts @@ -1,8 +1,10 @@ import { Env, FeedConfig } from "../types"; import { bumpCounters } from "../application/stats"; +import { applyFeedEvents } from "./feed-events"; import { sendUnsubscribes } from "../infrastructure/unsubscribe"; import { getAttachmentBucket } from "../infrastructure/attachments"; import { FeedRepository } from "../infrastructure/feed-repository"; +import { BackgroundScheduler } from "../infrastructure/worker"; import { FeedId } from "../domain/value-objects/feed-id"; import { Feed, @@ -50,10 +52,8 @@ export async function createFeedRecord( feed.config.expires_at, ); - await bumpCounters(env.EMAIL_STORAGE, { - feeds_created: 1, - last_feed_created_at: new Date().toISOString(), - }); + // FeedCreated → bumps the feeds_created counter (no background work to schedule). + await applyFeedEvents(feed.id, feed.pullEvents(), env, () => {}); return { feedId: feed.id.value, config: feed.config }; } @@ -162,13 +162,6 @@ export async function deleteFeedFastDetailed( return { ok: configDeleted, configDeleted, metadataDeleted, errors }; } -/** - * Schedules a fire-and-forget background task. The HTTP edge passes an adapter - * over `ctx.waitUntil` (e.g. `(p) => waitUntilSafe(c, p)`); keeping it a plain - * function means the application layer never imports Hono's `Context`. - */ -export type BackgroundScheduler = (task: Promise) => void; - /** * Delete a single feed end-to-end: capture unsubscribe URLs, drop its config + * metadata, remove it from the list, bump the counter, and hand the background diff --git a/src/domain/events.ts b/src/domain/events.ts new file mode 100644 index 0000000..6058ab1 --- /dev/null +++ b/src/domain/events.ts @@ -0,0 +1,16 @@ +/** + * Domain events the Feed aggregate records when it mutates. They describe *what + * happened* in business terms; the application layer decides which side effects + * to run (counters, WebSub pings, favicon caching) via a dispatcher. This keeps + * the aggregate ignorant of infrastructure and the orchestration code free of + * scattered, inline side effects. + * + * Only mutations that currently have side effects emit events — feed creation + * and email ingestion. Edits and removals carry no side effect, so they emit + * nothing. Side effects that don't flow through the aggregate (a rejected email, + * a feed deletion that bypasses the aggregate, bulk admin operations) stay + * outside this mechanism by design — they have no aggregate event to ride on. + */ +export type FeedEvent = + | { type: "FeedCreated" } + | { type: "EmailIngested"; iconDomain?: string }; diff --git a/src/domain/feed.aggregate.test.ts b/src/domain/feed.aggregate.test.ts index cb4c62b..6c44d29 100644 --- a/src/domain/feed.aggregate.test.ts +++ b/src/domain/feed.aggregate.test.ts @@ -173,6 +173,43 @@ describe("Feed.removeEmails", () => { }); }); +describe("Feed events", () => { + it("records FeedCreated on create and drains it once", () => { + const feed = Feed.create(FID, createInput()); + expect(feed.pullEvents()).toEqual([{ type: "FeedCreated" }]); + // Draining clears: a second pull is empty. + expect(feed.pullEvents()).toEqual([]); + }); + + it("records EmailIngested (with icon domain) on ingest", () => { + const feed = Feed.reconstitute( + FID, + { title: "T", language: "en", created_at: 0 }, + { emails: [] }, + ); + feed.ingest(entry({ key: "k" }), { + maxBytes: 1_000_000, + iconDomain: "example.com", + }); + expect(feed.pullEvents()).toEqual([ + { type: "EmailIngested", iconDomain: "example.com" }, + ]); + }); + + it("emits no events for editDetails / edit / removeEmails", () => { + const feed = Feed.reconstitute( + FID, + { title: "T", language: "en", created_at: 0, expires_at: 9_999_999_999 }, + { emails: [entry({ key: "k1" })] }, + fixedClock(1000), + ); + feed.editDetails({ title: "X" }); + feed.edit({ description: "Y" }, { recomputeExpiry: false }); + feed.removeEmails(["k1"]); + expect(feed.pullEvents()).toEqual([]); + }); +}); + describe("FeedRepository.load / save round-trip", () => { it("persists a created feed and reflects later mutations", async () => { const repo = new FeedRepository(mockEnv().EMAIL_STORAGE); diff --git a/src/domain/feed.aggregate.ts b/src/domain/feed.aggregate.ts index 6c4b0af..0ac971c 100644 --- a/src/domain/feed.aggregate.ts +++ b/src/domain/feed.aggregate.ts @@ -2,6 +2,7 @@ import { FeedConfig, FeedMetadata, EmailMetadata } from "../types"; import { FeedId } from "./value-objects/feed-id"; import { SenderPolicy, SenderDecision } from "./value-objects/sender-policy"; import { Clock, systemClock } from "./clock"; +import { FeedEvent } from "./events"; import { resolveExpiresAt, isExpired, trimToByteBudget } from "./feed"; export interface CreateFeedInput { @@ -65,6 +66,8 @@ export interface IngestOptions { * concurrent writers (see email-processor.ts). */ export class Feed { + private readonly _events: FeedEvent[] = []; + private constructor( readonly id: FeedId, private _config: FeedConfig, @@ -91,7 +94,9 @@ export class Feed { updated_at: now, ...(expiresAt !== undefined ? { expires_at: expiresAt } : {}), }; - return new Feed(id, config, { emails: [] }, clock); + const feed = new Feed(id, config, { emails: [] }, clock); + feed._events.push({ type: "FeedCreated" }); + return feed; } /** Rebuild an aggregate from persisted state. */ @@ -112,6 +117,16 @@ export class Feed { return this._metadata; } + /** + * Drain the domain events recorded since the last pull. The application layer + * calls this after persisting and feeds them to a dispatcher that runs the + * side effects (counters, WebSub, favicon). Clearing on read keeps a long-lived + * aggregate from re-emitting. + */ + pullEvents(): FeedEvent[] { + return this._events.splice(0, this._events.length); + } + isExpired(now: number = this.clock.now()): boolean { return isExpired(this._config, now); } @@ -145,6 +160,7 @@ export class Feed { }; } + this._events.push({ type: "EmailIngested", iconDomain: opts.iconDomain }); return trimToByteBudget(this._metadata, opts.maxBytes); } diff --git a/src/infrastructure/worker.ts b/src/infrastructure/worker.ts index cecd33d..4a5221d 100644 --- a/src/infrastructure/worker.ts +++ b/src/infrastructure/worker.ts @@ -1,5 +1,12 @@ import { Context } from "hono"; +/** + * Schedules a fire-and-forget background task. The HTTP edge adapts this over + * `ctx.waitUntil`; the application/domain layers depend on this plain function + * type instead of Hono's `Context`. + */ +export type BackgroundScheduler = (task: Promise) => void; + /** Calls ctx.waitUntil() without throwing when the ExecutionContext is absent (e.g. Node tests). */ export function waitUntilSafe(c: Context, promise: Promise): void { try {