mirror of
https://github.com/juherr/kill-the-news.git
synced 2026-06-20 22:03:48 +00:00
refactor(domain): introduce the Feed aggregate as the write-path API
Add a Feed aggregate class owning config + the email index, with create, ingest, removeEmails, isExpired and accepts delegating to the existing pure invariant functions. FeedRepository gains load/save/saveMetadata that reconstitute and persist the aggregate. All write paths now go through it: createFeedRecord (Feed.create), email ingestion (feed.ingest), and every email deletion in the admin UI and REST API (feed.removeEmails) — no route mutates metadata.emails directly anymore. KV key strings unchanged. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -8,6 +8,7 @@ import {
|
|||||||
} from "../types";
|
} from "../types";
|
||||||
import { FEEDS_LIST_KEY } from "../config/constants";
|
import { FEEDS_LIST_KEY } from "../config/constants";
|
||||||
import { feedKeys } from "./feed-keys";
|
import { feedKeys } from "./feed-keys";
|
||||||
|
import { Feed } from "./feed.aggregate";
|
||||||
import { logger } from "../lib/logger";
|
import { logger } from "../lib/logger";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -55,6 +56,37 @@ export class FeedRepository {
|
|||||||
return feedKeys.feedIdFromEmail(key);
|
return feedKeys.feedIdFromEmail(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Feed aggregate ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load the aggregate (config + email index). A feed exists iff it has a
|
||||||
|
* config; metadata defaults to empty so a freshly-created feed still loads.
|
||||||
|
*/
|
||||||
|
async load(feedId: string): Promise<Feed | null> {
|
||||||
|
const [config, metadata] = await Promise.all([
|
||||||
|
this.getConfig(feedId),
|
||||||
|
this.getMetadata(feedId),
|
||||||
|
]);
|
||||||
|
if (!config) return null;
|
||||||
|
return Feed.reconstitute(feedId, config, metadata ?? { emails: [] });
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Persist both keys the aggregate owns (config + metadata). */
|
||||||
|
async save(feed: Feed): Promise<void> {
|
||||||
|
await Promise.all([
|
||||||
|
this.putConfig(feed.id, feed.config),
|
||||||
|
this.putMetadata(feed.id, feed.metadata),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Persist only the email index. Used by the ingest/delete paths where config
|
||||||
|
* is unchanged — avoids a redundant config write on the hot path.
|
||||||
|
*/
|
||||||
|
async saveMetadata(feed: Feed): Promise<void> {
|
||||||
|
await this.putMetadata(feed.id, feed.metadata);
|
||||||
|
}
|
||||||
|
|
||||||
// ── Feed config ───────────────────────────────────────────────────────────
|
// ── Feed config ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
async getConfig(feedId: string): Promise<FeedConfig | null> {
|
async getConfig(feedId: string): Promise<FeedConfig | null> {
|
||||||
|
|||||||
@@ -0,0 +1,157 @@
|
|||||||
|
import { describe, it, expect } from "vitest";
|
||||||
|
import { createMockEnv } from "../test/setup";
|
||||||
|
import { Feed, CreateFeedInput } from "./feed.aggregate";
|
||||||
|
import { FeedRepository } from "./feed-repository";
|
||||||
|
import type { Env, EmailMetadata } from "../types";
|
||||||
|
|
||||||
|
const mockEnv = (overrides: Partial<Env> = {}) =>
|
||||||
|
({ ...createMockEnv(), ...overrides }) as unknown as Env;
|
||||||
|
|
||||||
|
const createInput = (
|
||||||
|
overrides: Partial<CreateFeedInput> = {},
|
||||||
|
): CreateFeedInput => ({
|
||||||
|
title: "News",
|
||||||
|
language: "en",
|
||||||
|
allowedSenders: [],
|
||||||
|
blockedSenders: [],
|
||||||
|
...overrides,
|
||||||
|
});
|
||||||
|
|
||||||
|
const entry = (overrides: Partial<EmailMetadata> = {}): EmailMetadata => ({
|
||||||
|
key: "feed:a.b.42:1",
|
||||||
|
subject: "Hello",
|
||||||
|
receivedAt: 1,
|
||||||
|
size: 10,
|
||||||
|
...overrides,
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Feed.create", () => {
|
||||||
|
it("builds a config with an empty email index and no expiry by default", () => {
|
||||||
|
const feed = Feed.create("a.b.42", createInput(), mockEnv());
|
||||||
|
expect(feed.id).toBe("a.b.42");
|
||||||
|
expect(feed.config.title).toBe("News");
|
||||||
|
expect(feed.config.expires_at).toBeUndefined();
|
||||||
|
expect(feed.metadata.emails).toEqual([]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("resolves expiry from lifetimeHours", () => {
|
||||||
|
const feed = Feed.create(
|
||||||
|
"a.b.42",
|
||||||
|
createInput({ lifetimeHours: 1 }),
|
||||||
|
mockEnv(),
|
||||||
|
);
|
||||||
|
expect(feed.config.expires_at).toBeGreaterThan(Date.now());
|
||||||
|
});
|
||||||
|
|
||||||
|
it("lets FEED_TTL_HOURS override a client lifetime", () => {
|
||||||
|
const feed = Feed.create(
|
||||||
|
"a.b.42",
|
||||||
|
createInput({ lifetimeHours: 1000000 }),
|
||||||
|
mockEnv({ FEED_TTL_HOURS: "1" }),
|
||||||
|
);
|
||||||
|
const oneClientHour = Date.now() + 1000000 * 3_600_000;
|
||||||
|
expect(feed.config.expires_at).toBeLessThan(oneClientHour);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Feed.isExpired / accepts", () => {
|
||||||
|
it("reports expiry against the configured instant", () => {
|
||||||
|
const feed = Feed.reconstitute(
|
||||||
|
"a.b.42",
|
||||||
|
{ title: "T", language: "en", created_at: 0, expires_at: 100 },
|
||||||
|
{ emails: [] },
|
||||||
|
);
|
||||||
|
expect(feed.isExpired(50)).toBe(false);
|
||||||
|
expect(feed.isExpired(150)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("applies the sender policy", () => {
|
||||||
|
const feed = Feed.reconstitute(
|
||||||
|
"a.b.42",
|
||||||
|
{
|
||||||
|
title: "T",
|
||||||
|
language: "en",
|
||||||
|
created_at: 0,
|
||||||
|
allowed_senders: ["good@example.com"],
|
||||||
|
},
|
||||||
|
{ emails: [] },
|
||||||
|
);
|
||||||
|
expect(feed.accepts(["good@example.com"])).toBe("accepted");
|
||||||
|
expect(feed.accepts(["bad@example.com"])).toBe("blocked");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Feed.ingest", () => {
|
||||||
|
it("prepends the entry, tracks icon/unsub and trims to the byte budget", () => {
|
||||||
|
const feed = Feed.reconstitute(
|
||||||
|
"a.b.42",
|
||||||
|
{ title: "T", language: "en", created_at: 0 },
|
||||||
|
{
|
||||||
|
emails: [entry({ key: "old", size: 400 })],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
const { dropped } = feed.ingest(entry({ key: "new", size: 400 }), {
|
||||||
|
maxBytes: 500,
|
||||||
|
iconDomain: "example.com",
|
||||||
|
unsub: { senderKey: "news@example.com", url: "https://u/1" },
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(feed.metadata.emails[0].key).toBe("new");
|
||||||
|
expect(feed.metadata.iconDomain).toBe("example.com");
|
||||||
|
expect(feed.metadata.unsubscribe).toEqual({
|
||||||
|
"news@example.com": "https://u/1",
|
||||||
|
});
|
||||||
|
expect(dropped.map((e) => e.key)).toEqual(["old"]);
|
||||||
|
expect(feed.metadata.emails.map((e) => e.key)).toEqual(["new"]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Feed.removeEmails", () => {
|
||||||
|
it("drops matching keys and returns the removed entries", () => {
|
||||||
|
const feed = Feed.reconstitute(
|
||||||
|
"a.b.42",
|
||||||
|
{ title: "T", language: "en", created_at: 0 },
|
||||||
|
{
|
||||||
|
emails: [
|
||||||
|
entry({ key: "k1" }),
|
||||||
|
entry({ key: "k2" }),
|
||||||
|
entry({ key: "k3" }),
|
||||||
|
],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
const { removed } = feed.removeEmails(["k1", "k3", "missing"]);
|
||||||
|
expect(removed.map((e) => e.key).sort()).toEqual(["k1", "k3"]);
|
||||||
|
expect(feed.metadata.emails.map((e) => e.key)).toEqual(["k2"]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("FeedRepository.load / save round-trip", () => {
|
||||||
|
it("persists a created feed and reflects later mutations", async () => {
|
||||||
|
const repo = new FeedRepository(mockEnv().EMAIL_STORAGE);
|
||||||
|
const created = Feed.create(
|
||||||
|
"a.b.42",
|
||||||
|
createInput({ title: "Round" }),
|
||||||
|
mockEnv(),
|
||||||
|
);
|
||||||
|
await repo.save(created);
|
||||||
|
|
||||||
|
const loaded = await repo.load("a.b.42");
|
||||||
|
expect(loaded).not.toBeNull();
|
||||||
|
expect(loaded!.config.title).toBe("Round");
|
||||||
|
|
||||||
|
loaded!.ingest(entry({ key: "feed:a.b.42:1" }), { maxBytes: 1_000_000 });
|
||||||
|
await repo.saveMetadata(loaded!);
|
||||||
|
|
||||||
|
const reloaded = await repo.load("a.b.42");
|
||||||
|
expect(reloaded!.metadata.emails.map((e) => e.key)).toEqual([
|
||||||
|
"feed:a.b.42:1",
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns null when the feed has no config", async () => {
|
||||||
|
const repo = new FeedRepository(mockEnv().EMAIL_STORAGE);
|
||||||
|
expect(await repo.load("missing")).toBeNull();
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,135 @@
|
|||||||
|
import { Env, FeedConfig, FeedMetadata, EmailMetadata } from "../types";
|
||||||
|
import {
|
||||||
|
resolveExpiresAt,
|
||||||
|
isExpired,
|
||||||
|
applySenderPolicy,
|
||||||
|
trimToByteBudget,
|
||||||
|
SenderDecision,
|
||||||
|
} from "./feed";
|
||||||
|
|
||||||
|
export interface CreateFeedInput {
|
||||||
|
title: string;
|
||||||
|
description?: string;
|
||||||
|
language: string;
|
||||||
|
allowedSenders: string[];
|
||||||
|
blockedSenders: string[];
|
||||||
|
lifetimeHours?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface UpdateFeedInput {
|
||||||
|
title?: string;
|
||||||
|
description?: string;
|
||||||
|
language?: string;
|
||||||
|
allowedSenders?: string[];
|
||||||
|
blockedSenders?: string[];
|
||||||
|
lifetimeHours?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface IngestOptions {
|
||||||
|
maxBytes: number;
|
||||||
|
iconDomain?: string;
|
||||||
|
/** RFC 8058 one-click unsubscribe link, keyed by the sending newsletter. */
|
||||||
|
unsub?: { senderKey: string; url: string };
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Feed aggregate: the consistency boundary around a feed's config and the
|
||||||
|
* metadata index of its emails. All mutations to either go through a method
|
||||||
|
* here so the invariants (expiry policy, sender policy, byte budget) live in one
|
||||||
|
* place. Email bodies are large blobs referenced by `metadata.emails[].key` and
|
||||||
|
* deliberately sit *outside* the aggregate — the caller flushes them alongside
|
||||||
|
* `FeedRepository.save`/`saveMetadata`.
|
||||||
|
*
|
||||||
|
* I/O-free: load and persist state through `FeedRepository`. KV has no multi-key
|
||||||
|
* transaction, so a future Durable Object keyed by feed id would wrap
|
||||||
|
* load→mutate→save to serialise concurrent writers (see email-processor.ts).
|
||||||
|
*/
|
||||||
|
export class Feed {
|
||||||
|
private constructor(
|
||||||
|
readonly id: string,
|
||||||
|
private _config: FeedConfig,
|
||||||
|
private _metadata: FeedMetadata,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
/** Mint a brand-new feed with an empty email index. */
|
||||||
|
static create(id: string, input: CreateFeedInput, env: Env): Feed {
|
||||||
|
const now = Date.now();
|
||||||
|
const expiresAt = resolveExpiresAt(env, input.lifetimeHours);
|
||||||
|
const config: FeedConfig = {
|
||||||
|
title: input.title,
|
||||||
|
description: input.description,
|
||||||
|
language: input.language,
|
||||||
|
allowed_senders: input.allowedSenders,
|
||||||
|
blocked_senders: input.blockedSenders,
|
||||||
|
created_at: now,
|
||||||
|
updated_at: now,
|
||||||
|
...(expiresAt !== undefined ? { expires_at: expiresAt } : {}),
|
||||||
|
};
|
||||||
|
return new Feed(id, config, { emails: [] });
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Rebuild an aggregate from persisted state. */
|
||||||
|
static reconstitute(
|
||||||
|
id: string,
|
||||||
|
config: FeedConfig,
|
||||||
|
metadata: FeedMetadata,
|
||||||
|
): Feed {
|
||||||
|
return new Feed(id, config, metadata);
|
||||||
|
}
|
||||||
|
|
||||||
|
get config(): Readonly<FeedConfig> {
|
||||||
|
return this._config;
|
||||||
|
}
|
||||||
|
|
||||||
|
get metadata(): Readonly<FeedMetadata> {
|
||||||
|
return this._metadata;
|
||||||
|
}
|
||||||
|
|
||||||
|
isExpired(now: number = Date.now()): boolean {
|
||||||
|
return isExpired(this._config, now);
|
||||||
|
}
|
||||||
|
|
||||||
|
accepts(senders: string[]): SenderDecision {
|
||||||
|
return applySenderPolicy(this._config, senders);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add an email to the front of the index, refresh the icon domain and the
|
||||||
|
* per-sender unsubscribe link, then trim the oldest entries back under the
|
||||||
|
* byte budget. Returns the dropped entries so the caller can purge their
|
||||||
|
* bodies/attachments.
|
||||||
|
*/
|
||||||
|
ingest(
|
||||||
|
entry: EmailMetadata,
|
||||||
|
opts: IngestOptions,
|
||||||
|
): { dropped: EmailMetadata[] } {
|
||||||
|
this._metadata.emails.unshift(entry);
|
||||||
|
|
||||||
|
if (opts.iconDomain) {
|
||||||
|
this._metadata.iconDomain = opts.iconDomain;
|
||||||
|
}
|
||||||
|
if (opts.unsub) {
|
||||||
|
this._metadata.unsubscribe = {
|
||||||
|
...(this._metadata.unsubscribe ?? {}),
|
||||||
|
[opts.unsub.senderKey]: opts.unsub.url,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return trimToByteBudget(this._metadata, opts.maxBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Drop the given email keys from the index. Returns the removed entries so the
|
||||||
|
* caller can purge their bodies/attachments.
|
||||||
|
*/
|
||||||
|
removeEmails(keys: string[]): { removed: EmailMetadata[] } {
|
||||||
|
const target = new Set(keys);
|
||||||
|
const removed: EmailMetadata[] = [];
|
||||||
|
const kept: EmailMetadata[] = [];
|
||||||
|
for (const entry of this._metadata.emails) {
|
||||||
|
(target.has(entry.key) ? removed : kept).push(entry);
|
||||||
|
}
|
||||||
|
this._metadata.emails = kept;
|
||||||
|
return { removed };
|
||||||
|
}
|
||||||
|
}
|
||||||
+48
-58
@@ -9,7 +9,7 @@ import {
|
|||||||
import { parseOneClickUnsubscribe } from "../utils/unsubscribe";
|
import { parseOneClickUnsubscribe } from "../utils/unsubscribe";
|
||||||
import { getAttachmentBucket } from "../utils/attachments";
|
import { getAttachmentBucket } from "../utils/attachments";
|
||||||
import { FeedRepository } from "../domain/feed-repository";
|
import { FeedRepository } from "../domain/feed-repository";
|
||||||
import { isExpired, applySenderPolicy, trimToByteBudget } from "../domain/feed";
|
import { Feed } from "../domain/feed.aggregate";
|
||||||
import { logger } from "./logger";
|
import { logger } from "./logger";
|
||||||
import { FEED_MAX_BYTES } from "../config/constants";
|
import { FEED_MAX_BYTES } from "../config/constants";
|
||||||
|
|
||||||
@@ -70,10 +70,12 @@ async function uploadAttachments(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function validateEmail(
|
async function loadAcceptingFeed(
|
||||||
input: ProcessEmailInput,
|
input: ProcessEmailInput,
|
||||||
env: Env,
|
env: Env,
|
||||||
): Promise<IngestResult> {
|
): Promise<
|
||||||
|
{ ok: true; feed: Feed } | { ok: false; reason: IngestRejectionReason }
|
||||||
|
> {
|
||||||
const feedId = EmailParser.extractFeedId(input.toAddress);
|
const feedId = EmailParser.extractFeedId(input.toAddress);
|
||||||
if (!feedId) {
|
if (!feedId) {
|
||||||
logger.error("Invalid email address format", {
|
logger.error("Invalid email address format", {
|
||||||
@@ -82,31 +84,30 @@ export async function validateEmail(
|
|||||||
return { ok: false, reason: "invalid_address" };
|
return { ok: false, reason: "invalid_address" };
|
||||||
}
|
}
|
||||||
|
|
||||||
const feedConfig = await FeedRepository.from(env).getConfig(feedId);
|
const feed = await FeedRepository.from(env).load(feedId);
|
||||||
if (!feedConfig) {
|
if (!feed) {
|
||||||
logger.error("Feed not found", { feedId });
|
logger.error("Feed not found", { feedId });
|
||||||
return { ok: false, reason: "feed_not_found" };
|
return { ok: false, reason: "feed_not_found" };
|
||||||
}
|
}
|
||||||
if (isExpired(feedConfig)) {
|
if (feed.isExpired()) {
|
||||||
logger.warn("Rejected email: feed expired", { feedId });
|
logger.warn("Rejected email: feed expired", { feedId });
|
||||||
return { ok: false, reason: "feed_expired" };
|
return { ok: false, reason: "feed_expired" };
|
||||||
}
|
}
|
||||||
|
if (feed.accepts(input.senders) === "blocked") {
|
||||||
if (applySenderPolicy(feedConfig, input.senders) === "blocked") {
|
|
||||||
logger.warn("Rejected email: sender filter", {
|
logger.warn("Rejected email: sender filter", {
|
||||||
feedId,
|
feedId,
|
||||||
senders: input.senders,
|
senders: input.senders,
|
||||||
allowedSenders: feedConfig.allowed_senders,
|
allowedSenders: feed.config.allowed_senders,
|
||||||
blockedSenders: feedConfig.blocked_senders,
|
blockedSenders: feed.config.blocked_senders,
|
||||||
});
|
});
|
||||||
return { ok: false, reason: "sender_blocked" };
|
return { ok: false, reason: "sender_blocked" };
|
||||||
}
|
}
|
||||||
|
|
||||||
return { ok: true, feedId };
|
return { ok: true, feed };
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function storeEmail(
|
async function storeEmail(
|
||||||
feedId: string,
|
feed: Feed,
|
||||||
input: ProcessEmailInput,
|
input: ProcessEmailInput,
|
||||||
env: Env,
|
env: Env,
|
||||||
ctx?: ExecutionContext,
|
ctx?: ExecutionContext,
|
||||||
@@ -127,26 +128,12 @@ export async function storeEmail(
|
|||||||
};
|
};
|
||||||
|
|
||||||
const repo = FeedRepository.from(env);
|
const repo = FeedRepository.from(env);
|
||||||
const emailKey = repo.newEmailKey(feedId);
|
const emailKey = repo.newEmailKey(feed.id);
|
||||||
|
await repo.putEmail(emailKey, emailData);
|
||||||
|
|
||||||
const [, rawMetadata] = await Promise.all([
|
const serialisedSize = new TextEncoder().encode(
|
||||||
repo.putEmail(emailKey, emailData),
|
JSON.stringify(emailData),
|
||||||
repo.getMetadata(feedId),
|
).byteLength;
|
||||||
]);
|
|
||||||
|
|
||||||
// Note: KV has no atomic compare-and-swap. Concurrent invocations for the
|
|
||||||
// same feed can read stale metadata and produce orphaned KV entries or
|
|
||||||
// duplicate trim deletions. This is an accepted limitation given Cloudflare
|
|
||||||
// KV's eventual-consistency model.
|
|
||||||
// TODO: Migrate feed metadata writes to Cloudflare Durable Objects to serialise
|
|
||||||
// concurrent writes and eliminate this race condition.
|
|
||||||
const feedMetadata = rawMetadata || { emails: [] };
|
|
||||||
|
|
||||||
const maxBytes =
|
|
||||||
parseInt(env.FEED_MAX_SIZE_BYTES ?? "", 10) || FEED_MAX_BYTES;
|
|
||||||
|
|
||||||
const serialised = JSON.stringify(emailData);
|
|
||||||
const serialisedSize = new TextEncoder().encode(serialised).byteLength;
|
|
||||||
const newEntry: EmailMetadata = {
|
const newEntry: EmailMetadata = {
|
||||||
key: emailKey,
|
key: emailKey,
|
||||||
subject: emailData.subject,
|
subject: emailData.subject,
|
||||||
@@ -156,45 +143,48 @@ export async function storeEmail(
|
|||||||
? { attachmentIds: storedAttachments.map((a) => a.id) }
|
? { attachmentIds: storedAttachments.map((a) => a.id) }
|
||||||
: {}),
|
: {}),
|
||||||
};
|
};
|
||||||
feedMetadata.emails.unshift(newEntry);
|
|
||||||
|
|
||||||
// Track the latest sender's domain so the feed icon follows the source.
|
// Track the latest sender's domain (feed icon) and capture the RFC 8058
|
||||||
|
// one-click unsubscribe link, keyed by sender so each newsletter keeps its
|
||||||
|
// own latest URL (fired when the feed is deleted).
|
||||||
const iconDomain = extractEmailDomain(input.from);
|
const iconDomain = extractEmailDomain(input.from);
|
||||||
if (iconDomain) {
|
|
||||||
feedMetadata.iconDomain = iconDomain;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Capture the sender's RFC 8058 one-click unsubscribe link so we can stop the
|
|
||||||
// newsletter when the feed is deleted. Keyed by sender: each newsletter on the
|
|
||||||
// feed keeps its own entry, and a repeat send overwrites with the latest URL.
|
|
||||||
const unsubUrl = parseOneClickUnsubscribe(input.headers ?? {});
|
const unsubUrl = parseOneClickUnsubscribe(input.headers ?? {});
|
||||||
if (unsubUrl) {
|
const unsub = unsubUrl
|
||||||
const senderKey =
|
? {
|
||||||
input.senders[0] || extractEmailDomain(input.from) || input.from;
|
senderKey: input.senders[0] || iconDomain || input.from,
|
||||||
feedMetadata.unsubscribe = {
|
url: unsubUrl,
|
||||||
...(feedMetadata.unsubscribe ?? {}),
|
|
||||||
[senderKey]: unsubUrl,
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
: undefined;
|
||||||
|
|
||||||
const { dropped: toDelete } = trimToByteBudget(feedMetadata, maxBytes);
|
const maxBytes =
|
||||||
|
parseInt(env.FEED_MAX_SIZE_BYTES ?? "", 10) || FEED_MAX_BYTES;
|
||||||
|
|
||||||
|
const { dropped } = feed.ingest(newEntry, {
|
||||||
|
maxBytes,
|
||||||
|
iconDomain: iconDomain ?? undefined,
|
||||||
|
unsub,
|
||||||
|
});
|
||||||
|
|
||||||
const r2Deletions =
|
const r2Deletions =
|
||||||
attachmentBucket && toDelete.length > 0
|
attachmentBucket && dropped.length > 0
|
||||||
? toDelete
|
? dropped
|
||||||
.flatMap((e) => e.attachmentIds ?? [])
|
.flatMap((e) => e.attachmentIds ?? [])
|
||||||
.map((id) => attachmentBucket.delete(id))
|
.map((id) => attachmentBucket.delete(id))
|
||||||
: [];
|
: [];
|
||||||
|
|
||||||
|
// KV has no compare-and-swap: the load (in loadAcceptingFeed) and this write
|
||||||
|
// are not serialised, so concurrent ingests for one feed can lose updates.
|
||||||
|
// Accepted under KV's eventual-consistency model; the Feed aggregate is the
|
||||||
|
// seam a Durable Object would later wrap to serialise these writers.
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
repo.putMetadata(feedId, feedMetadata),
|
repo.saveMetadata(feed),
|
||||||
...toDelete.map((e) => repo.deleteEmail(e.key)),
|
...dropped.map((e) => repo.deleteEmail(e.key)),
|
||||||
...r2Deletions,
|
...r2Deletions,
|
||||||
]);
|
]);
|
||||||
|
|
||||||
logger.info("Email processed", { feedId });
|
logger.info("Email processed", { feedId: feed.id });
|
||||||
if (ctx) {
|
if (ctx) {
|
||||||
ctx.waitUntil(notifySubscribers(feedId, env));
|
ctx.waitUntil(notifySubscribers(feed.id, env));
|
||||||
if (iconDomain) {
|
if (iconDomain) {
|
||||||
ctx.waitUntil(cacheFaviconForDomain(iconDomain, env));
|
ctx.waitUntil(cacheFaviconForDomain(iconDomain, env));
|
||||||
}
|
}
|
||||||
@@ -206,16 +196,16 @@ export async function processEmail(
|
|||||||
env: Env,
|
env: Env,
|
||||||
ctx?: ExecutionContext,
|
ctx?: ExecutionContext,
|
||||||
): Promise<IngestResult> {
|
): Promise<IngestResult> {
|
||||||
const validation = await validateEmail(input, env);
|
const validation = await loadAcceptingFeed(input, env);
|
||||||
if (!validation.ok) {
|
if (!validation.ok) {
|
||||||
await bumpCounters(env.EMAIL_STORAGE, { emails_rejected: 1 });
|
await bumpCounters(env.EMAIL_STORAGE, { emails_rejected: 1 });
|
||||||
return validation;
|
return validation;
|
||||||
}
|
}
|
||||||
|
|
||||||
await storeEmail(validation.feedId, input, env, ctx);
|
await storeEmail(validation.feed, input, env, ctx);
|
||||||
await bumpCounters(env.EMAIL_STORAGE, {
|
await bumpCounters(env.EMAIL_STORAGE, {
|
||||||
emails_received: 1,
|
emails_received: 1,
|
||||||
last_email_at: new Date().toISOString(),
|
last_email_at: new Date().toISOString(),
|
||||||
});
|
});
|
||||||
return validation;
|
return { ok: true, feedId: validation.feed.id };
|
||||||
}
|
}
|
||||||
|
|||||||
+16
-40
@@ -1,5 +1,5 @@
|
|||||||
import { Context } from "hono";
|
import { Context } from "hono";
|
||||||
import { Env, FeedConfig, FeedMetadata } from "../types";
|
import { Env, FeedConfig } from "../types";
|
||||||
import { generateFeedId } from "../utils/id-generator";
|
import { generateFeedId } from "../utils/id-generator";
|
||||||
import { bumpCounters } from "../utils/stats";
|
import { bumpCounters } from "../utils/stats";
|
||||||
import { waitUntilSafe } from "../utils/worker";
|
import { waitUntilSafe } from "../utils/worker";
|
||||||
@@ -7,19 +7,17 @@ import { sendUnsubscribes } from "../utils/unsubscribe";
|
|||||||
import { getAttachmentBucket } from "../utils/attachments";
|
import { getAttachmentBucket } from "../utils/attachments";
|
||||||
import { FeedRepository } from "../domain/feed-repository";
|
import { FeedRepository } from "../domain/feed-repository";
|
||||||
import { resolveExpiresAt, isExpired } from "../domain/feed";
|
import { resolveExpiresAt, isExpired } from "../domain/feed";
|
||||||
|
import {
|
||||||
|
Feed,
|
||||||
|
CreateFeedInput,
|
||||||
|
UpdateFeedInput,
|
||||||
|
} from "../domain/feed.aggregate";
|
||||||
import {
|
import {
|
||||||
purgeFeedKeysStep,
|
purgeFeedKeysStep,
|
||||||
collectUnsubscribeUrls,
|
collectUnsubscribeUrls,
|
||||||
} from "../routes/admin/helpers";
|
} from "../routes/admin/helpers";
|
||||||
|
|
||||||
export interface CreateFeedInput {
|
export type { CreateFeedInput, UpdateFeedInput };
|
||||||
title: string;
|
|
||||||
description?: string;
|
|
||||||
language: string;
|
|
||||||
allowedSenders: string[];
|
|
||||||
blockedSenders: string[];
|
|
||||||
lifetimeHours?: number;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a feed: write its config + empty metadata, register it in the global
|
* Create a feed: write its config + empty metadata, register it in the global
|
||||||
@@ -30,44 +28,22 @@ export async function createFeedRecord(
|
|||||||
input: CreateFeedInput,
|
input: CreateFeedInput,
|
||||||
): Promise<{ feedId: string; config: FeedConfig }> {
|
): Promise<{ feedId: string; config: FeedConfig }> {
|
||||||
const repo = FeedRepository.from(env);
|
const repo = FeedRepository.from(env);
|
||||||
const expiresAt = resolveExpiresAt(env, input.lifetimeHours);
|
const feed = Feed.create(generateFeedId(), input, env);
|
||||||
const feedId = generateFeedId();
|
|
||||||
|
|
||||||
const config: FeedConfig = {
|
await repo.save(feed);
|
||||||
title: input.title,
|
await repo.addToList(
|
||||||
description: input.description,
|
feed.id,
|
||||||
language: input.language,
|
feed.config.title,
|
||||||
allowed_senders: input.allowedSenders,
|
feed.config.description,
|
||||||
blocked_senders: input.blockedSenders,
|
feed.config.expires_at,
|
||||||
created_at: Date.now(),
|
);
|
||||||
updated_at: Date.now(),
|
|
||||||
...(expiresAt !== undefined ? { expires_at: expiresAt } : {}),
|
|
||||||
};
|
|
||||||
|
|
||||||
const metadata: FeedMetadata = { emails: [] };
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
repo.putConfig(feedId, config),
|
|
||||||
repo.putMetadata(feedId, metadata),
|
|
||||||
]);
|
|
||||||
|
|
||||||
await repo.addToList(feedId, input.title, input.description, expiresAt);
|
|
||||||
|
|
||||||
await bumpCounters(env.EMAIL_STORAGE, {
|
await bumpCounters(env.EMAIL_STORAGE, {
|
||||||
feeds_created: 1,
|
feeds_created: 1,
|
||||||
last_feed_created_at: new Date().toISOString(),
|
last_feed_created_at: new Date().toISOString(),
|
||||||
});
|
});
|
||||||
|
|
||||||
return { feedId, config };
|
return { feedId: feed.id, config: feed.config };
|
||||||
}
|
|
||||||
|
|
||||||
export interface UpdateFeedInput {
|
|
||||||
title?: string;
|
|
||||||
description?: string;
|
|
||||||
language?: string;
|
|
||||||
allowedSenders?: string[];
|
|
||||||
blockedSenders?: string[];
|
|
||||||
lifetimeHours?: number;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export type UpdateFeedResult =
|
export type UpdateFeedResult =
|
||||||
|
|||||||
@@ -815,6 +815,10 @@ describe("Admin Routes", () => {
|
|||||||
"feeds:list",
|
"feeds:list",
|
||||||
JSON.stringify({ feeds: [{ id: feedId, title: "F" }] }),
|
JSON.stringify({ feeds: [{ id: feedId, title: "F" }] }),
|
||||||
);
|
);
|
||||||
|
await r2Env.EMAIL_STORAGE.put(
|
||||||
|
`feed:${feedId}:config`,
|
||||||
|
JSON.stringify({ title: "F", language: "en", created_at: 1 }),
|
||||||
|
);
|
||||||
const emailKey = `feed:${feedId}:1`;
|
const emailKey = `feed:${feedId}:1`;
|
||||||
await r2Env.EMAIL_STORAGE.put(
|
await r2Env.EMAIL_STORAGE.put(
|
||||||
emailKey,
|
emailKey,
|
||||||
|
|||||||
+14
-25
@@ -650,18 +650,13 @@ emailsRouter.post("/emails/:emailKey/delete", async (c) => {
|
|||||||
return c.text("Feed ID is required", 400);
|
return c.text("Feed ID is required", 400);
|
||||||
}
|
}
|
||||||
|
|
||||||
const feedMetadata = await repo.getMetadata(feedId);
|
const feed = await repo.load(feedId);
|
||||||
|
|
||||||
await repo.deleteEmail(emailKey);
|
await repo.deleteEmail(emailKey);
|
||||||
await deleteAttachmentsForEmails(env, feedMetadata?.emails ?? [], [
|
if (feed) {
|
||||||
emailKey,
|
const { removed } = feed.removeEmails([emailKey]);
|
||||||
]);
|
await deleteAttachmentsForEmails(env, removed, [emailKey]);
|
||||||
|
await repo.saveMetadata(feed);
|
||||||
if (feedMetadata) {
|
|
||||||
feedMetadata.emails = feedMetadata.emails.filter(
|
|
||||||
(email) => email.key !== emailKey,
|
|
||||||
);
|
|
||||||
await repo.putMetadata(feedId, feedMetadata);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (wantsJson) return c.json({ ok: true, emailKey, feedId });
|
if (wantsJson) return c.json({ ok: true, emailKey, feedId });
|
||||||
@@ -690,15 +685,15 @@ emailsRouter.post("/feeds/:feedId/emails/bulk-delete", async (c) => {
|
|||||||
(c.req.header("Accept") || "").includes("application/json");
|
(c.req.header("Accept") || "").includes("application/json");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const feedMetadata = await repo.getMetadata(feedId);
|
const feed = await repo.load(feedId);
|
||||||
|
|
||||||
if (!feedMetadata) {
|
if (!feed) {
|
||||||
return wantsJson
|
return wantsJson
|
||||||
? c.json({ ok: false, error: "Feed not found" }, 404)
|
? c.json({ ok: false, error: "Feed not found" }, 404)
|
||||||
: c.text("Feed not found", 404);
|
: c.text("Feed not found", 404);
|
||||||
}
|
}
|
||||||
|
|
||||||
const allowedKeys = new Set(feedMetadata.emails.map((email) => email.key));
|
const allowedKeys = new Set(feed.metadata.emails.map((email) => email.key));
|
||||||
|
|
||||||
if (wantsJson) {
|
if (wantsJson) {
|
||||||
const body = (await c.req.json().catch(() => null)) as {
|
const body = (await c.req.json().catch(() => null)) as {
|
||||||
@@ -728,13 +723,10 @@ emailsRouter.post("/feeds/:feedId/emails/bulk-delete", async (c) => {
|
|||||||
|
|
||||||
const { ok: deletedOk, failed: failedEmailKeys } =
|
const { ok: deletedOk, failed: failedEmailKeys } =
|
||||||
await deleteKeysWithConcurrency(emailStorage, candidates, 35);
|
await deleteKeysWithConcurrency(emailStorage, candidates, 35);
|
||||||
await deleteAttachmentsForEmails(env, feedMetadata.emails, candidates);
|
await deleteAttachmentsForEmails(env, feed.metadata.emails, candidates);
|
||||||
|
|
||||||
const deletedSet = new Set(deletedOk);
|
feed.removeEmails(deletedOk);
|
||||||
feedMetadata.emails = feedMetadata.emails.filter(
|
await repo.saveMetadata(feed);
|
||||||
(email) => !deletedSet.has(email.key),
|
|
||||||
);
|
|
||||||
await repo.putMetadata(feedId, feedMetadata);
|
|
||||||
|
|
||||||
return c.json({
|
return c.json({
|
||||||
ok: failedEmailKeys.length === 0,
|
ok: failedEmailKeys.length === 0,
|
||||||
@@ -759,13 +751,10 @@ emailsRouter.post("/feeds/:feedId/emails/bulk-delete", async (c) => {
|
|||||||
candidates,
|
candidates,
|
||||||
35,
|
35,
|
||||||
);
|
);
|
||||||
await deleteAttachmentsForEmails(env, feedMetadata.emails, candidates);
|
await deleteAttachmentsForEmails(env, feed.metadata.emails, candidates);
|
||||||
|
|
||||||
const deletedSet = new Set(deletedOk);
|
feed.removeEmails(deletedOk);
|
||||||
feedMetadata.emails = feedMetadata.emails.filter(
|
await repo.saveMetadata(feed);
|
||||||
(email) => !deletedSet.has(email.key),
|
|
||||||
);
|
|
||||||
await repo.putMetadata(feedId, feedMetadata);
|
|
||||||
|
|
||||||
return c.redirect(
|
return c.redirect(
|
||||||
`/admin/feeds/${feedId}/emails?message=bulkDeleted&count=${deletedOk.length}`,
|
`/admin/feeds/${feedId}/emails?message=bulkDeleted&count=${deletedOk.length}`,
|
||||||
|
|||||||
@@ -345,15 +345,16 @@ apiApp.openapi(
|
|||||||
const repo = FeedRepository.from(env);
|
const repo = FeedRepository.from(env);
|
||||||
const { feedId, entryId } = c.req.valid("param");
|
const { feedId, entryId } = c.req.valid("param");
|
||||||
const receivedAt = parseInt(entryId, 10);
|
const receivedAt = parseInt(entryId, 10);
|
||||||
const metadata = await repo.getMetadata(feedId);
|
const feed = await repo.load(feedId);
|
||||||
const metaEntry = metadata?.emails.find((e) => e.receivedAt === receivedAt);
|
const metaEntry = feed?.metadata.emails.find(
|
||||||
if (!metadata || !metaEntry)
|
(e) => e.receivedAt === receivedAt,
|
||||||
return c.json({ error: "Email not found" }, 404);
|
);
|
||||||
|
if (!feed || !metaEntry) return c.json({ error: "Email not found" }, 404);
|
||||||
|
|
||||||
await repo.deleteEmail(metaEntry.key);
|
await repo.deleteEmail(metaEntry.key);
|
||||||
await deleteAttachmentsForEmails(env, metadata.emails, [metaEntry.key]);
|
const { removed } = feed.removeEmails([metaEntry.key]);
|
||||||
metadata.emails = metadata.emails.filter((e) => e.key !== metaEntry.key);
|
await deleteAttachmentsForEmails(env, removed, [metaEntry.key]);
|
||||||
await repo.putMetadata(feedId, metadata);
|
await repo.saveMetadata(feed);
|
||||||
|
|
||||||
return c.json({ ok: true }, 200);
|
return c.json({ ok: true }, 200);
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user