refactor: split src into domain / application / infrastructure layers

Replace the history-driven lib/ + utils/ split with DDD layers:
- domain/: aggregate, repositories, value objects, pure parsers/format
- application/: feed-service, email-processor, feed-fetcher, stats
- infrastructure/: logging, auth, KV/R2 adapters, HTTP, framework glue

Pure file relocation; imports updated mechanically. Behaviour unchanged.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Julien Herr
2026-05-24 00:46:56 +02:00
parent ab1c15e69a
commit 7bf0f71f86
45 changed files with 90 additions and 68 deletions
+667
View File
@@ -0,0 +1,667 @@
import { describe, it, expect, beforeEach } from "vitest";
import { http, HttpResponse } from "msw";
import { createMockEnv, MockR2, server } from "../test/setup";
import {
processEmail,
ProcessEmailInput,
RawAttachment,
} from "./email-processor";
import { getCounters } from "../application/stats";
/** Builds the KV key the favicon test reads the cached icon for `domain` from. */
function iconKey(domain: string): string {
  return `icon:${domain}`;
}
// Feed id shared by the tests below; it is embedded in the KV keys they seed
// (`feed:<id>:config`, `feed:<id>:metadata`).
const VALID_FEED_ID = "apple.mountain.42";
// Recipient address whose local part is the feed id above.
const VALID_TO = `${VALID_FEED_ID}@test.getmynews.app`;
/**
 * Builds a ProcessEmailInput addressed to VALID_TO with fixed sender, subject,
 * body and timestamp. Any field present in `overrides` replaces the default.
 */
function makeInput(
  overrides: Partial<ProcessEmailInput> = {},
): ProcessEmailInput {
  const defaults: ProcessEmailInput = {
    toAddress: VALID_TO,
    from: "Sender <sender@example.com>",
    senders: ["sender@example.com"],
    subject: "Test Subject",
    content: "<p>Hello</p>",
    receivedAt: 1700000000000,
  };
  return { ...defaults, ...overrides };
}
// Core ingestion behaviour of processEmail: address validation, sender
// allow/block policy, KV persistence, size-based trimming and background work.
// Titles name the domain outcome (`reason` / `ok`) that the assertions check;
// processEmail returns an IngestResult, not an HTTP status.
describe("processEmail", () => {
  const CONFIG_KEY = `feed:${VALID_FEED_ID}:config`;
  const METADATA_KEY = `feed:${VALID_FEED_ID}:metadata`;

  let env: ReturnType<typeof createMockEnv>;

  beforeEach(() => {
    env = createMockEnv();
  });

  // Writes the feed config; tests that skip this expect `feed_not_found`.
  const seedConfig = (config: object = {}) =>
    env.EMAIL_STORAGE.put(CONFIG_KEY, JSON.stringify(config));

  // Writes pre-existing feed metadata.
  const seedMetadata = (metadata: object) =>
    env.EMAIL_STORAGE.put(METADATA_KEY, JSON.stringify(metadata));

  // Reads the feed metadata back as parsed JSON.
  const readMetadata = () => env.EMAIL_STORAGE.get(METADATA_KEY, "json");

  // ExecutionContext stub that counts waitUntil calls without awaiting them.
  const spyCtx = () => {
    const calls = { waitUntil: 0 };
    const ctx = {
      waitUntil: (p: Promise<unknown>) => {
        calls.waitUntil += 1;
        void p; // don't actually await it
      },
      passThroughOnException: () => {},
    } as unknown as ExecutionContext;
    return { ctx, calls };
  };

  it("rejects with invalid_address when toAddress has no valid feedId", async () => {
    const res = await processEmail(
      makeInput({ toAddress: "invalid@domain.com" }),
      env as any,
    );
    expect(res).toMatchObject({ ok: false, reason: "invalid_address" });
  });

  it("rejects with feed_not_found when feed does not exist", async () => {
    const res = await processEmail(makeInput(), env as any);
    expect(res).toMatchObject({ ok: false, reason: "feed_not_found" });
  });

  it("rejects with sender_blocked when sender is not in allowlist", async () => {
    await seedConfig({ allowed_senders: ["allowed@example.com"] });
    const res = await processEmail(
      makeInput({ senders: ["other@example.com"] }),
      env as any,
    );
    expect(res).toMatchObject({ ok: false, reason: "sender_blocked" });
  });

  it("accepts the email when sender is allowed by exact match", async () => {
    await seedConfig({ allowed_senders: ["sender@example.com"] });
    const res = await processEmail(makeInput(), env as any);
    expect(res.ok).toBe(true);
  });

  it("accepts the email when sender matches by domain", async () => {
    await seedConfig({ allowed_senders: ["example.com"] });
    const res = await processEmail(
      makeInput({ senders: ["anyone@example.com"] }),
      env as any,
    );
    expect(res.ok).toBe(true);
  });

  it("accepts the email when no allowlist is set", async () => {
    await seedConfig({ allowed_senders: [] });
    const res = await processEmail(makeInput(), env as any);
    expect(res.ok).toBe(true);
  });

  it("rejects with sender_blocked when sender is in blocklist by exact address", async () => {
    await seedConfig({ blocked_senders: ["sender@example.com"] });
    const res = await processEmail(makeInput(), env as any);
    expect(res).toMatchObject({ ok: false, reason: "sender_blocked" });
  });

  it("rejects with sender_blocked when sender is in blocklist by domain", async () => {
    await seedConfig({ blocked_senders: ["example.com"] });
    const res = await processEmail(makeInput(), env as any);
    expect(res).toMatchObject({ ok: false, reason: "sender_blocked" });
  });

  it("accepts the email when sender is not in blocklist", async () => {
    await seedConfig({ blocked_senders: ["other@example.com"] });
    const res = await processEmail(makeInput(), env as any);
    expect(res.ok).toBe(true);
  });

  it("exact block takes precedence over domain allow", async () => {
    await seedConfig({
      allowed_senders: ["example.com"],
      blocked_senders: ["sender@example.com"],
    });
    const res = await processEmail(makeInput(), env as any);
    expect(res).toMatchObject({ ok: false, reason: "sender_blocked" });
  });

  it("exact allow overrides domain block (exception use case)", async () => {
    await seedConfig({
      allowed_senders: ["sender@example.com"],
      blocked_senders: ["example.com"],
    });
    const res = await processEmail(makeInput(), env as any);
    expect(res.ok).toBe(true);
  });

  it("exact block takes precedence over exact allow", async () => {
    await seedConfig({
      allowed_senders: ["sender@example.com"],
      blocked_senders: ["sender@example.com"],
    });
    const res = await processEmail(makeInput(), env as any);
    expect(res).toMatchObject({ ok: false, reason: "sender_blocked" });
  });

  it("stores email data and updates metadata in KV", async () => {
    await seedConfig();
    await processEmail(
      makeInput({ subject: "My Subject", content: "<b>body</b>" }),
      env as any,
    );

    const metadata = await readMetadata();
    expect(metadata.emails).toHaveLength(1);
    expect(metadata.emails[0].subject).toBe("My Subject");

    const emailData = await env.EMAIL_STORAGE.get(
      metadata.emails[0].key,
      "json",
    );
    expect(emailData.subject).toBe("My Subject");
    expect(emailData.content).toBe("<b>body</b>");
    expect(emailData.from).toBe("Sender <sender@example.com>");
  });

  it("prepends to existing metadata", async () => {
    await seedConfig();
    await seedMetadata({
      emails: [{ key: "old-key", subject: "Old", receivedAt: 1, size: 100 }],
    });

    await processEmail(makeInput({ subject: "New" }), env as any);

    const metadata = await readMetadata();
    expect(metadata.emails).toHaveLength(2);
    expect(metadata.emails[0].subject).toBe("New");
    expect(metadata.emails[1].subject).toBe("Old");
  });

  it("trims oldest emails when total size exceeds FEED_MAX_SIZE_BYTES", async () => {
    await seedConfig();

    // Two stored emails, each far larger than the 50-byte budget used below.
    const serialise = (subject: string, receivedAt: number) =>
      JSON.stringify({
        subject,
        from: "a@b.com",
        content: "x".repeat(200),
        receivedAt,
        headers: {},
      });
    const oldKey1 = `feed:${VALID_FEED_ID}:111`;
    const oldKey2 = `feed:${VALID_FEED_ID}:222`;
    const email1 = serialise("Old1", 111);
    const email2 = serialise("Old2", 222);
    await env.EMAIL_STORAGE.put(oldKey1, email1);
    await env.EMAIL_STORAGE.put(oldKey2, email2);
    await seedMetadata({
      emails: [
        { key: oldKey2, subject: "Old2", receivedAt: 222, size: email2.length },
        { key: oldKey1, subject: "Old1", receivedAt: 111, size: email1.length },
      ],
    });

    const tinyEnv = { ...env, FEED_MAX_SIZE_BYTES: "50" };
    const res = await processEmail(
      makeInput({ subject: "New" }),
      tinyEnv as any,
    );
    expect(res.ok).toBe(true);

    const metadata = await readMetadata();
    expect(metadata.emails).toHaveLength(1);
    expect(metadata.emails[0].subject).toBe("New");
    expect(await env.EMAIL_STORAGE.get(oldKey1, "json")).toBeNull();
    expect(await env.EMAIL_STORAGE.get(oldKey2, "json")).toBeNull();
  });

  it("keeps entries within size budget untouched", async () => {
    await seedConfig();
    const bigEnv = { ...env, FEED_MAX_SIZE_BYTES: String(10 * 1024 * 1024) };

    await processEmail(makeInput({ subject: "First" }), bigEnv as any);
    await processEmail(makeInput({ subject: "Second" }), bigEnv as any);

    const metadata = await readMetadata();
    expect(metadata.emails).toHaveLength(2);
  });

  it("calls ctx.waitUntil with notifySubscribers when ctx is provided", async () => {
    await seedConfig({
      title: "Test",
      language: "en",
      created_at: Date.now(),
    });
    await seedMetadata({ emails: [] });
    const { ctx, calls } = spyCtx();

    const res = await processEmail(makeInput(), env as any, ctx);

    expect(res.ok).toBe(true);
    expect(calls.waitUntil).toBeGreaterThan(0);
  });

  it("does not call ctx.waitUntil on error paths (feed not found)", async () => {
    const { ctx, calls } = spyCtx();

    // Feed ID is valid format but config doesn't exist → feed_not_found
    const res = await processEmail(
      makeInput({ toAddress: `no.such.99@test.getmynews.app` }),
      env as any,
      ctx,
    );

    expect(res).toMatchObject({ ok: false, reason: "feed_not_found" });
    expect(calls.waitUntil).toBe(0);
  });
});
// Attachment handling: uploads go to R2 only when a bucket is bound and not
// disabled, ids are mirrored into metadata, and trimmed emails release their
// R2 objects.
describe("processEmail — attachments", () => {
  type MockEnv = ReturnType<typeof createMockEnv>;

  const pdfContent = new TextEncoder().encode("PDF bytes")
    .buffer as ArrayBuffer;
  const pdfAttachment: RawAttachment = {
    filename: "report.pdf",
    contentType: "application/pdf",
    content: pdfContent,
  };
  const METADATA_KEY = `feed:${VALID_FEED_ID}:metadata`;

  // Creates the feed with an empty config so ingestion is accepted.
  const seedEmptyConfig = (env: MockEnv) =>
    env.EMAIL_STORAGE.put(`feed:${VALID_FEED_ID}:config`, JSON.stringify({}));

  const readMetadata = (env: MockEnv) =>
    env.EMAIL_STORAGE.get(METADATA_KEY, "json");

  // Loads the email body referenced by the newest metadata entry.
  const readNewestEmail = async (env: MockEnv) => {
    const metadata = await readMetadata(env);
    return env.EMAIL_STORAGE.get(metadata.emails[0].key, "json");
  };

  const bucketOf = (env: MockEnv) =>
    (env as any).ATTACHMENT_BUCKET as unknown as MockR2;

  it("skips R2 upload when ATTACHMENT_BUCKET is not configured", async () => {
    const env = createMockEnv();
    await seedEmptyConfig(env);

    const res = await processEmail(
      makeInput({ attachments: [pdfAttachment] }),
      env as any,
    );

    expect(res.ok).toBe(true);
    const emailData = await readNewestEmail(env);
    expect(emailData.attachments).toBeUndefined();
  });

  it("skips R2 upload when ATTACHMENTS_ENABLED is 'false' even with R2 bound", async () => {
    const env = createMockEnv({ withR2: true });
    (env as any).ATTACHMENTS_ENABLED = "false";
    const mockR2 = bucketOf(env);
    await seedEmptyConfig(env);

    const res = await processEmail(
      makeInput({ attachments: [pdfAttachment] }),
      env as any,
    );

    expect(res.ok).toBe(true);
    const emailData = await readNewestEmail(env);
    expect(emailData.attachments).toBeUndefined();
    expect((await mockR2.list()).objects).toHaveLength(0);
  });

  it("uploads attachments to R2 and stores AttachmentData in emailData", async () => {
    const env = createMockEnv({ withR2: true });
    const mockR2 = bucketOf(env);
    await seedEmptyConfig(env);

    const res = await processEmail(
      makeInput({ attachments: [pdfAttachment] }),
      env as any,
    );

    expect(res.ok).toBe(true);
    const emailData = await readNewestEmail(env);
    expect(emailData.attachments).toHaveLength(1);
    const [stored] = emailData.attachments;
    expect(stored.filename).toBe("report.pdf");
    expect(stored.contentType).toBe("application/pdf");
    expect(stored.size).toBe(pdfContent.byteLength);
    expect(mockR2._has(stored.id)).toBe(true);
  });

  it("stores attachmentIds in EmailMetadata for trim-time cleanup", async () => {
    const env = createMockEnv({ withR2: true });
    await seedEmptyConfig(env);

    await processEmail(makeInput({ attachments: [pdfAttachment] }), env as any);

    const metadata = await readMetadata(env);
    expect(metadata.emails[0].attachmentIds).toHaveLength(1);
    expect(typeof metadata.emails[0].attachmentIds[0]).toBe("string");
  });

  it("deletes R2 objects when a trimmed email had attachments", async () => {
    const env = createMockEnv({ withR2: true });
    const mockR2 = bucketOf(env);
    await seedEmptyConfig(env);

    // Store an old email with attachment in KV and metadata
    const oldKey = `feed:${VALID_FEED_ID}:111`;
    const oldAttachmentId = "old-attachment-uuid";
    const oldEmail = JSON.stringify({
      subject: "Old",
      from: "a@b.com",
      content: "x".repeat(200),
      receivedAt: 111,
      headers: {},
      attachments: [
        {
          id: oldAttachmentId,
          filename: "old.pdf",
          contentType: "application/pdf",
          size: 100,
        },
      ],
    });
    await env.EMAIL_STORAGE.put(oldKey, oldEmail);
    // Also put the attachment in mock R2
    await mockR2.put(oldAttachmentId, new ArrayBuffer(100));
    await env.EMAIL_STORAGE.put(
      METADATA_KEY,
      JSON.stringify({
        emails: [
          {
            key: oldKey,
            subject: "Old",
            receivedAt: 111,
            size: oldEmail.length,
            attachmentIds: [oldAttachmentId],
          },
        ],
      }),
    );

    // Process with tight size budget to force trimming
    const tinyEnv = { ...env, FEED_MAX_SIZE_BYTES: "50" };
    const res = await processEmail(
      makeInput({ subject: "New" }),
      tinyEnv as any,
    );

    expect(res.ok).toBe(true);
    // Old attachment should be deleted from R2
    expect(mockR2._has(oldAttachmentId)).toBe(false);
  });
});
// Monitoring counters: accepted emails bump emails_received / last_email_at,
// rejected ones bump emails_rejected.
describe("processEmail — monitoring counters", () => {
  it("increments emails_received and sets last_email_at on success", async () => {
    const env = createMockEnv();
    await env.EMAIL_STORAGE.put(
      `feed:${VALID_FEED_ID}:config`,
      JSON.stringify({}),
    );

    await processEmail(makeInput(), env as any);

    const { emails_received, emails_rejected, last_email_at } =
      await getCounters(env.EMAIL_STORAGE as any);
    expect(emails_received).toBe(1);
    expect(emails_rejected).toBe(0);
    expect(last_email_at).toBeDefined();
  });

  it("increments emails_rejected when validation fails", async () => {
    const env = createMockEnv();

    // No feed config → rejected as feed_not_found
    await processEmail(makeInput(), env as any);

    const { emails_received, emails_rejected } = await getCounters(
      env.EMAIL_STORAGE as any,
    );
    expect(emails_rejected).toBe(1);
    expect(emails_received).toBe(0);
  });
});
// Feed icon: the sender's domain is recorded on the metadata and its favicon
// is fetched in the background.
describe("processEmail — feed icon", () => {
  let env: ReturnType<typeof createMockEnv>;

  beforeEach(async () => {
    env = createMockEnv();
    const configKey = `feed:${VALID_FEED_ID}:config`;
    await env.EMAIL_STORAGE.put(configKey, JSON.stringify({}));
  });

  it("persists the latest sender domain on the feed metadata", async () => {
    const input = makeInput({ from: "News <news@github.com>" });

    await processEmail(input, env as any);

    const metadata = (await env.EMAIL_STORAGE.get(
      `feed:${VALID_FEED_ID}:metadata`,
      "json",
    )) as { iconDomain?: string };
    expect(metadata.iconDomain).toBe("github.com");
  });

  it("triggers a background favicon fetch via ctx.waitUntil", async () => {
    let fetched = false;
    const faviconHandler = http.get("https://github.com/favicon.ico", () => {
      fetched = true;
      return new HttpResponse(new Uint8Array([1, 2, 3]), {
        headers: { "Content-Type": "image/png" },
      });
    });
    server.use(faviconHandler);

    // Collect background promises so the test can wait for them to settle.
    const pending: Promise<unknown>[] = [];
    const ctx = {
      waitUntil: (p: Promise<unknown>) => pending.push(p),
      passThroughOnException: () => {},
    } as unknown as ExecutionContext;

    await processEmail(makeInput({ from: "news@github.com" }), env as any, ctx);
    await Promise.all(pending);

    expect(fetched).toBe(true);
    const cachedIcon = await env.EMAIL_STORAGE.get(
      iconKey("github.com"),
      "json",
    );
    expect(cachedIcon).toMatchObject({ contentType: "image/png" });
  });
});
// Unsubscribe capture: one-click unsubscribe URLs are stored on the feed
// metadata per sender, and only when the one-click Post header is present.
describe("processEmail — unsubscribe capture", () => {
  let env: ReturnType<typeof createMockEnv>;

  beforeEach(async () => {
    env = createMockEnv();
    await env.EMAIL_STORAGE.put(
      `feed:${VALID_FEED_ID}:config`,
      JSON.stringify({}),
    );
  });

  // Header pair advertising a one-click unsubscribe at `url`.
  const oneClickHeaders = (url: string): Record<string, string> => ({
    "list-unsubscribe": `<${url}>`,
    "list-unsubscribe-post": "List-Unsubscribe=One-Click",
  });

  // Ingests an email from `sender` carrying one-click headers for `url`.
  const ingestFrom = (sender: string, url: string) =>
    processEmail(
      makeInput({ senders: [sender], headers: oneClickHeaders(url) }),
      env as any,
    );

  const readMetadata = async () =>
    (await env.EMAIL_STORAGE.get(
      `feed:${VALID_FEED_ID}:metadata`,
      "json",
    )) as { unsubscribe?: Record<string, string> };

  it("stores the one-click unsubscribe URL on the feed metadata, keyed by sender", async () => {
    await ingestFrom("news@example.com", "https://example.com/u?t=abc");

    const metadata = await readMetadata();
    expect(metadata.unsubscribe).toEqual({
      "news@example.com": "https://example.com/u?t=abc",
    });
  });

  it("keeps one entry per sender and overwrites with the latest URL", async () => {
    await ingestFrom("a@one.com", "https://one.com/u/1");
    await ingestFrom("b@two.com", "https://two.com/u/1");
    await ingestFrom("a@one.com", "https://one.com/u/2");

    const metadata = await readMetadata();
    expect(metadata.unsubscribe).toEqual({
      "a@one.com": "https://one.com/u/2",
      "b@two.com": "https://two.com/u/1",
    });
  });

  it("does not store anything without the one-click Post header", async () => {
    await processEmail(
      makeInput({
        headers: { "list-unsubscribe": "<https://example.com/u/1>" },
      }),
      env as any,
    );

    const metadata = await readMetadata();
    expect(metadata.unsubscribe).toBeUndefined();
  });
});
+212
View File
@@ -0,0 +1,212 @@
import { EmailParser } from "../domain/email-parser";
import { AttachmentData, EmailMetadata, Env } from "../types";
import { notifySubscribers } from "../infrastructure/websub";
import { bumpCounters } from "../application/stats";
import {
cacheFaviconForDomain,
extractEmailDomain,
} from "../infrastructure/favicon-fetcher";
import { parseOneClickUnsubscribe } from "../infrastructure/unsubscribe";
import { getAttachmentBucket } from "../infrastructure/attachments";
import { FeedRepository } from "../domain/feed-repository";
import { Feed } from "../domain/feed.aggregate";
import { FeedId } from "../domain/value-objects/feed-id";
import { logger } from "../infrastructure/logger";
import { FEED_MAX_BYTES } from "../config/constants";
/**
 * An attachment as handed to processEmail, before it is uploaded to the
 * attachment bucket.
 */
export interface RawAttachment {
// File name; copied to the stored record and used in the uploaded object's
// Content-Disposition metadata.
filename: string;
// MIME type; copied to the stored record and the object's content type.
contentType: string;
// Raw bytes written to the bucket; its byteLength becomes the recorded size.
content: ArrayBuffer;
// Copied to the stored record only when set.
// NOTE(review): presumably the MIME Content-ID of an inline part — confirm.
contentId?: string;
}
/**
 * Transport-independent description of one incoming email, as consumed by
 * processEmail.
 */
export interface ProcessEmailInput {
// Recipient address; the target feed id is extracted from it.
toAddress: string;
// Sender as stored with the email; also the source of the feed icon domain.
from: string;
// Sender addresses checked against the feed's allow/block lists; the first
// one keys the captured unsubscribe URL.
senders: string[];
// Subject line, stored with the email and mirrored into feed metadata.
subject: string;
// Email body, stored as-is.
content: string;
// Receipt time. NOTE(review): tests pass epoch milliseconds — confirm unit.
receivedAt: number;
// Message headers; treated as `{}` when omitted and scanned for a one-click
// unsubscribe link.
headers?: Record<string, string>;
// Attachments to upload; only uploaded when an attachment bucket is available.
attachments?: RawAttachment[];
}
/**
 * Why an email was refused, matching the checks in loadAcceptingFeed in order:
 * no feed id could be extracted from the recipient address, the feed could not
 * be loaded, the feed reports itself expired, or the feed's sender filter
 * answered "blocked".
 */
export type IngestRejectionReason =
| "invalid_address"
| "feed_not_found"
| "feed_expired"
| "sender_blocked";
/**
 * Outcome of ingesting an email — a domain result, not an HTTP concern. The edge
 * (forwardemail.ts) maps this to a status code; the Cloudflare email handler
 * logs the reason. Keeping HTTP out of the core keeps ingestion transport-agnostic.
 *
 * Discriminated on `ok`: success carries the id of the feed the email was
 * stored under; failure carries the rejection reason.
 */
export type IngestResult =
| { ok: true; feedId: string }
| { ok: false; reason: IngestRejectionReason };
/**
 * Builds a `Content-Disposition: attachment` value for an untrusted filename.
 *
 * The filename comes from the incoming email, so it must not be interpolated
 * raw: control characters (CR/LF included) are removed, and `"` / `\` are
 * escaped inside the quoted-string. Names containing non-ASCII characters get
 * an ASCII fallback in `filename=` plus the percent-encoded UTF-8 form in
 * `filename*=`. Plain ASCII names without special characters yield exactly
 * `attachment; filename="<name>"`.
 */
function attachmentDisposition(filename: string): string {
  // Control characters are never valid inside a header value.
  const printable = filename.replace(/[\u0000-\u001f\u007f]/g, "");
  // Inside a quoted-string, backslash and double quote must be escaped.
  const quote = (value: string) => value.replace(/[\\"]/g, "\\$&");

  if (/^[\u0020-\u007e]*$/.test(printable)) {
    return `attachment; filename="${quote(printable)}"`;
  }

  const asciiFallback = printable.replace(/[^\u0020-\u007e]/g, "_");
  let extended = "";
  try {
    // encodeURIComponent leaves ' ( ) * unescaped; they are not allowed in an
    // extended parameter value, so encode them explicitly.
    const encoded = encodeURIComponent(printable).replace(
      /['()*]/g,
      (ch) => `%${ch.charCodeAt(0).toString(16).toUpperCase()}`,
    );
    extended = `; filename*=UTF-8''${encoded}`;
  } catch {
    // A lone surrogate makes encodeURIComponent throw; keep the ASCII
    // fallback only rather than failing the upload.
  }
  return `attachment; filename="${quote(asciiFallback)}"${extended}`;
}

/**
 * Uploads every attachment to the bucket under a fresh random UUID and returns
 * the records to store alongside the email.
 *
 * All uploads are started together; the returned array is in the same order
 * as `attachments`. If any upload rejects, the returned promise rejects.
 *
 * @param attachments Raw attachments taken from the incoming email.
 * @param bucket Bucket the bytes are written to.
 * @returns One record per attachment: generated id, filename, content type,
 *   byte size, and `contentId` when the input had one.
 */
async function uploadAttachments(
  attachments: RawAttachment[],
  bucket: R2Bucket,
): Promise<AttachmentData[]> {
  return Promise.all(
    attachments.map(async (att) => {
      const id = crypto.randomUUID();
      await bucket.put(id, att.content, {
        httpMetadata: {
          contentType: att.contentType,
          contentDisposition: attachmentDisposition(att.filename),
        },
      });
      return {
        id,
        filename: att.filename,
        contentType: att.contentType,
        size: att.content.byteLength,
        ...(att.contentId ? { contentId: att.contentId } : {}),
      };
    }),
  );
}
/**
 * Resolves the feed an email is addressed to and checks that it may receive
 * the email.
 *
 * Checks run in order and the first failure wins: the recipient address must
 * yield a feed id, the feed must load, it must not be expired, and its sender
 * filter must not answer "blocked". Each rejection is logged before returning.
 *
 * @returns `{ ok: true, feed }` when the email may be stored, otherwise
 *   `{ ok: false, reason }`.
 */
async function loadAcceptingFeed(
  input: ProcessEmailInput,
  env: Env,
): Promise<
  { ok: true; feed: Feed } | { ok: false; reason: IngestRejectionReason }
> {
  const rejected = (reason: IngestRejectionReason) => ({
    ok: false as const,
    reason,
  });

  const { toAddress, senders } = input;
  const feedId = EmailParser.extractFeedId(toAddress);
  if (!feedId) {
    logger.error("Invalid email address format", { toAddress });
    return rejected("invalid_address");
  }

  const repo = FeedRepository.from(env);
  const feed = await repo.load(FeedId.fromTrusted(feedId));
  if (!feed) {
    logger.error("Feed not found", { feedId });
    return rejected("feed_not_found");
  }

  if (feed.isExpired()) {
    logger.warn("Rejected email: feed expired", { feedId });
    return rejected("feed_expired");
  }

  const verdict = feed.accepts(senders);
  if (verdict === "blocked") {
    logger.warn("Rejected email: sender filter", {
      feedId,
      senders,
      allowedSenders: feed.config.allowed_senders,
      blockedSenders: feed.config.blocked_senders,
    });
    return rejected("sender_blocked");
  }

  return { ok: true, feed };
}
/**
 * Persists an accepted email and updates the feed around it.
 *
 * Steps, in order: upload attachments (when a bucket is available), write the
 * email body, let the feed aggregate ingest the new metadata entry under the
 * size budget, then save the metadata while deleting every dropped email and
 * its attachment objects. When `ctx` is given, subscriber notification and the
 * favicon fetch are scheduled through `ctx.waitUntil`.
 */
async function storeEmail(
  feed: Feed,
  input: ProcessEmailInput,
  env: Env,
  ctx?: ExecutionContext,
): Promise<void> {
  const bucket = getAttachmentBucket(env);
  let uploaded: AttachmentData[] = [];
  if (bucket && input.attachments?.length) {
    uploaded = await uploadAttachments(input.attachments, bucket);
  }
  const hasAttachments = uploaded.length > 0;

  const emailData = {
    subject: input.subject,
    from: input.from,
    content: input.content,
    receivedAt: input.receivedAt,
    headers: input.headers ?? {},
    ...(hasAttachments ? { attachments: uploaded } : {}),
  };

  const repo = FeedRepository.from(env);
  const emailKey = repo.newEmailKey(feed.id);
  await repo.putEmail(emailKey, emailData);

  // Size is measured in UTF-8 bytes of the serialised email.
  const encodedEmail = new TextEncoder().encode(JSON.stringify(emailData));
  const newEntry: EmailMetadata = {
    key: emailKey,
    subject: emailData.subject,
    receivedAt: emailData.receivedAt,
    size: encodedEmail.byteLength,
    ...(hasAttachments
      ? { attachmentIds: uploaded.map((stored) => stored.id) }
      : {}),
  };

  // Track the latest sender's domain (feed icon) and capture the RFC 8058
  // one-click unsubscribe link, keyed by sender so each newsletter keeps its
  // own latest URL (fired when the feed is deleted).
  const iconDomain = extractEmailDomain(input.from);
  const unsubUrl = parseOneClickUnsubscribe(input.headers ?? {});
  const unsub = unsubUrl
    ? {
        senderKey: input.senders[0] || iconDomain || input.from,
        url: unsubUrl,
      }
    : undefined;

  // An unset, unparsable or zero budget falls back to the default.
  const configuredMax = parseInt(env.FEED_MAX_SIZE_BYTES ?? "", 10);
  const maxBytes = configuredMax || FEED_MAX_BYTES;

  const { dropped } = feed.ingest(newEntry, {
    maxBytes,
    iconDomain: iconDomain ?? undefined,
    unsub,
  });

  // Start deleting the attachment objects of every dropped email.
  const r2Deletions: Promise<unknown>[] = [];
  if (bucket && dropped.length > 0) {
    for (const entry of dropped) {
      for (const attachmentId of entry.attachmentIds ?? []) {
        r2Deletions.push(bucket.delete(attachmentId));
      }
    }
  }

  // KV has no compare-and-swap: the load (in loadAcceptingFeed) and this write
  // are not serialised, so concurrent ingests for one feed can lose updates.
  // Accepted under KV's eventual-consistency model; the Feed aggregate is the
  // seam a Durable Object would later wrap to serialise these writers.
  await Promise.all([
    repo.saveMetadata(feed),
    ...dropped.map((entry) => repo.deleteEmail(entry.key)),
    ...r2Deletions,
  ]);

  logger.info("Email processed", { feedId: feed.id.value });

  if (!ctx) return;
  ctx.waitUntil(notifySubscribers(feed.id.value, env));
  if (iconDomain) {
    ctx.waitUntil(cacheFaviconForDomain(iconDomain, env));
  }
}
/**
 * Ingests one email: validates the target feed and sender, stores the email,
 * and bumps the monitoring counters.
 *
 * @param input The email to ingest.
 * @param env Worker bindings.
 * @param ctx Optional execution context; passed on to storeEmail for
 *   background work.
 * @returns `{ ok: true, feedId }` once stored, or `{ ok: false, reason }`
 *   when the email was rejected (nothing is stored in that case).
 */
export async function processEmail(
  input: ProcessEmailInput,
  env: Env,
  ctx?: ExecutionContext,
): Promise<IngestResult> {
  const outcome = await loadAcceptingFeed(input, env);

  if (outcome.ok) {
    const { feed } = outcome;
    await storeEmail(feed, input, env, ctx);
    await bumpCounters(env.EMAIL_STORAGE, {
      emails_received: 1,
      last_email_at: new Date().toISOString(),
    });
    return { ok: true, feedId: feed.id.value };
  }

  await bumpCounters(env.EMAIL_STORAGE, { emails_rejected: 1 });
  return outcome;
}
+36
View File
@@ -0,0 +1,36 @@
import { Env, FeedConfig, EmailData } from "../types";
import { MAX_FEED_ITEMS } from "../config/constants";
import { FeedRepository } from "../domain/feed-repository";
import { FeedId } from "../domain/value-objects/feed-id";
/** Everything fetchFeedData returns for one feed. */
export interface FeedData {
// Stored feed config, or the default built by fetchFeedData when none exists.
feedConfig: FeedConfig;
// Emails loaded for the first MAX_FEED_ITEMS metadata entries; entries whose
// email could not be loaded are omitted.
emails: EmailData[];
}
/**
 * Loads a feed's config and its most recent emails.
 *
 * Only the first MAX_FEED_ITEMS metadata entries are considered. Their email
 * bodies are independent reads, so they are requested together rather than
 * one round-trip at a time; the result keeps metadata order, and entries whose
 * email is missing are skipped.
 *
 * @param feedId Id of the feed to load (treated as already validated).
 * @param env Worker bindings.
 * @returns The feed data, or `null` when the feed has no metadata.
 */
export async function fetchFeedData(
  feedId: string,
  env: Env,
): Promise<FeedData | null> {
  const repo = FeedRepository.from(env);
  const id = FeedId.fromTrusted(feedId);

  const feedMetadata = await repo.getMetadata(id);
  if (!feedMetadata) return null;

  // Fall back to a generic config when none has been stored for the feed.
  const feedConfig = (await repo.getConfig(id)) ?? {
    title: `Newsletter Feed ${feedId}`,
    description: "Converted email newsletter",
    language: "en",
    created_at: Date.now(),
  };

  const emailRefs = feedMetadata.emails.slice(0, MAX_FEED_ITEMS);
  // Promise.all preserves input order, so emails stay in metadata order.
  const loaded = await Promise.all(
    emailRefs.map((ref) => repo.getEmail(ref.key)),
  );

  const emails: EmailData[] = [];
  for (const data of loaded) {
    if (data) emails.push(data);
  }

  return { feedConfig, emails };
}
+180
View File
@@ -0,0 +1,180 @@
import { Context } from "hono";
import { Env, FeedConfig } from "../types";
import { bumpCounters } from "../application/stats";
import { waitUntilSafe } from "../infrastructure/worker";
import { sendUnsubscribes } from "../infrastructure/unsubscribe";
import { getAttachmentBucket } from "../infrastructure/attachments";
import { FeedRepository } from "../domain/feed-repository";
import { FeedId } from "../domain/value-objects/feed-id";
import {
Feed,
CreateFeedInput,
UpdateFeedInput,
} from "../domain/feed.aggregate";
import {
purgeFeedKeysStep,
collectUnsubscribeUrls,
} from "../routes/admin/helpers";
export type { CreateFeedInput, UpdateFeedInput };
/**
 * Creates a new feed with a generated id: saves it, registers it in the global
 * feed list (title, description, expiry), and bumps `feeds_created` together
 * with `last_feed_created_at`.
 *
 * @returns The generated feed id and the feed's config.
 */
export async function createFeedRecord(
  env: Env,
  input: CreateFeedInput,
): Promise<{ feedId: string; config: FeedConfig }> {
  const repo = FeedRepository.from(env);
  const newFeed = Feed.create(FeedId.generate(), input, env);

  await repo.save(newFeed);

  const { title, description, expires_at: expiresAt } = newFeed.config;
  await repo.addToList(newFeed.id, title, description, expiresAt);

  const createdAt = new Date().toISOString();
  await bumpCounters(env.EMAIL_STORAGE, {
    feeds_created: 1,
    last_feed_created_at: createdAt,
  });

  return { feedId: newFeed.id.value, config: newFeed.config };
}
/**
 * Result of renameFeed / editFeed, discriminated on `status`:
 * - "ok": the change was saved; `config` is the feed's config afterwards.
 * - "not_found": the feed could not be loaded.
 * - "expired": the edit was refused because the feed reported "expired"
 *   (in the code shown here only editFeed returns this).
 */
export type UpdateFeedResult =
| { status: "ok"; config: FeedConfig }
| { status: "not_found" }
| { status: "expired" };
/**
 * Minimal edit used by the dashboard: changes title and/or description and
 * leaves expiry alone. The saved config is then mirrored into the feed list.
 *
 * @returns `not_found` when the feed cannot be loaded, otherwise `ok` with the
 *   updated config.
 */
export async function renameFeed(
  env: Env,
  feedId: string,
  patch: { title?: string; description?: string },
): Promise<UpdateFeedResult> {
  const repo = FeedRepository.from(env);
  const target = await repo.load(FeedId.fromTrusted(feedId));
  if (!target) {
    return { status: "not_found" };
  }

  target.rename(patch);
  await repo.saveConfig(target);

  const { title, description, expires_at: expiresAt } = target.config;
  await repo.updateInList(target.id, title, description, expiresAt);

  return { status: "ok", config: target.config };
}
/**
 * Full edit: applies the patch through the feed aggregate (which recomputes
 * expiry and refuses expired feeds), saves the config, and mirrors title,
 * description and expiry into the feed list. Undefined fields are preserved.
 *
 * @returns `not_found` when the feed cannot be loaded, `expired` when the
 *   aggregate refuses the edit, otherwise `ok` with the updated config.
 */
export async function editFeed(
  env: Env,
  feedId: string,
  input: UpdateFeedInput,
): Promise<UpdateFeedResult> {
  const repo = FeedRepository.from(env);
  const target = await repo.load(FeedId.fromTrusted(feedId));
  if (!target) {
    return { status: "not_found" };
  }

  const outcome = target.edit(input, env);
  if (outcome.status === "expired") {
    return { status: "expired" };
  }

  await repo.saveConfig(target);

  const { title, description, expires_at: expiresAt } = target.config;
  await repo.updateInList(target.id, title, description, expiresAt);

  return { status: "ok", config: target.config };
}
/** Per-key outcome of deleteFeedFastDetailed. */
type DeleteFeedFastResult = {
// Set to the same value as configDeleted; a failed metadata delete alone
// does not make this false.
ok: boolean;
// True when deleting the config key did not throw.
configDeleted: boolean;
// True when deleting the metadata key did not throw.
metadataDeleted: boolean;
// One message per failed delete, e.g. "config delete failed: <error>".
errors: string[];
};
/**
 * Deletes a feed's config key and then its metadata key, recording the outcome
 * of each instead of throwing. Emails and attachments are not touched here;
 * that larger cleanup is handled separately via purgeFeedKeysStep.
 *
 * @returns Flags for each delete plus collected error messages. `ok` reflects
 *   the config delete only.
 */
export async function deleteFeedFastDetailed(
  emailStorage: KVNamespace,
  feedId: string,
): Promise<DeleteFeedFastResult> {
  const repo = new FeedRepository(emailStorage);
  const id = FeedId.fromTrusted(feedId);
  const errors: string[] = [];

  // Runs one delete; on failure records the message and reports false.
  const attempt = async (
    op: () => Promise<unknown>,
    describeFailure: (error: unknown) => string,
  ): Promise<boolean> => {
    try {
      await op();
      return true;
    } catch (error) {
      errors.push(describeFailure(error));
      return false;
    }
  };

  const configDeleted = await attempt(
    () => repo.deleteConfig(id),
    (error) => `config delete failed: ${String(error)}`,
  );
  const metadataDeleted = await attempt(
    () => repo.deleteMetadata(id),
    (error) => `metadata delete failed: ${String(error)}`,
  );

  return { ok: configDeleted, configDeleted, metadataDeleted, errors };
}
/**
 * Removes one feed completely. In order: collects its unsubscribe URLs (they
 * live in the metadata about to be deleted), deletes config + metadata, drops
 * the feed from the global list, and bumps `feeds_deleted` when it was listed.
 * Unsubscribe requests and the purge of remaining keys are then scheduled in
 * the background through waitUntilSafe.
 *
 * @returns Whether the feed was present in the global list.
 */
export async function deleteFeedRecord(
  c: Context<{ Bindings: Env }>,
  env: Env,
  feedId: string,
): Promise<boolean> {
  const kv = env.EMAIL_STORAGE;
  const repo = new FeedRepository(kv);

  // Read unsubscribe URLs before the metadata is deleted below.
  const unsubscribeUrls = await collectUnsubscribeUrls(kv, feedId);

  // NOTE(review): the per-key result is discarded here, so a failed config or
  // metadata delete is not surfaced to the caller.
  await deleteFeedFastDetailed(kv, feedId);

  const removed = await repo.removeFromList(FeedId.fromTrusted(feedId));
  if (removed) {
    await bumpCounters(kv, { feeds_deleted: 1 });
  }

  if (unsubscribeUrls.length > 0) {
    waitUntilSafe(c, sendUnsubscribes(unsubscribeUrls, env));
  }

  const bucket = getAttachmentBucket(env);
  const purge = purgeFeedKeysStep(kv, feedId, { bucket });
  waitUntilSafe(c, purge);

  return removed;
}
+217
View File
@@ -0,0 +1,217 @@
import { describe, it, expect } from "vitest";
import { createMockEnv, MockR2 } from "../test/setup";
import {
getCounters,
bumpCounters,
countKeysByPrefix,
getStats,
scanR2Usage,
scanKvUsage,
setStorageSnapshot,
} from "./stats";
import { getAttachmentBucket } from "../infrastructure/attachments";
import { STATS_KEY, FEEDS_LIST_KEY } from "../config/constants";
import { Env } from "../types";
// Counter persistence and the aggregated stats view built on top of it.
describe("stats helper", () => {
  // Fresh mock environment typed as the worker's Env.
  const freshEnv = (options?: Parameters<typeof createMockEnv>[0]) =>
    createMockEnv(options) as unknown as Env;

  it("returns zeroed counters when nothing is stored", async () => {
    const { EMAIL_STORAGE: kv } = freshEnv();

    const counters = await getCounters(kv);

    expect(counters).toMatchObject({
      feeds_created: 0,
      feeds_deleted: 0,
      emails_received: 0,
      emails_rejected: 0,
    });
    expect(counters.first_seen).toBeUndefined();
  });

  it("accumulates numeric deltas across bumps", async () => {
    const { EMAIL_STORAGE: kv } = freshEnv();

    await bumpCounters(kv, { emails_received: 1 });
    await bumpCounters(kv, { emails_received: 2, emails_rejected: 1 });
    await bumpCounters(kv, { feeds_created: 1, feeds_deleted: 3 });

    const counters = await getCounters(kv);
    expect(counters.emails_received).toBe(3);
    expect(counters.emails_rejected).toBe(1);
    expect(counters.feeds_created).toBe(1);
    expect(counters.feeds_deleted).toBe(3);
  });

  it("overwrites date-time fields and sets first_seen once", async () => {
    const { EMAIL_STORAGE: kv } = freshEnv();

    await bumpCounters(kv, {
      emails_received: 1,
      last_email_at: "2026-01-01T00:00:00.000Z",
    });
    const first = await getCounters(kv);
    const firstSeen = first.first_seen;
    expect(firstSeen).toBeDefined();
    expect(first.last_email_at).toBe("2026-01-01T00:00:00.000Z");

    await bumpCounters(kv, {
      emails_received: 1,
      last_email_at: "2026-02-02T00:00:00.000Z",
    });
    const second = await getCounters(kv);
    expect(second.last_email_at).toBe("2026-02-02T00:00:00.000Z");
    expect(second.first_seen).toBe(firstSeen);
  });

  it("counts keys by prefix", async () => {
    const { EMAIL_STORAGE: kv } = freshEnv();
    for (const key of ["websub:a:1", "websub:a:2", "feed:x:config"]) {
      await kv.put(key, "{}");
    }

    expect(await countKeysByPrefix(kv, "websub:")).toBe(2);
    expect(await countKeysByPrefix(kv, "missing:")).toBe(0);
  });

  it("getStats combines persisted counters with live values", async () => {
    const env = freshEnv();
    const kv = env.EMAIL_STORAGE;
    const feedList = {
      feeds: [
        { id: "a", title: "A" },
        { id: "b", title: "B" },
      ],
    };
    await kv.put(FEEDS_LIST_KEY, JSON.stringify(feedList));
    await kv.put("websub:subs:a", "{}");
    await bumpCounters(kv, { emails_received: 5, feeds_created: 2 });

    const stats = await getStats(env);

    expect(stats.active_feeds).toBe(2);
    expect(stats.websub_subscriptions_active).toBe(1);
    expect(stats.emails_received).toBe(5);
    expect(stats.feeds_created).toBe(2);
  });

  it("never throws on a failing KV (counters are best-effort)", async () => {
    const fail = async () => {
      throw new Error("kv down");
    };
    const brokenKv = { get: fail, put: fail } as unknown as KVNamespace;

    await expect(
      bumpCounters(brokenKv, { emails_received: 1 }),
    ).resolves.toBeUndefined();
    expect(await getCounters(brokenKv)).toMatchObject({ emails_received: 0 });
    expect(await countKeysByPrefix(brokenKv, "websub:")).toBe(0);
  });

  it("persists under the stats KV key", async () => {
    const { EMAIL_STORAGE: kv } = freshEnv();

    await bumpCounters(kv, { feeds_created: 1 });

    const raw = (await kv.get(STATS_KEY, { type: "json" })) as {
      feeds_created: number;
    };
    expect(raw.feeds_created).toBe(1);
  });

  it("getStats reports attachments_enabled based on the toggle", async () => {
    // No bucket bound.
    const off = freshEnv();
    expect((await getStats(off)).attachments_enabled).toBe(false);

    // Bucket bound, toggle left unset.
    const on = freshEnv({ withR2: true });
    expect((await getStats(on)).attachments_enabled).toBe(true);

    // Bucket bound but explicitly disabled.
    const disabled = freshEnv({ withR2: true });
    (disabled as any).ATTACHMENTS_ENABLED = "false";
    expect((await getStats(disabled)).attachments_enabled).toBe(false);
  });
});
// getAttachmentBucket: one case per combination of the withR2 mock option
// and the ATTACHMENTS_ENABLED toggle.
describe("getAttachmentBucket", () => {
  it("returns the bucket when bound and not disabled", () => {
    const boundEnv = createMockEnv({ withR2: true }) as unknown as Env;
    const bucket = getAttachmentBucket(boundEnv);
    expect(bucket).toBeDefined();
  });

  it("returns undefined when no bucket is bound", () => {
    const unboundEnv = createMockEnv() as unknown as Env;
    const bucket = getAttachmentBucket(unboundEnv);
    expect(bucket).toBeUndefined();
  });

  it("returns undefined when explicitly disabled", () => {
    // Start from the withR2 mock env, then flip the toggle off.
    const switchedOffEnv = Object.assign(createMockEnv({ withR2: true }), {
      ATTACHMENTS_ENABLED: "false",
    }) as unknown as Env;
    const bucket = getAttachmentBucket(switchedOffEnv);
    expect(bucket).toBeUndefined();
  });
});
// Tests for the storage-usage helpers: the R2 scan, the KV size estimate, and
// persisting the resulting snapshot onto the counters record.
describe("storage usage scans", () => {
  it("scanR2Usage sums object sizes and counts", async () => {
    const r2 = new MockR2();
    const fixtures = [
      ["a", 100],
      ["b", 250],
    ] as const;
    for (const [objectKey, length] of fixtures) {
      await r2.put(objectKey, new Uint8Array(length));
    }

    const { count, bytes } = await scanR2Usage(r2 as unknown as R2Bucket);

    expect(count).toBe(2);
    expect(bytes).toBe(350);
  });

  it("scanR2Usage returns zeros for an empty bucket", async () => {
    const emptyBucket = new MockR2() as unknown as R2Bucket;
    const result = await scanR2Usage(emptyBucket);
    expect(result).toEqual({ bytes: 0, count: 0 });
  });

  it("scanKvUsage estimates KV bytes from stored email sizes", async () => {
    const { EMAIL_STORAGE: store } = createMockEnv() as unknown as Env;

    // Feed index listing both feeds.
    const feedIndex = {
      feeds: [
        { id: "a", title: "A" },
        { id: "b", title: "B" },
      ],
    };
    await store.put(FEEDS_LIST_KEY, JSON.stringify(feedIndex));

    // Per-feed metadata carrying the recorded email sizes (100 + 50 + 25).
    const metadataForA = {
      emails: [
        { key: "k1", size: 100 },
        { key: "k2", size: 50 },
      ],
    };
    const metadataForB = { emails: [{ key: "k3", size: 25 }] };
    await store.put("feed:a:metadata", JSON.stringify(metadataForA));
    await store.put("feed:b:metadata", JSON.stringify(metadataForB));

    const { bytes } = await scanKvUsage(store);
    expect(bytes).toBe(175);
  });

  it("setStorageSnapshot writes the snapshot fields", async () => {
    const { EMAIL_STORAGE: store } = createMockEnv() as unknown as Env;
    const snapshot = {
      attachments_bytes: 1234,
      attachments_count: 5,
      kv_bytes_estimated: 678,
    };

    await setStorageSnapshot(store, snapshot);

    const persisted = await getCounters(store);
    expect(persisted.attachments_bytes).toBe(1234);
    expect(persisted.attachments_count).toBe(5);
    expect(persisted.kv_bytes_estimated).toBe(678);
    // The scan timestamp is stamped by setStorageSnapshot itself.
    expect(persisted.storage_scanned_at).toBeDefined();
  });
});
+148
View File
@@ -0,0 +1,148 @@
import { Counters, Env, StatsResponse } from "../types";
import { logger } from "../infrastructure/logger";
import { FeedRepository } from "../domain/feed-repository";
import { CountersRepository } from "../domain/counters-repository";
import { WebSubSubscriptionRepository } from "../domain/websub-subscription-repository";
import { FeedId } from "../domain/value-objects/feed-id";
import { getAttachmentBucket } from "../infrastructure/attachments";
/**
 * Zero-valued baseline for the numeric counters. getCounters spreads it into
 * a fresh object on every read, so stored values override these defaults and
 * this constant itself is never handed out for mutation.
 */
const EMPTY_COUNTERS: Counters = {
feeds_created: 0,
feeds_deleted: 0,
emails_received: 0,
emails_rejected: 0,
unsubscribes_sent: 0,
};
/**
 * Load the persisted counters, layering stored values over the zeroed
 * defaults so every numeric field is always present.
 *
 * Never throws: a read failure is logged and reported as all-zero counters.
 *
 * @param kv KV namespace holding the counters record.
 * @returns A fresh Counters object that the caller may mutate.
 */
export async function getCounters(kv: KVNamespace): Promise<Counters> {
  try {
    const persisted = await new CountersRepository(kv).getRaw();
    if (persisted) {
      return { ...EMPTY_COUNTERS, ...persisted };
    }
    return { ...EMPTY_COUNTERS };
  } catch (error) {
    logger.error("Error reading counters", { error: String(error) });
    return { ...EMPTY_COUNTERS };
  }
}
/**
 * Read-modify-write the counters singleton. KV has no atomic increment, so
 * concurrent invocations can lose updates — accepted given KV's eventual
 * consistency and this app's low volume (see email-processor.ts storeEmail).
 * Never throws: counter failures must not break ingestion or admin flows.
 *
 * The current value is read straight from the repository rather than through
 * getCounters(): getCounters() turns a failed read into zeroed counters, and
 * writing those back would replace the persisted totals with just this
 * delta. Here a failed read aborts the whole update instead (logged, not
 * thrown), leaving the stored record untouched.
 *
 * @param kv KV namespace holding the counters record.
 * @param changes Numeric fields are added to the stored totals; the
 *   `last_*` timestamp fields replace the stored value when provided.
 *   `first_seen` cannot be passed; it is stamped once, on the first write.
 */
export async function bumpCounters(
  kv: KVNamespace,
  changes: Partial<Omit<Counters, "first_seen">>,
): Promise<void> {
  try {
    const repository = new CountersRepository(kv);
    // A throwing read lands in the catch below, so no write is attempted on
    // top of an unknown stored state.
    const stored = await repository.getRaw();
    const current: Counters = { ...EMPTY_COUNTERS, ...(stored || {}) };

    current.feeds_created += changes.feeds_created ?? 0;
    current.feeds_deleted += changes.feeds_deleted ?? 0;
    current.emails_received += changes.emails_received ?? 0;
    current.emails_rejected += changes.emails_rejected ?? 0;
    current.unsubscribes_sent += changes.unsubscribes_sent ?? 0;

    // Timestamps are overwritten, not accumulated.
    if (changes.last_email_at) current.last_email_at = changes.last_email_at;
    if (changes.last_feed_created_at)
      current.last_feed_created_at = changes.last_feed_created_at;

    // Stamped only while absent, so it keeps the time of the first write.
    if (!current.first_seen) current.first_seen = new Date().toISOString();

    await repository.put(current);
  } catch (error) {
    logger.error("Error updating counters", { error: String(error) });
  }
}
/**
 * Count the KV keys that start with `prefix`.
 *
 * Thin convenience wrapper: the work is delegated to
 * FeedRepository.countKeysByPrefix on a repository built over `kv`.
 *
 * @param kv KV namespace to inspect.
 * @param prefix Key prefix to match.
 * @returns Whatever count the repository reports.
 */
export async function countKeysByPrefix(
  kv: KVNamespace,
  prefix: string,
): Promise<number> {
  const feedRepository = new FeedRepository(kv);
  return feedRepository.countKeysByPrefix(prefix);
}
/**
 * Build the stats response: the persisted counters plus three values
 * computed at call time (feed count, WebSub key count, attachment toggle).
 *
 * The three lookups are independent and are awaited together.
 *
 * @param env Worker environment providing the KV binding and any optional
 *   attachment bucket.
 * @returns The counters merged with `active_feeds`,
 *   `websub_subscriptions_active` and `attachments_enabled`.
 */
export async function getStats(env: Env): Promise<StatsResponse> {
  const feedRepository = FeedRepository.from(env);

  const [persisted, feedList, activeSubscriptions] = await Promise.all([
    getCounters(env.EMAIL_STORAGE),
    feedRepository.listFeeds(),
    WebSubSubscriptionRepository.from(env).countKeys(),
  ]);

  // Enabled exactly when getAttachmentBucket hands back a bucket.
  const attachmentsEnabled = Boolean(getAttachmentBucket(env));

  return {
    ...persisted,
    active_feeds: feedList.length,
    websub_subscriptions_active: activeSubscriptions,
    attachments_enabled: attachmentsEnabled,
  };
}
/**
 * Walk every page of the bucket listing and total the object sizes and the
 * object count.
 *
 * Never throws: if a listing call fails, the error is logged and the totals
 * gathered up to that point are returned.
 *
 * @param bucket R2 bucket to scan.
 * @returns Total `bytes` and object `count` seen.
 */
export async function scanR2Usage(
  bucket: R2Bucket,
): Promise<{ bytes: number; count: number }> {
  const totals = { bytes: 0, count: 0 };
  try {
    let cursor: string | undefined;
    for (;;) {
      const page = await bucket.list({ cursor });
      for (const { size } of page.objects) {
        totals.bytes += size;
        totals.count += 1;
      }
      // Stop on the last page, or if a truncated page carries no usable
      // cursor to continue from.
      if (!page.truncated) break;
      cursor = page.cursor;
      if (!cursor) break;
    }
  } catch (error) {
    logger.error("Error scanning R2 usage", { error: String(error) });
  }
  return totals;
}
/**
 * Estimate how many bytes of KV storage are in use. KV offers no size API,
 * so the figure is the sum of the per-email sizes already recorded in each
 * feed's metadata — email bodies dominate KV usage. Feed config, websub and
 * stats keys are not counted, which makes the result a lower bound.
 *
 * Never throws: on failure the error is logged and the total accumulated so
 * far is returned.
 *
 * @param kv KV namespace holding the feed list and per-feed metadata.
 * @returns The estimated byte total.
 */
export async function scanKvUsage(kv: KVNamespace): Promise<{ bytes: number }> {
  let total = 0;
  try {
    const feedRepository = new FeedRepository(kv);
    for (const { id } of await feedRepository.listFeeds()) {
      const meta = await feedRepository.getMetadata(FeedId.fromTrusted(id));
      if (meta) {
        // Emails with no recorded size contribute nothing.
        for (const { size } of meta.emails) {
          total += size ?? 0;
        }
      }
    }
  } catch (error) {
    logger.error("Error estimating KV usage", { error: String(error) });
  }
  return { bytes: total };
}
/**
 * Overwrite the storage-usage snapshot fields on the counters singleton.
 * Unlike bumpCounters these are set (not incremented). Never throws.
 *
 * The current record is read straight from the repository rather than
 * through getCounters(): getCounters() turns a failed read into zeroed
 * counters, and writing the snapshot on top of those would wipe the
 * persisted totals. Here a failed read aborts the update instead (logged,
 * not thrown), leaving the stored record untouched.
 *
 * @param kv KV namespace holding the counters record.
 * @param snapshot Freshly measured usage figures to store as-is.
 */
export async function setStorageSnapshot(
  kv: KVNamespace,
  snapshot: {
    attachments_bytes: number;
    attachments_count: number;
    kv_bytes_estimated: number;
  },
): Promise<void> {
  try {
    const repository = new CountersRepository(kv);
    // A throwing read lands in the catch below, so no write is attempted on
    // top of an unknown stored state.
    const stored = await repository.getRaw();
    const current: Counters = { ...EMPTY_COUNTERS, ...(stored || {}) };

    // One timestamp for both fields, so a record created by this call has
    // matching first_seen and storage_scanned_at values.
    const now = new Date().toISOString();
    current.attachments_bytes = snapshot.attachments_bytes;
    current.attachments_count = snapshot.attachments_count;
    current.kv_bytes_estimated = snapshot.kv_bytes_estimated;
    current.storage_scanned_at = now;
    if (!current.first_seen) current.first_seen = now;

    await repository.put(current);
  } catch (error) {
    logger.error("Error writing storage snapshot", { error: String(error) });
  }
}