From c45f6677fecffca221fab6032ba814b7991dde4b Mon Sep 17 00:00:00 2001 From: Julien Herr Date: Sun, 24 May 2026 00:33:14 +0200 Subject: [PATCH] refactor(domain): introduce the Feed aggregate as the write-path API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a Feed aggregate class owning config + the email index, with create, ingest, removeEmails, isExpired and accepts delegating to the existing pure invariant functions. FeedRepository gains load/save/saveMetadata that reconstitute and persist the aggregate. All write paths now go through it: createFeedRecord (Feed.create), email ingestion (feed.ingest), and every email deletion in the admin UI and REST API (feed.removeEmails) — no route mutates metadata.emails directly anymore. KV key strings unchanged. Co-Authored-By: Claude Opus 4.7 --- src/domain/feed-repository.ts | 32 ++++++ src/domain/feed.aggregate.test.ts | 157 ++++++++++++++++++++++++++++++ src/domain/feed.aggregate.ts | 135 +++++++++++++++++++++++++ src/lib/email-processor.ts | 108 ++++++++++---------- src/lib/feed-service.ts | 56 +++-------- src/routes/admin.test.ts | 4 + src/routes/admin/emails.tsx | 39 +++----- src/routes/api/index.ts | 15 +-- 8 files changed, 415 insertions(+), 131 deletions(-) create mode 100644 src/domain/feed.aggregate.test.ts create mode 100644 src/domain/feed.aggregate.ts diff --git a/src/domain/feed-repository.ts b/src/domain/feed-repository.ts index d5dcc92..d0ea121 100644 --- a/src/domain/feed-repository.ts +++ b/src/domain/feed-repository.ts @@ -8,6 +8,7 @@ import { } from "../types"; import { FEEDS_LIST_KEY } from "../config/constants"; import { feedKeys } from "./feed-keys"; +import { Feed } from "./feed.aggregate"; import { logger } from "../lib/logger"; /** @@ -55,6 +56,37 @@ export class FeedRepository { return feedKeys.feedIdFromEmail(key); } + // ── Feed aggregate ──────────────────────────────────────────────────────── + + /** + * Load the aggregate (config + email index). A feed exists iff it has a + * config; metadata defaults to empty so a freshly-created feed still loads. + */ + async load(feedId: string): Promise { + const [config, metadata] = await Promise.all([ + this.getConfig(feedId), + this.getMetadata(feedId), + ]); + if (!config) return null; + return Feed.reconstitute(feedId, config, metadata ?? { emails: [] }); + } + + /** Persist both keys the aggregate owns (config + metadata). */ + async save(feed: Feed): Promise { + await Promise.all([ + this.putConfig(feed.id, feed.config), + this.putMetadata(feed.id, feed.metadata), + ]); + } + + /** + * Persist only the email index. Used by the ingest/delete paths where config + * is unchanged — avoids a redundant config write on the hot path. + */ + async saveMetadata(feed: Feed): Promise { + await this.putMetadata(feed.id, feed.metadata); + } + // ── Feed config ─────────────────────────────────────────────────────────── async getConfig(feedId: string): Promise { diff --git a/src/domain/feed.aggregate.test.ts b/src/domain/feed.aggregate.test.ts new file mode 100644 index 0000000..5bbdaf1 --- /dev/null +++ b/src/domain/feed.aggregate.test.ts @@ -0,0 +1,157 @@ +import { describe, it, expect } from "vitest"; +import { createMockEnv } from "../test/setup"; +import { Feed, CreateFeedInput } from "./feed.aggregate"; +import { FeedRepository } from "./feed-repository"; +import type { Env, EmailMetadata } from "../types"; + +const mockEnv = (overrides: Partial = {}) => + ({ ...createMockEnv(), ...overrides }) as unknown as Env; + +const createInput = ( + overrides: Partial = {}, +): CreateFeedInput => ({ + title: "News", + language: "en", + allowedSenders: [], + blockedSenders: [], + ...overrides, +}); + +const entry = (overrides: Partial = {}): EmailMetadata => ({ + key: "feed:a.b.42:1", + subject: "Hello", + receivedAt: 1, + size: 10, + ...overrides, +}); + +describe("Feed.create", () => { + it("builds a config with an empty email index and no expiry by default", () => { + const feed = Feed.create("a.b.42", createInput(), mockEnv()); + expect(feed.id).toBe("a.b.42"); + expect(feed.config.title).toBe("News"); + expect(feed.config.expires_at).toBeUndefined(); + expect(feed.metadata.emails).toEqual([]); + }); + + it("resolves expiry from lifetimeHours", () => { + const feed = Feed.create( + "a.b.42", + createInput({ lifetimeHours: 1 }), + mockEnv(), + ); + expect(feed.config.expires_at).toBeGreaterThan(Date.now()); + }); + + it("lets FEED_TTL_HOURS override a client lifetime", () => { + const feed = Feed.create( + "a.b.42", + createInput({ lifetimeHours: 1000000 }), + mockEnv({ FEED_TTL_HOURS: "1" }), + ); + const oneClientHour = Date.now() + 1000000 * 3_600_000; + expect(feed.config.expires_at).toBeLessThan(oneClientHour); + }); +}); + +describe("Feed.isExpired / accepts", () => { + it("reports expiry against the configured instant", () => { + const feed = Feed.reconstitute( + "a.b.42", + { title: "T", language: "en", created_at: 0, expires_at: 100 }, + { emails: [] }, + ); + expect(feed.isExpired(50)).toBe(false); + expect(feed.isExpired(150)).toBe(true); + }); + + it("applies the sender policy", () => { + const feed = Feed.reconstitute( + "a.b.42", + { + title: "T", + language: "en", + created_at: 0, + allowed_senders: ["good@example.com"], + }, + { emails: [] }, + ); + expect(feed.accepts(["good@example.com"])).toBe("accepted"); + expect(feed.accepts(["bad@example.com"])).toBe("blocked"); + }); +}); + +describe("Feed.ingest", () => { + it("prepends the entry, tracks icon/unsub and trims to the byte budget", () => { + const feed = Feed.reconstitute( + "a.b.42", + { title: "T", language: "en", created_at: 0 }, + { + emails: [entry({ key: "old", size: 400 })], + }, + ); + + const { dropped } = feed.ingest(entry({ key: "new", size: 400 }), { + maxBytes: 500, + iconDomain: "example.com", + unsub: { senderKey: "news@example.com", url: "https://u/1" }, + }); + + expect(feed.metadata.emails[0].key).toBe("new"); + expect(feed.metadata.iconDomain).toBe("example.com"); + expect(feed.metadata.unsubscribe).toEqual({ + "news@example.com": "https://u/1", + }); + expect(dropped.map((e) => e.key)).toEqual(["old"]); + expect(feed.metadata.emails.map((e) => e.key)).toEqual(["new"]); + }); +}); + +describe("Feed.removeEmails", () => { + it("drops matching keys and returns the removed entries", () => { + const feed = Feed.reconstitute( + "a.b.42", + { title: "T", language: "en", created_at: 0 }, + { + emails: [ + entry({ key: "k1" }), + entry({ key: "k2" }), + entry({ key: "k3" }), + ], + }, + ); + + const { removed } = feed.removeEmails(["k1", "k3", "missing"]); + expect(removed.map((e) => e.key).sort()).toEqual(["k1", "k3"]); + expect(feed.metadata.emails.map((e) => e.key)).toEqual(["k2"]); + }); +}); + +describe("FeedRepository.load / save round-trip", () => { + it("persists a created feed and reflects later mutations", async () => { + const repo = new FeedRepository(mockEnv().EMAIL_STORAGE); + const created = Feed.create( + "a.b.42", + createInput({ title: "Round" }), + mockEnv(), + ); + await repo.save(created); + + const loaded = await repo.load("a.b.42"); + expect(loaded).not.toBeNull(); + expect(loaded!.config.title).toBe("Round"); + + loaded!.ingest(entry({ key: "feed:a.b.42:1" }), { maxBytes: 1_000_000 }); + await repo.saveMetadata(loaded!); + + const reloaded = await repo.load("a.b.42"); + expect(reloaded!.metadata.emails.map((e) => e.key)).toEqual([ + "feed:a.b.42:1", + ]); + }); + + it("returns null when the feed has no config", async () => { + const repo = new FeedRepository(mockEnv().EMAIL_STORAGE); + expect(await repo.load("missing")).toBeNull(); + }); +}); diff --git a/src/domain/feed.aggregate.ts b/src/domain/feed.aggregate.ts new file mode 100644 index 0000000..d75cf88 --- /dev/null +++ b/src/domain/feed.aggregate.ts @@ -0,0 +1,135 @@ +import { Env, FeedConfig, FeedMetadata, EmailMetadata } from "../types"; +import { + resolveExpiresAt, + isExpired, + applySenderPolicy, + trimToByteBudget, + SenderDecision, +} from "./feed"; + +export interface CreateFeedInput { + title: string; + description?: string; + language: string; + allowedSenders: string[]; + blockedSenders: string[]; + lifetimeHours?: number; +} + +export interface UpdateFeedInput { + title?: string; + description?: string; + language?: string; + allowedSenders?: string[]; + blockedSenders?: string[]; + lifetimeHours?: number; +} + +export interface IngestOptions { + maxBytes: number; + iconDomain?: string; + /** RFC 8058 one-click unsubscribe link, keyed by the sending newsletter. */ + unsub?: { senderKey: string; url: string }; +} + +/** + * The Feed aggregate: the consistency boundary around a feed's config and the + * metadata index of its emails. All mutations to either go through a method + * here so the invariants (expiry policy, sender policy, byte budget) live in one + * place. Email bodies are large blobs referenced by `metadata.emails[].key` and + * deliberately sit *outside* the aggregate — the caller flushes them alongside + * `FeedRepository.save`/`saveMetadata`. + * + * I/O-free: load and persist state through `FeedRepository`. KV has no multi-key + * transaction, so a future Durable Object keyed by feed id would wrap + * load→mutate→save to serialise concurrent writers (see email-processor.ts). + */ +export class Feed { + private constructor( + readonly id: string, + private _config: FeedConfig, + private _metadata: FeedMetadata, + ) {} + + /** Mint a brand-new feed with an empty email index. */ + static create(id: string, input: CreateFeedInput, env: Env): Feed { + const now = Date.now(); + const expiresAt = resolveExpiresAt(env, input.lifetimeHours); + const config: FeedConfig = { + title: input.title, + description: input.description, + language: input.language, + allowed_senders: input.allowedSenders, + blocked_senders: input.blockedSenders, + created_at: now, + updated_at: now, + ...(expiresAt !== undefined ? { expires_at: expiresAt } : {}), + }; + return new Feed(id, config, { emails: [] }); + } + + /** Rebuild an aggregate from persisted state. */ + static reconstitute( + id: string, + config: FeedConfig, + metadata: FeedMetadata, + ): Feed { + return new Feed(id, config, metadata); + } + + get config(): Readonly { + return this._config; + } + + get metadata(): Readonly { + return this._metadata; + } + + isExpired(now: number = Date.now()): boolean { + return isExpired(this._config, now); + } + + accepts(senders: string[]): SenderDecision { + return applySenderPolicy(this._config, senders); + } + + /** + * Add an email to the front of the index, refresh the icon domain and the + * per-sender unsubscribe link, then trim the oldest entries back under the + * byte budget. Returns the dropped entries so the caller can purge their + * bodies/attachments. + */ + ingest( + entry: EmailMetadata, + opts: IngestOptions, + ): { dropped: EmailMetadata[] } { + this._metadata.emails.unshift(entry); + + if (opts.iconDomain) { + this._metadata.iconDomain = opts.iconDomain; + } + if (opts.unsub) { + this._metadata.unsubscribe = { + ...(this._metadata.unsubscribe ?? {}), + [opts.unsub.senderKey]: opts.unsub.url, + }; + } + + return trimToByteBudget(this._metadata, opts.maxBytes); + } + + /** + * Drop the given email keys from the index. Returns the removed entries so the + * caller can purge their bodies/attachments. + */ + removeEmails(keys: string[]): { removed: EmailMetadata[] } { + const target = new Set(keys); + const removed: EmailMetadata[] = []; + const kept: EmailMetadata[] = []; + for (const entry of this._metadata.emails) { + (target.has(entry.key) ? removed : kept).push(entry); + } + this._metadata.emails = kept; + return { removed }; + } +} diff --git a/src/lib/email-processor.ts b/src/lib/email-processor.ts index c575a37..536e07c 100644 --- a/src/lib/email-processor.ts +++ b/src/lib/email-processor.ts @@ -9,7 +9,7 @@ import { import { parseOneClickUnsubscribe } from "../utils/unsubscribe"; import { getAttachmentBucket } from "../utils/attachments"; import { FeedRepository } from "../domain/feed-repository"; -import { isExpired, applySenderPolicy, trimToByteBudget } from "../domain/feed"; +import { Feed } from "../domain/feed.aggregate"; import { logger } from "./logger"; import { FEED_MAX_BYTES } from "../config/constants"; @@ -70,10 +70,12 @@ async function uploadAttachments( ); } -export async function validateEmail( +async function loadAcceptingFeed( input: ProcessEmailInput, env: Env, -): Promise { +): Promise< + { ok: true; feed: Feed } | { ok: false; reason: IngestRejectionReason } +> { const feedId = EmailParser.extractFeedId(input.toAddress); if (!feedId) { logger.error("Invalid email address format", { @@ -82,31 +84,30 @@ export async function validateEmail( return { ok: false, reason: "invalid_address" }; } - const feedConfig = await FeedRepository.from(env).getConfig(feedId); - if (!feedConfig) { + const feed = await FeedRepository.from(env).load(feedId); + if (!feed) { logger.error("Feed not found", { feedId }); return { ok: false, reason: "feed_not_found" }; } - if (isExpired(feedConfig)) { + if (feed.isExpired()) { logger.warn("Rejected email: feed expired", { feedId }); return { ok: false, reason: "feed_expired" }; } - - if (applySenderPolicy(feedConfig, input.senders) === "blocked") { + if (feed.accepts(input.senders) === "blocked") { logger.warn("Rejected email: sender filter", { feedId, senders: input.senders, - allowedSenders: feedConfig.allowed_senders, - blockedSenders: feedConfig.blocked_senders, + allowedSenders: feed.config.allowed_senders, + blockedSenders: feed.config.blocked_senders, }); return { ok: false, reason: "sender_blocked" }; } - return { ok: true, feedId }; + return { ok: true, feed }; } -export async function storeEmail( - feedId: string, +async function storeEmail( + feed: Feed, input: ProcessEmailInput, env: Env, ctx?: ExecutionContext, @@ -127,26 +128,12 @@ export async function storeEmail( }; const repo = FeedRepository.from(env); - const emailKey = repo.newEmailKey(feedId); + const emailKey = repo.newEmailKey(feed.id); + await repo.putEmail(emailKey, emailData); - const [, rawMetadata] = await Promise.all([ - repo.putEmail(emailKey, emailData), - repo.getMetadata(feedId), - ]); - - // Note: KV has no atomic compare-and-swap. Concurrent invocations for the - // same feed can read stale metadata and produce orphaned KV entries or - // duplicate trim deletions. This is an accepted limitation given Cloudflare - // KV's eventual-consistency model. - // TODO: Migrate feed metadata writes to Cloudflare Durable Objects to serialise - // concurrent writes and eliminate this race condition. - const feedMetadata = rawMetadata || { emails: [] }; - - const maxBytes = - parseInt(env.FEED_MAX_SIZE_BYTES ?? "", 10) || FEED_MAX_BYTES; - - const serialised = JSON.stringify(emailData); - const serialisedSize = new TextEncoder().encode(serialised).byteLength; + const serialisedSize = new TextEncoder().encode( + JSON.stringify(emailData), + ).byteLength; const newEntry: EmailMetadata = { key: emailKey, subject: emailData.subject, @@ -156,45 +143,48 @@ export async function storeEmail( ? { attachmentIds: storedAttachments.map((a) => a.id) } : {}), }; - feedMetadata.emails.unshift(newEntry); - // Track the latest sender's domain so the feed icon follows the source. + // Track the latest sender's domain (feed icon) and capture the RFC 8058 + // one-click unsubscribe link, keyed by sender so each newsletter keeps its + // own latest URL (fired when the feed is deleted). const iconDomain = extractEmailDomain(input.from); - if (iconDomain) { - feedMetadata.iconDomain = iconDomain; - } - - // Capture the sender's RFC 8058 one-click unsubscribe link so we can stop the - // newsletter when the feed is deleted. Keyed by sender: each newsletter on the - // feed keeps its own entry, and a repeat send overwrites with the latest URL. const unsubUrl = parseOneClickUnsubscribe(input.headers ?? {}); - if (unsubUrl) { - const senderKey = - input.senders[0] || extractEmailDomain(input.from) || input.from; - feedMetadata.unsubscribe = { - ...(feedMetadata.unsubscribe ?? {}), - [senderKey]: unsubUrl, - }; - } + const unsub = unsubUrl + ? { + senderKey: input.senders[0] || iconDomain || input.from, + url: unsubUrl, + } + : undefined; - const { dropped: toDelete } = trimToByteBudget(feedMetadata, maxBytes); + const maxBytes = + parseInt(env.FEED_MAX_SIZE_BYTES ?? "", 10) || FEED_MAX_BYTES; + + const { dropped } = feed.ingest(newEntry, { + maxBytes, + iconDomain: iconDomain ?? undefined, + unsub, + }); const r2Deletions = - attachmentBucket && toDelete.length > 0 - ? toDelete + attachmentBucket && dropped.length > 0 + ? dropped .flatMap((e) => e.attachmentIds ?? []) .map((id) => attachmentBucket.delete(id)) : []; + // KV has no compare-and-swap: the load (in loadAcceptingFeed) and this write + // are not serialised, so concurrent ingests for one feed can lose updates. + // Accepted under KV's eventual-consistency model; the Feed aggregate is the + // seam a Durable Object would later wrap to serialise these writers. await Promise.all([ - repo.putMetadata(feedId, feedMetadata), - ...toDelete.map((e) => repo.deleteEmail(e.key)), + repo.saveMetadata(feed), + ...dropped.map((e) => repo.deleteEmail(e.key)), ...r2Deletions, ]); - logger.info("Email processed", { feedId }); + logger.info("Email processed", { feedId: feed.id }); if (ctx) { - ctx.waitUntil(notifySubscribers(feedId, env)); + ctx.waitUntil(notifySubscribers(feed.id, env)); if (iconDomain) { ctx.waitUntil(cacheFaviconForDomain(iconDomain, env)); } @@ -206,16 +196,16 @@ export async function processEmail( env: Env, ctx?: ExecutionContext, ): Promise { - const validation = await validateEmail(input, env); + const validation = await loadAcceptingFeed(input, env); if (!validation.ok) { await bumpCounters(env.EMAIL_STORAGE, { emails_rejected: 1 }); return validation; } - await storeEmail(validation.feedId, input, env, ctx); + await storeEmail(validation.feed, input, env, ctx); await bumpCounters(env.EMAIL_STORAGE, { emails_received: 1, last_email_at: new Date().toISOString(), }); - return validation; + return { ok: true, feedId: validation.feed.id }; } diff --git a/src/lib/feed-service.ts b/src/lib/feed-service.ts index 5cf3e6a..893eed7 100644 --- a/src/lib/feed-service.ts +++ b/src/lib/feed-service.ts @@ -1,5 +1,5 @@ import { Context } from "hono"; -import { Env, FeedConfig, FeedMetadata } from "../types"; +import { Env, FeedConfig } from "../types"; import { generateFeedId } from "../utils/id-generator"; import { bumpCounters } from "../utils/stats"; import { waitUntilSafe } from "../utils/worker"; @@ -7,19 +7,17 @@ import { sendUnsubscribes } from "../utils/unsubscribe"; import { getAttachmentBucket } from "../utils/attachments"; import { FeedRepository } from "../domain/feed-repository"; import { resolveExpiresAt, isExpired } from "../domain/feed"; +import { + Feed, + CreateFeedInput, + UpdateFeedInput, +} from "../domain/feed.aggregate"; import { purgeFeedKeysStep, collectUnsubscribeUrls, } from "../routes/admin/helpers"; -export interface CreateFeedInput { - title: string; - description?: string; - language: string; - allowedSenders: string[]; - blockedSenders: string[]; - lifetimeHours?: number; -} +export type { CreateFeedInput, UpdateFeedInput }; /** * Create a feed: write its config + empty metadata, register it in the global @@ -30,44 +28,22 @@ export async function createFeedRecord( input: CreateFeedInput, ): Promise<{ feedId: string; config: FeedConfig }> { const repo = FeedRepository.from(env); - const expiresAt = resolveExpiresAt(env, input.lifetimeHours); - const feedId = generateFeedId(); + const feed = Feed.create(generateFeedId(), input, env); - const config: FeedConfig = { - title: input.title, - description: input.description, - language: input.language, - allowed_senders: input.allowedSenders, - blocked_senders: input.blockedSenders, - created_at: Date.now(), - updated_at: Date.now(), - ...(expiresAt !== undefined ? { expires_at: expiresAt } : {}), - }; - - const metadata: FeedMetadata = { emails: [] }; - - await Promise.all([ - repo.putConfig(feedId, config), - repo.putMetadata(feedId, metadata), - ]); - - await repo.addToList(feedId, input.title, input.description, expiresAt); + await repo.save(feed); + await repo.addToList( + feed.id, + feed.config.title, + feed.config.description, + feed.config.expires_at, + ); await bumpCounters(env.EMAIL_STORAGE, { feeds_created: 1, last_feed_created_at: new Date().toISOString(), }); - return { feedId, config }; -} - -export interface UpdateFeedInput { - title?: string; - description?: string; - language?: string; - allowedSenders?: string[]; - blockedSenders?: string[]; - lifetimeHours?: number; + return { feedId: feed.id, config: feed.config }; } export type UpdateFeedResult = diff --git a/src/routes/admin.test.ts b/src/routes/admin.test.ts index 4cd54ad..789888a 100644 --- a/src/routes/admin.test.ts +++ b/src/routes/admin.test.ts @@ -815,6 +815,10 @@ describe("Admin Routes", () => { "feeds:list", JSON.stringify({ feeds: [{ id: feedId, title: "F" }] }), ); + await r2Env.EMAIL_STORAGE.put( + `feed:${feedId}:config`, + JSON.stringify({ title: "F", language: "en", created_at: 1 }), + ); const emailKey = `feed:${feedId}:1`; await r2Env.EMAIL_STORAGE.put( emailKey, diff --git a/src/routes/admin/emails.tsx b/src/routes/admin/emails.tsx index c30093d..9649770 100644 --- a/src/routes/admin/emails.tsx +++ b/src/routes/admin/emails.tsx @@ -650,18 +650,13 @@ emailsRouter.post("/emails/:emailKey/delete", async (c) => { return c.text("Feed ID is required", 400); } - const feedMetadata = await repo.getMetadata(feedId); + const feed = await repo.load(feedId); await repo.deleteEmail(emailKey); - await deleteAttachmentsForEmails(env, feedMetadata?.emails ?? [], [ - emailKey, - ]); - - if (feedMetadata) { - feedMetadata.emails = feedMetadata.emails.filter( - (email) => email.key !== emailKey, - ); - await repo.putMetadata(feedId, feedMetadata); + if (feed) { + const { removed } = feed.removeEmails([emailKey]); + await deleteAttachmentsForEmails(env, removed, [emailKey]); + await repo.saveMetadata(feed); } if (wantsJson) return c.json({ ok: true, emailKey, feedId }); @@ -690,15 +685,15 @@ emailsRouter.post("/feeds/:feedId/emails/bulk-delete", async (c) => { (c.req.header("Accept") || "").includes("application/json"); try { - const feedMetadata = await repo.getMetadata(feedId); + const feed = await repo.load(feedId); - if (!feedMetadata) { + if (!feed) { return wantsJson ? c.json({ ok: false, error: "Feed not found" }, 404) : c.text("Feed not found", 404); } - const allowedKeys = new Set(feedMetadata.emails.map((email) => email.key)); + const allowedKeys = new Set(feed.metadata.emails.map((email) => email.key)); if (wantsJson) { const body = (await c.req.json().catch(() => null)) as { @@ -728,13 +723,10 @@ emailsRouter.post("/feeds/:feedId/emails/bulk-delete", async (c) => { const { ok: deletedOk, failed: failedEmailKeys } = await deleteKeysWithConcurrency(emailStorage, candidates, 35); - await deleteAttachmentsForEmails(env, feedMetadata.emails, candidates); + await deleteAttachmentsForEmails(env, feed.metadata.emails, candidates); - const deletedSet = new Set(deletedOk); - feedMetadata.emails = feedMetadata.emails.filter( - (email) => !deletedSet.has(email.key), - ); - await repo.putMetadata(feedId, feedMetadata); + feed.removeEmails(deletedOk); + await repo.saveMetadata(feed); return c.json({ ok: failedEmailKeys.length === 0, @@ -759,13 +751,10 @@ emailsRouter.post("/feeds/:feedId/emails/bulk-delete", async (c) => { candidates, 35, ); - await deleteAttachmentsForEmails(env, feedMetadata.emails, candidates); + await deleteAttachmentsForEmails(env, feed.metadata.emails, candidates); - const deletedSet = new Set(deletedOk); - feedMetadata.emails = feedMetadata.emails.filter( - (email) => !deletedSet.has(email.key), - ); - await repo.putMetadata(feedId, feedMetadata); + feed.removeEmails(deletedOk); + await repo.saveMetadata(feed); return c.redirect( `/admin/feeds/${feedId}/emails?message=bulkDeleted&count=${deletedOk.length}`, diff --git a/src/routes/api/index.ts b/src/routes/api/index.ts index a333faf..5c6c5b6 100644 --- a/src/routes/api/index.ts +++ b/src/routes/api/index.ts @@ -345,15 +345,16 @@ apiApp.openapi( const repo = FeedRepository.from(env); const { feedId, entryId } = c.req.valid("param"); const receivedAt = parseInt(entryId, 10); - const metadata = await repo.getMetadata(feedId); - const metaEntry = metadata?.emails.find((e) => e.receivedAt === receivedAt); - if (!metadata || !metaEntry) - return c.json({ error: "Email not found" }, 404); + const feed = await repo.load(feedId); + const metaEntry = feed?.metadata.emails.find( + (e) => e.receivedAt === receivedAt, + ); + if (!feed || !metaEntry) return c.json({ error: "Email not found" }, 404); await repo.deleteEmail(metaEntry.key); - await deleteAttachmentsForEmails(env, metadata.emails, [metaEntry.key]); - metadata.emails = metadata.emails.filter((e) => e.key !== metaEntry.key); - await repo.putMetadata(feedId, metadata); + const { removed } = feed.removeEmails([metaEntry.key]); + await deleteAttachmentsForEmails(env, removed, [metaEntry.key]); + await repo.saveMetadata(feed); return c.json({ ok: true }, 200); },