From ad196f1761719a019298142e801e247f91889c97 Mon Sep 17 00:00:00 2001 From: Julien Herr Date: Sun, 24 May 2026 13:45:13 +0200 Subject: [PATCH] refactor: tighten DDD boundaries on the Feed aggregate Address five modeling tensions in one pass: - Encapsulation: the Feed aggregate no longer exposes raw config/metadata (a shallow Readonly still leaked mutable arrays). It now offers intention-revealing accessors that return copies, plus toConfigSnapshot/toMetadataSnapshot for the repository and summary() for the global registry. - feeds:list consistency: FeedRepository.save/saveConfig upsert the registry entry from feed.summary(), so services no longer mirror title/description/ expiry by hand (the old add/updateInList footgun is gone). - domain/feed.ts: drop the dead applySenderPolicy, internalise resolveExpiresAt and trimToByteBudget into the aggregate; feed.ts keeps only the shared isExpired predicate used by the read-model routes. - Single edit path: remove editDetails; edit(patch, deps) is the sole config mutation, with a systematic expired guard. Renaming an expired feed now 403s. - FeedId flows through the application and infrastructure signatures; fromTrusted/parse happen once at the edge, .value only at the serialisation boundaries (urls, feed-generator, feed-keys, logs, JSON). 347 tests green, tsc clean, Worker bundle builds. Co-Authored-By: Claude Opus 4.7 --- src/application/email-processor.ts | 4 +- src/application/feed-cleanup.ts | 19 +-- src/application/feed-events.ts | 2 +- src/application/feed-fetcher.ts | 9 +- src/application/feed-service.test.ts | 9 +- src/application/feed-service.ts | 53 +++---- src/domain/feed.aggregate.test.ts | 64 +++++--- src/domain/feed.aggregate.ts | 141 +++++++++++++++--- src/domain/feed.test.ts | 87 +---------- src/domain/feed.ts | 75 ++-------- src/index.ts | 7 +- src/infrastructure/feed-repository.test.ts | 36 ++++- src/infrastructure/feed-repository.ts | 79 +++++----- .../websub-subscription-repository.test.ts | 8 +- .../websub-subscription-repository.ts | 12 +- src/infrastructure/websub.test.ts | 82 +++++----- src/infrastructure/websub.ts | 38 ++--- src/routes/admin.tsx | 14 +- src/routes/admin/emails.tsx | 6 +- src/routes/admin/feeds.tsx | 30 ++-- src/routes/api/index.ts | 17 +-- src/routes/atom.ts | 3 +- src/routes/hub.ts | 6 +- src/routes/rss.ts | 3 +- 24 files changed, 406 insertions(+), 398 deletions(-) diff --git a/src/application/email-processor.ts b/src/application/email-processor.ts index 3c5f63a..96e72d9 100644 --- a/src/application/email-processor.ts +++ b/src/application/email-processor.ts @@ -95,8 +95,8 @@ async function loadAcceptingFeed( logger.warn("Rejected email: sender filter", { feedId: feedId.value, senders: input.senders, - allowedSenders: feed.config.allowed_senders, - blockedSenders: feed.config.blocked_senders, + allowedSenders: feed.allowedSenders(), + blockedSenders: feed.blockedSenders(), }); return { ok: false, reason: "sender_blocked" }; } diff --git a/src/application/feed-cleanup.ts b/src/application/feed-cleanup.ts index bd8a755..58f6a37 100644 --- a/src/application/feed-cleanup.ts +++ b/src/application/feed-cleanup.ts @@ -9,7 +9,7 @@ import { FeedId } from "../domain/value-objects/feed-id"; // attachmentIds. export async function deleteAttachmentsForEmails( env: Env, - emails: EmailMetadata[], + emails: readonly EmailMetadata[], keys: Iterable, ): Promise { const keySet = new Set(keys); @@ -58,16 +58,14 @@ export async function deleteKeysWithConcurrency( */ export async function collectUnsubscribeUrls( emailStorage: KVNamespace, - feedId: string, + feedId: FeedId, ): Promise { try { - const metadata = await new FeedRepository(emailStorage).getMetadata( - FeedId.fromTrusted(feedId), - ); + const metadata = await new FeedRepository(emailStorage).getMetadata(feedId); return Object.values(metadata?.unsubscribe ?? {}); } catch (error) { logger.error("Error reading unsubscribe URLs", { - feedId, + feedId: feedId.value, error: String(error), }); return []; @@ -76,7 +74,7 @@ export async function collectUnsubscribeUrls( export async function purgeFeedKeysStep( emailStorage: KVNamespace, - feedId: string, + feedId: FeedId, options: { cursor?: string; limit?: number; bucket?: R2Bucket } = {}, ): Promise<{ deletedKeys: string[]; @@ -85,15 +83,14 @@ export async function purgeFeedKeysStep( listComplete: boolean; }> { const repo = new FeedRepository(emailStorage); - const id = FeedId.fromTrusted(feedId); - const listed = await repo.listFeedKeys(id, { + const listed = await repo.listFeedKeys(feedId, { cursor: options.cursor, limit: options.limit, }); const keys = listed.names; if (options.bucket && keys.length > 0) { - const emailKeys = keys.filter((k) => repo.isEmailKey(id, k)); + const emailKeys = keys.filter((k) => repo.isEmailKey(feedId, k)); if (emailKeys.length > 0) { const emailDataResults = await Promise.allSettled( emailKeys.map((k) => repo.getEmail(k)), @@ -128,7 +125,7 @@ export async function purgeFeedKeysStep( export async function purgeExpiredFeeds( emailStorage: KVNamespace, - feedId: string, + feedId: FeedId, bucket?: R2Bucket, ): Promise { let cursor: string | undefined; diff --git a/src/application/feed-events.ts b/src/application/feed-events.ts index 02097c5..e257092 100644 --- a/src/application/feed-events.ts +++ b/src/application/feed-events.ts @@ -32,7 +32,7 @@ export async function applyFeedEvents( emails_received: 1, last_email_at: new Date().toISOString(), }); - schedule(notifySubscribers(feedId.value, env)); + schedule(notifySubscribers(feedId, env)); if (event.iconDomain) { schedule(cacheFaviconForDomain(event.iconDomain, env)); } diff --git a/src/application/feed-fetcher.ts b/src/application/feed-fetcher.ts index 1c66490..dc5a06d 100644 --- a/src/application/feed-fetcher.ts +++ b/src/application/feed-fetcher.ts @@ -9,17 +9,16 @@ export interface FeedData { } export async function fetchFeedData( - feedId: string, + feedId: FeedId, env: Env, ): Promise { const repo = FeedRepository.from(env); - const id = FeedId.fromTrusted(feedId); - const feedMetadata = await repo.getMetadata(id); + const feedMetadata = await repo.getMetadata(feedId); if (!feedMetadata) return null; - const feedConfig = (await repo.getConfig(id)) ?? { - title: `Newsletter Feed ${feedId}`, + const feedConfig = (await repo.getConfig(feedId)) ?? { + title: `Newsletter Feed ${feedId.value}`, description: "Converted email newsletter", language: "en", created_at: Date.now(), diff --git a/src/application/feed-service.test.ts b/src/application/feed-service.test.ts index dddfed8..1c1408d 100644 --- a/src/application/feed-service.test.ts +++ b/src/application/feed-service.test.ts @@ -2,6 +2,7 @@ import { describe, it, expect } from "vitest"; import { createMockEnv } from "../test/setup"; import { createFeedRecord, editFeed } from "./feed-service"; import { getCounters } from "./stats"; +import { FeedId } from "../domain/value-objects/feed-id"; import type { Env } from "../types"; const mkEnv = (overrides: Partial = {}) => @@ -59,7 +60,9 @@ describe("editFeed — TTL policy", () => { const { feedId } = await createFeedRecord(env, { ...baseInput }); const before = Date.now(); - const result = await editFeed(env, feedId, { title: "renamed" }); + const result = await editFeed(env, FeedId.fromTrusted(feedId), { + title: "renamed", + }); expect(result.status).toBe("ok"); if (result.status === "ok") { @@ -75,7 +78,9 @@ describe("editFeed — TTL policy", () => { lifetimeHours: 5, }); - const result = await editFeed(env, feedId, { title: "x" }); + const result = await editFeed(env, FeedId.fromTrusted(feedId), { + title: "x", + }); expect(result.status).toBe("ok"); if (result.status === "ok") { diff --git a/src/application/feed-service.ts b/src/application/feed-service.ts index 5ce3a87..620f047 100644 --- a/src/application/feed-service.ts +++ b/src/application/feed-service.ts @@ -45,17 +45,11 @@ export async function createFeedRecord( }); await repo.save(feed); - await repo.addToList( - feed.id, - feed.config.title, - feed.config.description, - feed.config.expires_at, - ); // FeedCreated → bumps the feeds_created counter (no background work to schedule). await applyFeedEvents(feed.id, feed.pullEvents(), env, () => {}); - return { feedId: feed.id.value, config: feed.config }; + return { feedId: feed.id.value, config: feed.toConfigSnapshot() }; } export type UpdateFeedResult = @@ -64,28 +58,26 @@ export type UpdateFeedResult = | { status: "expired" }; /** - * In-place edit of title/description only — never touches expiry. Used by the - * dashboard's minimal edit. Mirrors the new title/description into the list. + * Quick-edit of title/description only — never recomputes expiry. Used by the + * dashboard's minimal edit. Delegates to the aggregate's single `edit` path, so + * an expired feed is rejected here too. The list projection is kept in sync by + * the repository on `saveConfig`. */ export async function editFeedDetails( env: Env, - feedId: string, + feedId: FeedId, patch: { title?: string; description?: string }, ): Promise { const repo = FeedRepository.from(env); - const feed = await repo.load(FeedId.fromTrusted(feedId)); + const feed = await repo.load(feedId); if (!feed) return { status: "not_found" }; - feed.editDetails(patch); + if (feed.edit(patch, { recomputeExpiry: false }).status === "expired") { + return { status: "expired" }; + } await repo.saveConfig(feed); - await repo.updateInList( - feed.id, - feed.config.title, - feed.config.description, - feed.config.expires_at, - ); - return { status: "ok", config: feed.config }; + return { status: "ok", config: feed.toConfigSnapshot() }; } /** @@ -94,11 +86,11 @@ export async function editFeedDetails( */ export async function editFeed( env: Env, - feedId: string, + feedId: FeedId, input: UpdateFeedInput, ): Promise { const repo = FeedRepository.from(env); - const feed = await repo.load(FeedId.fromTrusted(feedId)); + const feed = await repo.load(feedId); if (!feed) return { status: "not_found" }; const recomputeExpiry = @@ -113,14 +105,8 @@ export async function editFeed( } await repo.saveConfig(feed); - await repo.updateInList( - feed.id, - feed.config.title, - feed.config.description, - feed.config.expires_at, - ); - return { status: "ok", config: feed.config }; + return { status: "ok", config: feed.toConfigSnapshot() }; } type DeleteFeedFastResult = { @@ -136,24 +122,23 @@ type DeleteFeedFastResult = { */ export async function deleteFeedFastDetailed( emailStorage: KVNamespace, - feedId: string, + feedId: FeedId, ): Promise { const repo = new FeedRepository(emailStorage); - const id = FeedId.fromTrusted(feedId); const errors: string[] = []; let configDeleted = false; let metadataDeleted = false; try { - await repo.deleteConfig(id); + await repo.deleteConfig(feedId); configDeleted = true; } catch (error) { errors.push(`config delete failed: ${String(error)}`); } try { - await repo.deleteMetadata(id); + await repo.deleteMetadata(feedId); metadataDeleted = true; } catch (error) { errors.push(`metadata delete failed: ${String(error)}`); @@ -170,7 +155,7 @@ export async function deleteFeedFastDetailed( */ export async function deleteFeedRecord( env: Env, - feedId: string, + feedId: FeedId, schedule: BackgroundScheduler, ): Promise { const emailStorage = env.EMAIL_STORAGE; @@ -180,7 +165,7 @@ export async function deleteFeedRecord( const unsubscribeUrls = await collectUnsubscribeUrls(emailStorage, feedId); await deleteFeedFastDetailed(emailStorage, feedId); - const removed = await repo.removeFromList(FeedId.fromTrusted(feedId)); + const removed = await repo.removeFromList(feedId); if (removed) { await bumpCounters(emailStorage, { feeds_deleted: 1 }); } diff --git a/src/domain/feed.aggregate.test.ts b/src/domain/feed.aggregate.test.ts index 6c44d29..a5656ca 100644 --- a/src/domain/feed.aggregate.test.ts +++ b/src/domain/feed.aggregate.test.ts @@ -34,9 +34,9 @@ describe("Feed.create", () => { it("builds a config with an empty email index and no expiry by default", () => { const feed = Feed.create(FID, createInput()); expect(feed.id.value).toBe("a.b.42"); - expect(feed.config.title).toBe("News"); - expect(feed.config.expires_at).toBeUndefined(); - expect(feed.metadata.emails).toEqual([]); + expect(feed.title).toBe("News"); + expect(feed.expiresAt).toBeUndefined(); + expect(feed.emails).toEqual([]); }); it("resolves expiry from the supplied ttlHours using the injected clock", () => { @@ -45,9 +45,9 @@ describe("Feed.create", () => { clock: fixedClock(NOW), ttlHours: 2, }); - expect(feed.config.created_at).toBe(NOW); - expect(feed.config.updated_at).toBe(NOW); - expect(feed.config.expires_at).toBe(NOW + 2 * 3_600_000); + expect(feed.createdAt).toBe(NOW); + expect(feed.updatedAt).toBe(NOW); + expect(feed.expiresAt).toBe(NOW + 2 * 3_600_000); }); it("trusts only deps.ttlHours, not the client lifetimeHours field", () => { @@ -56,7 +56,16 @@ describe("Feed.create", () => { const feed = Feed.create(FID, createInput({ lifetimeHours: 9999 }), { ttlHours: undefined, }); - expect(feed.config.expires_at).toBeUndefined(); + expect(feed.expiresAt).toBeUndefined(); + }); + + it("treats a non-positive ttlHours as no expiry", () => { + expect( + Feed.create(FID, createInput(), { ttlHours: 0 }).expiresAt, + ).toBeUndefined(); + expect( + Feed.create(FID, createInput(), { ttlHours: -5 }).expiresAt, + ).toBeUndefined(); }); }); @@ -109,11 +118,11 @@ describe("Feed.edit", () => { ); feed.edit({ title: "T2" }, { recomputeExpiry: false }); - expect(feed.config.expires_at).toBe(FUTURE); // preserved - expect(feed.config.updated_at).toBe(NOW); + expect(feed.expiresAt).toBe(FUTURE); // preserved + expect(feed.updatedAt).toBe(NOW); feed.edit({ title: "T3" }, { recomputeExpiry: true, ttlHours: 1 }); - expect(feed.config.expires_at).toBe(NOW + 3_600_000); + expect(feed.expiresAt).toBe(NOW + 3_600_000); }); it("refuses to edit an already-expired feed", () => { @@ -143,13 +152,28 @@ describe("Feed.ingest", () => { unsub: { senderKey: "news@example.com", url: "https://u/1" }, }); - expect(feed.metadata.emails[0].key).toBe("new"); - expect(feed.metadata.iconDomain).toBe("example.com"); - expect(feed.metadata.unsubscribe).toEqual({ + expect(feed.emails[0].key).toBe("new"); + expect(feed.iconDomain).toBe("example.com"); + expect(feed.unsubscribeUrls()).toEqual({ "news@example.com": "https://u/1", }); expect(dropped.map((e) => e.key)).toEqual(["old"]); - expect(feed.metadata.emails.map((e) => e.key)).toEqual(["new"]); + expect(feed.emails.map((e) => e.key)).toEqual(["new"]); + }); + + it("always keeps the just-ingested entry, even when it alone is oversized", () => { + const feed = Feed.reconstitute( + FID, + { title: "T", language: "en", created_at: 0 }, + { emails: [] }, + ); + + const { dropped } = feed.ingest(entry({ key: "huge", size: 999 }), { + maxBytes: 1, + }); + + expect(dropped).toEqual([]); + expect(feed.emails.map((e) => e.key)).toEqual(["huge"]); }); }); @@ -169,7 +193,7 @@ describe("Feed.removeEmails", () => { const { removed } = feed.removeEmails(["k1", "k3", "missing"]); expect(removed.map((e) => e.key).sort()).toEqual(["k1", "k3"]); - expect(feed.metadata.emails.map((e) => e.key)).toEqual(["k2"]); + expect(feed.emails.map((e) => e.key)).toEqual(["k2"]); }); }); @@ -196,14 +220,14 @@ describe("Feed events", () => { ]); }); - it("emits no events for editDetails / edit / removeEmails", () => { + it("emits no events for edit / removeEmails", () => { const feed = Feed.reconstitute( FID, { title: "T", language: "en", created_at: 0, expires_at: 9_999_999_999 }, { emails: [entry({ key: "k1" })] }, fixedClock(1000), ); - feed.editDetails({ title: "X" }); + feed.edit({ title: "X" }, { recomputeExpiry: false }); feed.edit({ description: "Y" }, { recomputeExpiry: false }); feed.removeEmails(["k1"]); expect(feed.pullEvents()).toEqual([]); @@ -218,15 +242,13 @@ describe("FeedRepository.load / save round-trip", () => { const loaded = await repo.load(FID); expect(loaded).not.toBeNull(); - expect(loaded!.config.title).toBe("Round"); + expect(loaded!.title).toBe("Round"); loaded!.ingest(entry({ key: "feed:a.b.42:1" }), { maxBytes: 1_000_000 }); await repo.saveMetadata(loaded!); const reloaded = await repo.load(FID); - expect(reloaded!.metadata.emails.map((e) => e.key)).toEqual([ - "feed:a.b.42:1", - ]); + expect(reloaded!.emails.map((e) => e.key)).toEqual(["feed:a.b.42:1"]); }); it("returns null when the feed has no config", async () => { diff --git a/src/domain/feed.aggregate.ts b/src/domain/feed.aggregate.ts index 0ac971c..7d1e7e9 100644 --- a/src/domain/feed.aggregate.ts +++ b/src/domain/feed.aggregate.ts @@ -1,9 +1,32 @@ -import { FeedConfig, FeedMetadata, EmailMetadata } from "../types"; +import { + FeedConfig, + FeedMetadata, + EmailMetadata, + FeedListItem, +} from "../types"; import { FeedId } from "./value-objects/feed-id"; import { SenderPolicy, SenderDecision } from "./value-objects/sender-policy"; import { Clock, systemClock } from "./clock"; import { FeedEvent } from "./events"; -import { resolveExpiresAt, isExpired, trimToByteBudget } from "./feed"; +import { isExpired } from "./feed"; + +const HOUR_MS = 3_600_000; + +/** + * Resolve a feed's `expires_at` from an already-resolved lifetime (hours) and a + * current instant. Returns undefined when no positive lifetime applies (the feed + * never expires). Which lifetime applies (client request vs. server-side + * override, env parsing) is the application layer's call — the aggregate only + * receives the resolved number. File-private: the aggregate is its sole user. + */ +function resolveExpiresAt( + ttlHours: number | undefined, + now: number, +): number | undefined { + return ttlHours !== undefined && Number.isFinite(ttlHours) && ttlHours > 0 + ? now + ttlHours * HOUR_MS + : undefined; +} export interface CreateFeedInput { title: string; @@ -109,12 +132,78 @@ export class Feed { return new Feed(id, config, metadata, clock); } - get config(): Readonly { - return this._config; + // ── Intention-revealing reads ───────────────────────────────────────────── + // The aggregate exposes named fields and copies of its collections, never the + // raw `config`/`metadata` objects — a shallow `Readonly<…>` would still let a + // caller mutate the arrays inside. Persistence reads `toConfigSnapshot()` / + // `toMetadataSnapshot()`; the registry reads `summary()`. + + get title(): string { + return this._config.title; } - get metadata(): Readonly { - return this._metadata; + get description(): string | undefined { + return this._config.description; + } + + get language(): string { + return this._config.language; + } + + get createdAt(): number { + return this._config.created_at; + } + + get updatedAt(): number | undefined { + return this._config.updated_at; + } + + get expiresAt(): number | undefined { + return this._config.expires_at; + } + + get iconDomain(): string | undefined { + return this._metadata.iconDomain; + } + + allowedSenders(): string[] { + return [...(this._config.allowed_senders ?? [])]; + } + + blockedSenders(): string[] { + return [...(this._config.blocked_senders ?? [])]; + } + + /** A copy of the email index — mutating it never touches aggregate state. */ + get emails(): readonly EmailMetadata[] { + return [...this._metadata.emails]; + } + + /** Per-sender one-click unsubscribe links (copy). */ + unsubscribeUrls(): Record { + return { ...(this._metadata.unsubscribe ?? {}) }; + } + + /** The projection stored in the global `feeds:list` registry. */ + summary(): FeedListItem { + return { + id: this.id.value, + title: this._config.title, + description: this._config.description, + expires_at: this._config.expires_at, + }; + } + + // ── Persistence snapshots (repository-only) ─────────────────────────────── + + /** A serialisable copy of the config for the repository to persist. */ + toConfigSnapshot(): FeedConfig { + return { ...this._config }; + } + + /** A serialisable copy of the email index for the repository to persist. */ + toMetadataSnapshot(): FeedMetadata { + return { ...this._metadata, emails: [...this._metadata.emails] }; } /** @@ -161,7 +250,24 @@ export class Feed { } this._events.push({ type: "EmailIngested", iconDomain: opts.iconDomain }); - return trimToByteBudget(this._metadata, opts.maxBytes); + return this.trimToByteBudget(opts.maxBytes); + } + + /** + * Enforce the per-feed byte budget by dropping the oldest emails (from the + * tail of the index) until the total fits, always keeping at least one entry. + * Returns the dropped entries so the caller can purge their KV/R2 storage. + */ + private trimToByteBudget(maxBytes: number): { dropped: EmailMetadata[] } { + const emails = this._metadata.emails; + let totalSize = emails.reduce((sum, e) => sum + (e.size ?? 0), 0); + const dropped: EmailMetadata[] = []; + while (totalSize > maxBytes && emails.length > 1) { + const entry = emails.pop()!; + totalSize -= entry.size ?? 0; + dropped.push(entry); + } + return { dropped }; } /** @@ -180,21 +286,12 @@ export class Feed { } /** - * In-place edit of the presentational fields only (title + description). Never - * touches expiry or the sender policy — used by the dashboard's minimal edit. - */ - editDetails(patch: { title?: string; description?: string }): void { - if (patch.title !== undefined) this._config.title = patch.title; - if (patch.description !== undefined) { - this._config.description = patch.description; - } - this._config.updated_at = this.clock.now(); - } - - /** - * Full edit: apply the patch and recompute expiry from the application-supplied - * lifetime when asked (an absent recompute preserves the current expiry). - * Rejects an already-expired feed without mutating it. + * The single edit path. Apply the patch (only the fields it carries) and + * recompute expiry from the application-supplied lifetime when asked — an + * absent recompute preserves the current expiry, which covers the dashboard's + * title/description quick-edit (`recomputeExpiry: false`). Rejects an + * already-expired feed without mutating it, so a quick-edit can no more touch + * an expired feed than a full edit can. */ edit( patch: UpdateFeedInput, diff --git a/src/domain/feed.test.ts b/src/domain/feed.test.ts index 400dd8b..9cf78b8 100644 --- a/src/domain/feed.test.ts +++ b/src/domain/feed.test.ts @@ -1,26 +1,5 @@ import { describe, it, expect } from "vitest"; -import { - resolveExpiresAt, - isExpired, - applySenderPolicy, - trimToByteBudget, -} from "./feed"; -import type { FeedMetadata, EmailMetadata } from "../types"; - -describe("resolveExpiresAt", () => { - const NOW = 1_000_000; - - it("returns undefined when no positive lifetime applies", () => { - expect(resolveExpiresAt(undefined, NOW)).toBeUndefined(); - expect(resolveExpiresAt(0, NOW)).toBeUndefined(); - expect(resolveExpiresAt(-5, NOW)).toBeUndefined(); - expect(resolveExpiresAt(NaN, NOW)).toBeUndefined(); - }); - - it("computes expiry from a supplied lifetime relative to now", () => { - expect(resolveExpiresAt(2, NOW)).toBe(NOW + 2 * 3_600_000); - }); -}); +import { isExpired } from "./feed"; describe("isExpired", () => { it("is false when no expiry is set", () => { @@ -33,67 +12,3 @@ describe("isExpired", () => { expect(isExpired({ expires_at: 1000 }, 999)).toBe(false); }); }); - -describe("applySenderPolicy", () => { - it("accepts everything when no lists are configured", () => { - expect(applySenderPolicy({}, ["anyone@example.com"])).toBe("accepted"); - }); - - it("requires an allowlist match when an allowlist is set", () => { - const config = { allowed_senders: ["news@example.com"] }; - expect(applySenderPolicy(config, ["news@example.com"])).toBe("accepted"); - expect(applySenderPolicy(config, ["other@example.com"])).toBe("blocked"); - }); - - it("matches an allowlist by domain", () => { - const config = { allowed_senders: ["example.com"] }; - expect(applySenderPolicy(config, ["anyone@example.com"])).toBe("accepted"); - }); - - it("blocks a blocklisted sender even when allowlisted", () => { - const config = { - allowed_senders: ["example.com"], - blocked_senders: ["spam@example.com"], - }; - expect(applySenderPolicy(config, ["spam@example.com"])).toBe("blocked"); - expect(applySenderPolicy(config, ["ok@example.com"])).toBe("accepted"); - }); - - it("with only a blocklist, accepts everything else", () => { - const config = { blocked_senders: ["bad.com"] }; - expect(applySenderPolicy(config, ["x@bad.com"])).toBe("blocked"); - expect(applySenderPolicy(config, ["x@good.com"])).toBe("accepted"); - }); -}); - -describe("trimToByteBudget", () => { - const entry = (key: string, size: number): EmailMetadata => ({ - key, - subject: key, - receivedAt: 1, - size, - }); - - it("keeps everything within budget", () => { - const meta: FeedMetadata = { emails: [entry("a", 10), entry("b", 10)] }; - const { dropped } = trimToByteBudget(meta, 100); - expect(dropped).toEqual([]); - expect(meta.emails).toHaveLength(2); - }); - - it("drops the oldest entries (from the tail) until within budget", () => { - const meta: FeedMetadata = { - emails: [entry("new", 30), entry("mid", 30), entry("old", 30)], - }; - const { dropped } = trimToByteBudget(meta, 50); - expect(dropped.map((e) => e.key)).toEqual(["old", "mid"]); - expect(meta.emails.map((e) => e.key)).toEqual(["new"]); - }); - - it("always keeps at least one entry, even when oversized", () => { - const meta: FeedMetadata = { emails: [entry("only", 999)] }; - const { dropped } = trimToByteBudget(meta, 1); - expect(dropped).toEqual([]); - expect(meta.emails).toHaveLength(1); - }); -}); diff --git a/src/domain/feed.ts b/src/domain/feed.ts index 9165cf6..7eda331 100644 --- a/src/domain/feed.ts +++ b/src/domain/feed.ts @@ -1,37 +1,14 @@ -import { FeedConfig, FeedMetadata, EmailMetadata } from "../types"; -import { SenderPolicy, SenderDecision } from "./value-objects/sender-policy"; - -const HOUR_MS = 3_600_000; - -export type { SenderDecision }; +import { FeedConfig } from "../types"; /** - * The Feed aggregate's invariants, in one framework-agnostic place: expiry, - * sender allow/block policy, and the email-size budget. No I/O and no ambient - * time or environment — callers pass `now` (from a Clock) and a resolved - * lifetime; persistence goes through the FeedRepository. - */ - -/** - * Resolve a feed's `expires_at` from an already-resolved lifetime (hours) and a - * current instant. Returns undefined when no positive lifetime applies (i.e. the - * feed never expires). The policy decision of *which* lifetime applies (a client - * request vs. a server-side `FEED_TTL_HOURS` override, and parsing the env - * string) belongs to the application layer, not here. - */ -export function resolveExpiresAt( - ttlHours: number | undefined, - now: number, -): number | undefined { - return ttlHours !== undefined && Number.isFinite(ttlHours) && ttlHours > 0 - ? now + ttlHours * HOUR_MS - : undefined; -} - -/** - * Whether a feed has reached its expiry instant. `now` defaults to the wall - * clock for convenience at the HTTP edge (routes); the aggregate always passes - * its injected clock so its own behaviour stays deterministic. + * The expiry predicate, shared between the Feed aggregate and the read-model + * routes (rss/atom/entries) that render from a config snapshot without loading + * the aggregate. This is the *only* feed invariant that lives outside the + * aggregate, precisely because the hot read path bypasses it. + * + * `now` defaults to the wall clock for convenience at the HTTP edge; the + * aggregate always passes its injected clock so its own behaviour stays + * deterministic. */ export function isExpired( config: Pick, @@ -39,37 +16,3 @@ export function isExpired( ): boolean { return config.expires_at !== undefined && config.expires_at <= now; } - -/** - * Decide whether an inbound email is accepted, given the feed's sender lists and - * the message's candidate sender addresses. Thin wrapper over the `SenderPolicy` - * value object (which holds the matching semantics). - */ -export function applySenderPolicy( - config: Pick, - senders: string[], -): SenderDecision { - return SenderPolicy.fromLists( - config.allowed_senders, - config.blocked_senders, - ).decide(senders); -} - -/** - * Enforce the per-feed byte budget by dropping the oldest emails (mutating - * `metadata.emails`) until the total fits, always keeping at least one entry. - * Returns the dropped entries so the caller can purge their KV/R2 storage. - */ -export function trimToByteBudget( - metadata: FeedMetadata, - maxBytes: number, -): { dropped: EmailMetadata[] } { - let totalSize = metadata.emails.reduce((sum, e) => sum + (e.size ?? 0), 0); - const dropped: EmailMetadata[] = []; - while (totalSize > maxBytes && metadata.emails.length > 1) { - const entry = metadata.emails.pop()!; - totalSize -= entry.size ?? 0; - dropped.push(entry); - } - return { dropped }; -} diff --git a/src/index.ts b/src/index.ts index e5c5bdd..a1ae60b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,6 +15,7 @@ import { Env } from "./types"; import { logger } from "./infrastructure/logger"; import { FeedRepository } from "./infrastructure/feed-repository"; import { purgeExpiredFeeds } from "./application/feed-cleanup"; +import { FeedId } from "./domain/value-objects/feed-id"; import { bumpCounters, scanR2Usage, @@ -206,7 +207,11 @@ export default { .map((f) => f.id); for (const feedId of expiredIds) { - await purgeExpiredFeeds(env.EMAIL_STORAGE, feedId, attachmentBucket); + await purgeExpiredFeeds( + env.EMAIL_STORAGE, + FeedId.fromTrusted(feedId), + attachmentBucket, + ); } if (expiredIds.length > 0) { await repo.removeFromListBulk(expiredIds); diff --git a/src/infrastructure/feed-repository.test.ts b/src/infrastructure/feed-repository.test.ts index c9956d6..969c76a 100644 --- a/src/infrastructure/feed-repository.test.ts +++ b/src/infrastructure/feed-repository.test.ts @@ -1,6 +1,7 @@ import { describe, it, expect } from "vitest"; import { createMockEnv } from "../test/setup"; import { FeedRepository } from "./feed-repository"; +import { Feed } from "../domain/feed.aggregate"; import { FeedId } from "../domain/value-objects/feed-id"; import type { Env, FeedConfig, FeedMetadata, EmailData } from "../types"; @@ -95,10 +96,29 @@ describe("FeedRepository emails", () => { }); describe("FeedRepository feed list", () => { - it("adds, updates, lists and removes feeds with expiry", async () => { + const feedWith = ( + id: string, + title: string, + opts: { description?: string; expires_at?: number } = {}, + ) => + Feed.reconstitute( + fid(id), + { + title, + language: "en", + created_at: 1000, + description: opts.description, + expires_at: opts.expires_at, + }, + { emails: [] }, + ); + + it("upserts the list entry from the aggregate on save/saveConfig", async () => { const repo = new FeedRepository(mockEnv().EMAIL_STORAGE); - await repo.addToList(fid("a.b.42"), "One", "desc", 5000); - await repo.addToList(fid("c.d.99"), "Two"); + await repo.save( + feedWith("a.b.42", "One", { description: "desc", expires_at: 5000 }), + ); + await repo.save(feedWith("c.d.99", "Two")); let feeds = await repo.listFeeds(); expect(feeds).toHaveLength(2); @@ -107,8 +127,10 @@ describe("FeedRepository feed list", () => { expires_at: 5000, }); - await repo.updateInList(fid("a.b.42"), "One-updated", undefined, undefined); + // saveConfig refreshes the same entry in place (no duplicate, expiry cleared). + await repo.saveConfig(feedWith("a.b.42", "One-updated")); feeds = await repo.listFeeds(); + expect(feeds.filter((f) => f.id === "a.b.42")).toHaveLength(1); const updated = feeds.find((f) => f.id === "a.b.42"); expect(updated).toMatchObject({ title: "One-updated" }); expect(updated?.expires_at).toBeUndefined(); @@ -121,9 +143,9 @@ describe("FeedRepository feed list", () => { it("bulk-removes only the matching ids", async () => { const repo = new FeedRepository(mockEnv().EMAIL_STORAGE); - await repo.addToList(fid("a.b.42"), "One"); - await repo.addToList(fid("c.d.99"), "Two"); - await repo.addToList(fid("e.f.10"), "Three"); + await repo.save(feedWith("a.b.42", "One")); + await repo.save(feedWith("c.d.99", "Two")); + await repo.save(feedWith("e.f.10", "Three")); const removed = await repo.removeFromListBulk(["a.b.42", "e.f.10", "nope"]); expect(removed.sort()).toEqual(["a.b.42", "e.f.10"]); diff --git a/src/infrastructure/feed-repository.ts b/src/infrastructure/feed-repository.ts index 5d2a7e8..720595f 100644 --- a/src/infrastructure/feed-repository.ts +++ b/src/infrastructure/feed-repository.ts @@ -72,28 +72,38 @@ export class FeedRepository { return Feed.reconstitute(feedId, config, metadata ?? { emails: [] }); } - /** Persist both keys the aggregate owns (config + metadata). */ + /** + * Persist both keys the aggregate owns (config + metadata) and keep the global + * `feeds:list` entry in sync. The registry projection is derived from + * `feed.summary()` here, so no caller has to remember to mirror it. + */ async save(feed: Feed): Promise { await Promise.all([ - this.putConfig(feed.id, feed.config), - this.putMetadata(feed.id, feed.metadata), + this.putConfig(feed.id, feed.toConfigSnapshot()), + this.putMetadata(feed.id, feed.toMetadataSnapshot()), + this.upsertListEntry(feed.summary()), ]); } /** * Persist only the email index. Used by the ingest/delete paths where config - * is unchanged — avoids a redundant config write on the hot path. + * is unchanged — avoids a redundant config write on the hot path. The list + * projection (title/description/expiry) is untouched, so it is not rewritten. */ async saveMetadata(feed: Feed): Promise { - await this.putMetadata(feed.id, feed.metadata); + await this.putMetadata(feed.id, feed.toMetadataSnapshot()); } /** - * Persist only the config. Used by the rename/edit paths where metadata is - * unchanged — avoids re-writing (and risking clobbering) the email index. + * Persist only the config and refresh the `feeds:list` entry from it. Used by + * the rename/edit paths where metadata is unchanged — avoids re-writing (and + * risking clobbering) the email index. */ async saveConfig(feed: Feed): Promise { - await this.putConfig(feed.id, feed.config); + await Promise.all([ + this.putConfig(feed.id, feed.toConfigSnapshot()), + this.upsertListEntry(feed.summary()), + ]); } // ── Feed config ─────────────────────────────────────────────────────────── @@ -156,50 +166,29 @@ export class FeedRepository { } } - async addToList( - feedId: FeedId, - title: string, - description?: string, - expires_at?: number, - ): Promise { + /** + * Insert-or-update a feed's entry in the global `feeds:list` registry from its + * aggregate summary. Idempotent by feed id. Private: callers persist a `Feed` + * via `save`/`saveConfig`, which keep the projection in sync — never mirror the + * list by hand. (Read-modify-write is not atomic under KV, unchanged from the + * prior add/update split.) + */ + private async upsertListEntry(summary: FeedListItem): Promise { try { const feedList = ((await this.kv.get(FEEDS_LIST_KEY, { type: "json", })) as FeedList | null) || { feeds: [] }; - feedList.feeds.push({ id: feedId.value, title, description, expires_at }); + const index = feedList.feeds.findIndex((feed) => feed.id === summary.id); + if (index === -1) { + feedList.feeds.push(summary); + } else { + feedList.feeds[index] = summary; + } await this.kv.put(FEEDS_LIST_KEY, JSON.stringify(feedList)); } catch (error) { - logger.error("Error adding feed to list", { - feedId: feedId.value, - error: String(error), - }); - } - } - - async updateInList( - feedId: FeedId, - title: string, - description?: string, - expires_at?: number, - ): Promise { - try { - const feedList = ((await this.kv.get(FEEDS_LIST_KEY, { - type: "json", - })) as FeedList | null) || { feeds: [] }; - - const feedIndex = feedList.feeds.findIndex( - (feed) => feed.id === feedId.value, - ); - if (feedIndex !== -1) { - feedList.feeds[feedIndex].title = title; - feedList.feeds[feedIndex].description = description; - feedList.feeds[feedIndex].expires_at = expires_at; - await this.kv.put(FEEDS_LIST_KEY, JSON.stringify(feedList)); - } - } catch (error) { - logger.error("Error updating feed in list", { - feedId: feedId.value, + logger.error("Error upserting feed in list", { + feedId: summary.id, error: String(error), }); } diff --git a/src/infrastructure/websub-subscription-repository.test.ts b/src/infrastructure/websub-subscription-repository.test.ts index 2691c5d..4d100c6 100644 --- a/src/infrastructure/websub-subscription-repository.test.ts +++ b/src/infrastructure/websub-subscription-repository.test.ts @@ -1,19 +1,21 @@ import { describe, it, expect } from "vitest"; import { createMockEnv } from "../test/setup"; import { WebSubSubscriptionRepository } from "./websub-subscription-repository"; +import { FeedId } from "../domain/value-objects/feed-id"; import type { Env, WebSubSubscription } from "../types"; const mockEnv = () => createMockEnv() as unknown as Env; +const fid = FeedId.fromTrusted("a.b.42"); describe("WebSubSubscriptionRepository", () => { it("round-trips subscriptions and counts feeds with subscribers", async () => { const repo = new WebSubSubscriptionRepository(mockEnv().EMAIL_STORAGE); - expect(await repo.get("a.b.42")).toEqual([]); + expect(await repo.get(fid)).toEqual([]); const subs: WebSubSubscription[] = [ { callbackUrl: "https://r.example/cb", expiresAt: 9999 }, ]; - await repo.save("a.b.42", subs); - expect(await repo.get("a.b.42")).toEqual(subs); + await repo.save(fid, subs); + expect(await repo.get(fid)).toEqual(subs); expect(await repo.countKeys()).toBe(1); }); }); diff --git a/src/infrastructure/websub-subscription-repository.ts b/src/infrastructure/websub-subscription-repository.ts index 943413d..841b5b1 100644 --- a/src/infrastructure/websub-subscription-repository.ts +++ b/src/infrastructure/websub-subscription-repository.ts @@ -1,5 +1,6 @@ import { Env, WebSubSubscription } from "../types"; import { feedKeys } from "../domain/feed-keys"; +import { FeedId } from "../domain/value-objects/feed-id"; import { logger } from "./logger"; /** @@ -12,16 +13,19 @@ export class WebSubSubscriptionRepository { return new WebSubSubscriptionRepository(env.EMAIL_STORAGE); } - async get(feedId: string): Promise { - const raw = await this.kv.get(feedKeys.websub(feedId), "json"); + async get(feedId: FeedId): Promise { + const raw = await this.kv.get(feedKeys.websub(feedId.value), "json"); return (raw as WebSubSubscription[] | null) ?? []; } async save( - feedId: string, + feedId: FeedId, subscriptions: WebSubSubscription[], ): Promise { - await this.kv.put(feedKeys.websub(feedId), JSON.stringify(subscriptions)); + await this.kv.put( + feedKeys.websub(feedId.value), + JSON.stringify(subscriptions), + ); } /** Number of feeds that currently hold at least one WebSub subscription. */ diff --git a/src/infrastructure/websub.test.ts b/src/infrastructure/websub.test.ts index 7d3c929..1336536 100644 --- a/src/infrastructure/websub.test.ts +++ b/src/infrastructure/websub.test.ts @@ -9,9 +9,11 @@ import { verifyAndStoreSubscription, verifyAndDeleteSubscription, } from "./websub"; +import { FeedId } from "../domain/value-objects/feed-id"; import type { Env, WebSubSubscription } from "../types"; const mockEnv = () => createMockEnv() as unknown as Env; +const fid = (value: string) => FeedId.fromTrusted(value); describe("buildHmacSignature", () => { it("returns sha256= prefixed hex", async () => { @@ -35,7 +37,7 @@ describe("buildHmacSignature", () => { describe("getSubscriptions / saveSubscriptions", () => { it("returns empty array when no subs exist", async () => { const env = mockEnv(); - expect(await getSubscriptions("feed1", env)).toEqual([]); + expect(await getSubscriptions(fid("feed1"), env)).toEqual([]); }); it("round-trips stored subscriptions", async () => { @@ -46,13 +48,13 @@ describe("getSubscriptions / saveSubscriptions", () => { expiresAt: Date.now() + 60000, }, ]; - await saveSubscriptions("feed1", subs, env); - expect(await getSubscriptions("feed1", env)).toEqual(subs); + await saveSubscriptions(fid("feed1"), subs, env); + expect(await getSubscriptions(fid("feed1"), env)).toEqual(subs); }); it("uses the correct KV key", async () => { const env = mockEnv(); - await saveSubscriptions("abc", [], env); + await saveSubscriptions(fid("abc"), [], env); expect( await env.EMAIL_STORAGE.get("websub:subs:abc", { type: "json" }), ).toEqual([]); @@ -69,7 +71,7 @@ describe("notifySubscribers", () => { return HttpResponse.text("ok"); }), ); - await notifySubscribers("feed1", env); + await notifySubscribers(fid("feed1"), env); expect(called).toBe(false); }); @@ -81,7 +83,7 @@ describe("notifySubscribers", () => { expiresAt: Date.now() + 60000, }, ]; - await saveSubscriptions("feed1", subs, env); + await saveSubscriptions(fid("feed1"), subs, env); let called = false; server.use( http.post("https://reader.example/callback", () => { @@ -89,7 +91,7 @@ describe("notifySubscribers", () => { return HttpResponse.text("ok"); }), ); - await notifySubscribers("feed1", env); + await notifySubscribers(fid("feed1"), env); expect(called).toBe(false); }); @@ -113,7 +115,7 @@ describe("notifySubscribers", () => { expiresAt: Date.now() + 60000, }, ]; - await saveSubscriptions("feed1", subs, env); + await saveSubscriptions(fid("feed1"), subs, env); let receivedBody = ""; let receivedContentType = ""; @@ -125,7 +127,7 @@ describe("notifySubscribers", () => { }), ); - await notifySubscribers("feed1", env); + await notifySubscribers(fid("feed1"), env); expect(receivedBody).toContain(" { secret: "mysecret", }, ]; - await saveSubscriptions("feed1", subs, env); + await saveSubscriptions(fid("feed1"), subs, env); let receivedSig256 = ""; let receivedSig = ""; @@ -164,7 +166,7 @@ describe("notifySubscribers", () => { }), ); - await notifySubscribers("feed1", env); + await notifySubscribers(fid("feed1"), env); expect(receivedSig256).toMatch(/^sha256=[0-9a-f]{64}$/); expect(receivedSig).toBe(""); // legacy header should NOT be sent }); @@ -190,7 +192,7 @@ describe("notifySubscribers", () => { format: "atom", }, ]; - await saveSubscriptions("feed1", subs, env); + await saveSubscriptions(fid("feed1"), subs, env); let receivedContentType = ""; let receivedLink = ""; @@ -202,7 +204,7 @@ describe("notifySubscribers", () => { }), ); - await notifySubscribers("feed1", env); + await notifySubscribers(fid("feed1"), env); expect(receivedContentType).toContain("application/atom+xml"); expect(receivedLink).toContain(`/atom/feed1`); expect(receivedLink).toContain(`rel="self"`); @@ -234,7 +236,7 @@ describe("notifySubscribers", () => { format: "atom", }, ]; - await saveSubscriptions("feed1", subs, env); + await saveSubscriptions(fid("feed1"), subs, env); const received: Record = {}; server.use( @@ -248,7 +250,7 @@ describe("notifySubscribers", () => { }), ); - await notifySubscribers("feed1", env); + await notifySubscribers(fid("feed1"), env); expect(received.rss).toContain("application/rss+xml"); expect(received.atom).toContain("application/atom+xml"); }); @@ -277,7 +279,7 @@ describe("notifySubscribers", () => { expiresAt: Date.now() + 60000, }, ]; - await saveSubscriptions("feed1", subs, env); + await saveSubscriptions(fid("feed1"), subs, env); const notified: string[] = []; server.use( @@ -291,10 +293,10 @@ describe("notifySubscribers", () => { }), ); - await notifySubscribers("feed1", env); + await notifySubscribers(fid("feed1"), env); expect(notified).toEqual(["active"]); - const remaining = await getSubscriptions("feed1", env); + const remaining = await getSubscriptions(fid("feed1"), env); expect(remaining).toHaveLength(1); expect(remaining[0].callbackUrl).toBe("https://active.example/callback"); }); @@ -312,7 +314,7 @@ describe("verifyAndStoreSubscription", () => { ); const result = await verifyAndStoreSubscription( - "feed1", + fid("feed1"), "https://reader.example/callback", undefined, 86400, @@ -321,7 +323,7 @@ describe("verifyAndStoreSubscription", () => { ); expect(result).toBe(true); - const subs = await getSubscriptions("feed1", env); + const subs = await getSubscriptions(fid("feed1"), env); expect(subs).toHaveLength(1); expect(subs[0].callbackUrl).toBe("https://reader.example/callback"); expect(subs[0].expiresAt).toBeGreaterThan(Date.now()); @@ -340,7 +342,7 @@ describe("verifyAndStoreSubscription", () => { ); const result = await verifyAndStoreSubscription( - "feed1", + fid("feed1"), "https://reader.example/callback", undefined, 86400, @@ -350,7 +352,7 @@ describe("verifyAndStoreSubscription", () => { expect(result).toBe(true); expect(receivedTopic).toContain("/atom/feed1"); - const subs = await getSubscriptions("feed1", env); + const subs = await getSubscriptions(fid("feed1"), env); expect(subs[0].format).toBe("atom"); }); @@ -363,7 +365,7 @@ describe("verifyAndStoreSubscription", () => { ); const result = await verifyAndStoreSubscription( - "feed1", + fid("feed1"), "https://reader.example/callback", undefined, 86400, @@ -372,7 +374,7 @@ describe("verifyAndStoreSubscription", () => { ); expect(result).toBe(false); - const subs = await getSubscriptions("feed1", env); + const subs = await getSubscriptions(fid("feed1"), env); expect(subs).toHaveLength(0); }); @@ -381,7 +383,7 @@ describe("verifyAndStoreSubscription", () => { const existing: WebSubSubscription[] = [ { callbackUrl: "https://reader.example/callback", expiresAt: 1000 }, ]; - await saveSubscriptions("feed1", existing, env); + await saveSubscriptions(fid("feed1"), existing, env); server.use( http.get("https://reader.example/callback", ({ request }) => { @@ -392,7 +394,7 @@ describe("verifyAndStoreSubscription", () => { ); const result = await verifyAndStoreSubscription( - "feed1", + fid("feed1"), "https://reader.example/callback", "newsecret", 3600, @@ -401,7 +403,7 @@ describe("verifyAndStoreSubscription", () => { ); expect(result).toBe(true); - const subs = await getSubscriptions("feed1", env); + const subs = await getSubscriptions(fid("feed1"), env); expect(subs).toHaveLength(1); expect(subs[0].secret).toBe("newsecret"); }); @@ -413,7 +415,7 @@ describe("verifyAndStoreSubscription", () => { ); const result = await verifyAndStoreSubscription( - "feed1", + fid("feed1"), "https://reader.example/callback", undefined, 86400, @@ -422,7 +424,7 @@ describe("verifyAndStoreSubscription", () => { ); expect(result).toBe(false); - const subs = await getSubscriptions("feed1", env); + const subs = await getSubscriptions(fid("feed1"), env); expect(subs).toHaveLength(0); }); @@ -437,7 +439,7 @@ describe("verifyAndStoreSubscription", () => { ); const result = await verifyAndStoreSubscription( - "feed1", + fid("feed1"), "https://reader.example/callback", undefined, 86400, @@ -446,7 +448,7 @@ describe("verifyAndStoreSubscription", () => { ); expect(result).toBe(false); - const subs = await getSubscriptions("feed1", env); + const subs = await getSubscriptions(fid("feed1"), env); expect(subs).toHaveLength(0); }); }); @@ -455,7 +457,7 @@ describe("verifyAndDeleteSubscription", () => { it("removes subscription and returns true when callback echoes challenge", async () => { const env = mockEnv(); await saveSubscriptions( - "feed1", + fid("feed1"), [ { callbackUrl: "https://reader.example/callback", @@ -474,19 +476,19 @@ describe("verifyAndDeleteSubscription", () => { ); const result = await verifyAndDeleteSubscription( - "feed1", + fid("feed1"), "https://reader.example/callback", env, ); expect(result).toBe(true); - const subs = await getSubscriptions("feed1", env); + const subs = await getSubscriptions(fid("feed1"), env); expect(subs).toHaveLength(0); }); it("returns false and leaves subscription intact when callback returns wrong challenge", async () => { const env = mockEnv(); await saveSubscriptions( - "feed1", + fid("feed1"), [ { callbackUrl: "https://reader.example/callback", @@ -503,19 +505,19 @@ describe("verifyAndDeleteSubscription", () => { ); const result = await verifyAndDeleteSubscription( - "feed1", + fid("feed1"), "https://reader.example/callback", env, ); expect(result).toBe(false); - const subs = await getSubscriptions("feed1", env); + const subs = await getSubscriptions(fid("feed1"), env); expect(subs).toHaveLength(1); }); it("returns false and leaves subscription intact when callback fetch fails", async () => { const env = mockEnv(); await saveSubscriptions( - "feed1", + fid("feed1"), [ { callbackUrl: "https://reader.example/callback", @@ -530,12 +532,12 @@ describe("verifyAndDeleteSubscription", () => { ); const result = await verifyAndDeleteSubscription( - "feed1", + fid("feed1"), "https://reader.example/callback", env, ); expect(result).toBe(false); - const subs = await getSubscriptions("feed1", env); + const subs = await getSubscriptions(fid("feed1"), env); expect(subs).toHaveLength(1); }); }); diff --git a/src/infrastructure/websub.ts b/src/infrastructure/websub.ts index 776aaa7..e8074de 100644 --- a/src/infrastructure/websub.ts +++ b/src/infrastructure/websub.ts @@ -6,14 +6,14 @@ import { WebSubSubscriptionRepository } from "./websub-subscription-repository"; import { FeedId } from "../domain/value-objects/feed-id"; export async function getSubscriptions( - feedId: string, + feedId: FeedId, env: Env, ): Promise { return WebSubSubscriptionRepository.from(env).get(feedId); } export async function saveSubscriptions( - feedId: string, + feedId: FeedId, subscriptions: WebSubSubscription[], env: Env, ): Promise { @@ -43,22 +43,21 @@ export async function buildHmacSignature( } async function buildFeedXml( - feedId: string, + feedId: FeedId, env: Env, format: "rss" | "atom" = "rss", ): Promise { const repo = FeedRepository.from(env); - const id = FeedId.fromTrusted(feedId); const [feedMetadata, rawConfig] = await Promise.all([ - repo.getMetadata(id), - repo.getConfig(id), + repo.getMetadata(feedId), + repo.getConfig(feedId), ]); if (!feedMetadata) return null; const base = baseUrl(env); const feedConfig: FeedConfig = rawConfig ?? { - title: `Newsletter Feed ${feedId}`, + title: `Newsletter Feed ${feedId.value}`, description: "Converted email newsletter", language: "en", created_at: Date.now(), @@ -74,15 +73,15 @@ async function buildFeedXml( feedConfig, emailsData, base, - feedId, - feedAtomUrl(feedId, env), + feedId.value, + feedAtomUrl(feedId.value, env), ); } - return generateRssFeed(feedConfig, emailsData, base, feedId); + return generateRssFeed(feedConfig, emailsData, base, feedId.value); } export async function notifySubscribers( - feedId: string, + feedId: FeedId, env: Env, ): Promise { const subs = await getSubscriptions(feedId, env); @@ -140,12 +139,17 @@ export async function notifySubscribers( await Promise.allSettled([ ...(rssFeed ? rssSubs.map((sub) => - deliver(sub, rssFeed, "application/rss+xml", `/rss/${feedId}`), + deliver(sub, rssFeed, "application/rss+xml", `/rss/${feedId.value}`), ) : []), ...(atomFeed ? atomSubs.map((sub) => - deliver(sub, atomFeed, "application/atom+xml", `/atom/${feedId}`), + deliver( + sub, + atomFeed, + "application/atom+xml", + `/atom/${feedId.value}`, + ), ) : []), ]); @@ -176,7 +180,7 @@ async function verifyCallback( } export async function verifyAndStoreSubscription( - feedId: string, + feedId: FeedId, callbackUrl: string, secret: string | undefined, leaseSeconds: number, @@ -185,7 +189,7 @@ export async function verifyAndStoreSubscription( ): Promise { const verified = await verifyCallback(callbackUrl, { "hub.mode": "subscribe", - "hub.topic": feedUrl(format, feedId, env), + "hub.topic": feedUrl(format, feedId.value, env), "hub.lease_seconds": String(leaseSeconds), }); if (!verified) return false; @@ -208,13 +212,13 @@ export async function verifyAndStoreSubscription( } export async function verifyAndDeleteSubscription( - feedId: string, + feedId: FeedId, callbackUrl: string, env: Env, ): Promise { const verified = await verifyCallback(callbackUrl, { "hub.mode": "unsubscribe", - "hub.topic": feedRssUrl(feedId, env), + "hub.topic": feedRssUrl(feedId.value, env), }); if (!verified) return false; diff --git a/src/routes/admin.tsx b/src/routes/admin.tsx index 8c545fe..d3c6f8f 100644 --- a/src/routes/admin.tsx +++ b/src/routes/admin.tsx @@ -9,6 +9,7 @@ import { logger } from "../infrastructure/logger"; import { timingSafeEqual, checkProxyAuth } from "../infrastructure/auth"; import { Layout, clampText } from "./admin/ui"; import { FeedRepository } from "../infrastructure/feed-repository"; +import { FeedId } from "../domain/value-objects/feed-id"; import { editFeedDetails } from "../application/feed-service"; import { feedRssUrl, @@ -996,12 +997,21 @@ app.post( try { const { title, description } = c.req.valid("json"); - // In-place edit: only title/description, expiry untouched. - const result = await editFeedDetails(env, feedId, { title, description }); + // Quick-edit: only title/description, expiry untouched. + const result = await editFeedDetails(env, FeedId.fromTrusted(feedId), { + title, + description, + }); if (result.status === "not_found") { return c.json({ error: "Feed not found" }, 404); } + if (result.status === "expired") { + return c.json( + { error: "Feed has expired and cannot be modified." }, + 403, + ); + } return c.json({ success: true }); } catch (error) { diff --git a/src/routes/admin/emails.tsx b/src/routes/admin/emails.tsx index c1eedf2..e532063 100644 --- a/src/routes/admin/emails.tsx +++ b/src/routes/admin/emails.tsx @@ -699,7 +699,7 @@ emailsRouter.post("/feeds/:feedId/emails/bulk-delete", async (c) => { : c.text("Feed not found", 404); } - const allowedKeys = new Set(feed.metadata.emails.map((email) => email.key)); + const allowedKeys = new Set(feed.emails.map((email) => email.key)); if (wantsJson) { const body = (await c.req.json().catch(() => null)) as { @@ -729,7 +729,7 @@ emailsRouter.post("/feeds/:feedId/emails/bulk-delete", async (c) => { const { ok: deletedOk, failed: failedEmailKeys } = await deleteKeysWithConcurrency(emailStorage, candidates, 35); - await deleteAttachmentsForEmails(env, feed.metadata.emails, candidates); + await deleteAttachmentsForEmails(env, feed.emails, candidates); feed.removeEmails(deletedOk); await repo.saveMetadata(feed); @@ -757,7 +757,7 @@ emailsRouter.post("/feeds/:feedId/emails/bulk-delete", async (c) => { candidates, 35, ); - await deleteAttachmentsForEmails(env, feed.metadata.emails, candidates); + await deleteAttachmentsForEmails(env, feed.emails, candidates); feed.removeEmails(deletedOk); await repo.saveMetadata(feed); diff --git a/src/routes/admin/feeds.tsx b/src/routes/admin/feeds.tsx index f5e67b2..edcaaf3 100644 --- a/src/routes/admin/feeds.tsx +++ b/src/routes/admin/feeds.tsx @@ -335,7 +335,7 @@ feedsRouter.post("/:feedId/edit", async (c) => { blockedSenders, }); - const result = await editFeed(env, feedId, { + const result = await editFeed(env, FeedId.fromTrusted(feedId), { title: parsedData.title, description: parsedData.description, language: parsedData.language, @@ -422,7 +422,9 @@ feedsRouter.post("/:feedId/delete", async (c) => { const wantsJson = (c.req.header("Accept") || "").includes("application/json"); try { - await deleteFeedRecord(env, feedId, (p) => waitUntilSafe(c, p)); + await deleteFeedRecord(env, FeedId.fromTrusted(feedId), (p) => + waitUntilSafe(c, p), + ); if (wantsJson) { return c.json({ ok: true, feedId }); @@ -456,11 +458,15 @@ feedsRouter.post("/:feedId/purge", async (c) => { ? Number(body?.limit) : 100; - const step = await purgeFeedKeysStep(emailStorage, feedId, { - cursor, - limit, - bucket: getAttachmentBucket(env), - }); + const step = await purgeFeedKeysStep( + emailStorage, + FeedId.fromTrusted(feedId), + { + cursor, + limit, + bucket: getAttachmentBucket(env), + }, + ); return c.json({ ok: step.failedKeys.length === 0, @@ -516,9 +522,10 @@ feedsRouter.post("/bulk-delete", async (c) => { for (const feedId of parsedFeedIds) { try { + const id = FeedId.fromTrusted(feedId); // Read unsubscribe URLs before the feed metadata is deleted. - const urls = await collectUnsubscribeUrls(emailStorage, feedId); - const result = await deleteFeedFastDetailed(emailStorage, feedId); + const urls = await collectUnsubscribeUrls(emailStorage, id); + const result = await deleteFeedFastDetailed(emailStorage, id); if (!result.ok) { failures.push({ feedId, @@ -599,9 +606,10 @@ feedsRouter.post("/bulk-delete", async (c) => { for (const feedId of parsedFeedIds) { try { + const id = FeedId.fromTrusted(feedId); // Read unsubscribe URLs before the feed metadata is deleted. - const urls = await collectUnsubscribeUrls(emailStorage, feedId); - const result = await deleteFeedFastDetailed(emailStorage, feedId); + const urls = await collectUnsubscribeUrls(emailStorage, id); + const result = await deleteFeedFastDetailed(emailStorage, id); if (result.ok) { unsubscribeUrls.push(...urls); okIds.push(feedId); diff --git a/src/routes/api/index.ts b/src/routes/api/index.ts index 5b38b6a..91d53e5 100644 --- a/src/routes/api/index.ts +++ b/src/routes/api/index.ts @@ -209,8 +209,9 @@ apiApp.openapi( async (c) => { const env = c.env; const { feedId } = c.req.valid("param"); + const id = FeedId.fromTrusted(feedId); const body = c.req.valid("json"); - const result = await editFeed(env, feedId, { + const result = await editFeed(env, id, { title: body.title, description: body.description, language: body.language, @@ -222,9 +223,7 @@ apiApp.openapi( return c.json({ error: "Feed not found" }, 404); if (result.status === "expired") return c.json({ error: "Feed has expired and cannot be modified" }, 409); - const metadata = await FeedRepository.from(env).getMetadata( - FeedId.fromTrusted(feedId), - ); + const metadata = await FeedRepository.from(env).getMetadata(id); return c.json( toFeed(feedId, result.config, metadata?.emails.length ?? 0, env), 200, @@ -249,8 +248,10 @@ apiApp.openapi( async (c) => { const env = c.env; const { feedId } = c.req.valid("param"); - const removed = await deleteFeedRecord(env, feedId, (p) => - waitUntilSafe(c, p), + const removed = await deleteFeedRecord( + env, + FeedId.fromTrusted(feedId), + (p) => waitUntilSafe(c, p), ); if (!removed) return c.json({ error: "Feed not found" }, 404); return c.json({ ok: true }, 200); @@ -359,9 +360,7 @@ apiApp.openapi( const { feedId, entryId } = c.req.valid("param"); const receivedAt = parseInt(entryId, 10); const feed = await repo.load(FeedId.fromTrusted(feedId)); - const metaEntry = feed?.metadata.emails.find( - (e) => e.receivedAt === receivedAt, - ); + const metaEntry = feed?.emails.find((e) => e.receivedAt === receivedAt); if (!feed || !metaEntry) return c.json({ error: "Email not found" }, 404); await repo.deleteEmail(metaEntry.key); diff --git a/src/routes/atom.ts b/src/routes/atom.ts index 62e680b..f069a59 100644 --- a/src/routes/atom.ts +++ b/src/routes/atom.ts @@ -4,6 +4,7 @@ import { generateAtomFeed } from "../infrastructure/feed-generator"; import { fetchFeedData } from "../application/feed-fetcher"; import { baseUrl, feedAtomUrl } from "../infrastructure/urls"; import { isExpired } from "../domain/feed"; +import { FeedId } from "../domain/value-objects/feed-id"; export async function handle(c: Context<{ Bindings: Env }>): Promise { try { @@ -12,7 +13,7 @@ export async function handle(c: Context<{ Bindings: Env }>): Promise { return new Response("Feed ID is required", { status: 400 }); } - const feedData = await fetchFeedData(feedId, c.env); + const feedData = await fetchFeedData(FeedId.fromTrusted(feedId), c.env); if (!feedData) { return new Response("Feed not found", { status: 404 }); } diff --git a/src/routes/hub.ts b/src/routes/hub.ts index 0b67c75..b142e3b 100644 --- a/src/routes/hub.ts +++ b/src/routes/hub.ts @@ -72,12 +72,10 @@ hubRouter.post("/", async (c) => { ); } const format = match[1] as "rss" | "atom"; - const feedId = match[2]; + const feedId = FeedId.fromTrusted(match[2]); // Verify the feed exists before accepting any subscription - const feedConfig = await FeedRepository.from(env).getConfig( - FeedId.fromTrusted(feedId), - ); + const feedConfig = await FeedRepository.from(env).getConfig(feedId); if (!feedConfig) { return c.text("Not Found: feed does not exist", 404); } diff --git a/src/routes/rss.ts b/src/routes/rss.ts index d39eeb3..00e61a6 100644 --- a/src/routes/rss.ts +++ b/src/routes/rss.ts @@ -4,6 +4,7 @@ import { generateRssFeed } from "../infrastructure/feed-generator"; import { fetchFeedData } from "../application/feed-fetcher"; import { baseUrl, feedRssUrl } from "../infrastructure/urls"; import { isExpired } from "../domain/feed"; +import { FeedId } from "../domain/value-objects/feed-id"; export async function handle(c: Context<{ Bindings: Env }>): Promise { try { @@ -12,7 +13,7 @@ export async function handle(c: Context<{ Bindings: Env }>): Promise { return new Response("Feed ID is required", { status: 400 }); } - const feedData = await fetchFeedData(feedId, c.env); + const feedData = await fetchFeedData(FeedId.fromTrusted(feedId), c.env); if (!feedData) { return new Response("Feed not found", { status: 404 }); }