mirror of
https://github.com/juherr/kill-the-news.git
synced 2026-06-20 22:03:48 +00:00
refactor(ingest): return a domain result from processEmail, map HTTP at the edge
processEmail/validateEmail now return an IngestResult discriminated union
({ ok } | { ok: false; reason }) instead of an HTTP Response. The status mapping
moves to the edge (ingestResultToResponse in forwardemail.ts), and the Cloudflare
email handler now logs the rejection reason instead of silently discarding it.
The ingestion core is transport-agnostic. End-to-end status mapping stays covered
by inbound.test.ts (now incl. 410 expired); email-processor.test asserts on the
domain result directly.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -2,6 +2,7 @@ import PostalMime from "postal-mime";
|
||||
import { Env } from "../types";
|
||||
import { processEmail, RawAttachment } from "./email-processor";
|
||||
import { normalizeCid } from "../utils/html-processor";
|
||||
import { logger } from "./logger";
|
||||
|
||||
export async function handleCloudflareEmail(
|
||||
message: ForwardableEmailMessage,
|
||||
@@ -31,7 +32,7 @@ export async function handleCloudflareEmail(
|
||||
contentId: normalizeCid(a.contentId),
|
||||
}));
|
||||
|
||||
await processEmail(
|
||||
const result = await processEmail(
|
||||
{
|
||||
toAddress: message.to,
|
||||
from,
|
||||
@@ -45,6 +46,12 @@ export async function handleCloudflareEmail(
|
||||
env,
|
||||
ctx,
|
||||
);
|
||||
if (!result.ok) {
|
||||
logger.warn("Inbound email rejected", {
|
||||
to: message.to,
|
||||
reason: result.reason,
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("Error processing Cloudflare email:", error);
|
||||
}
|
||||
|
||||
@@ -39,12 +39,12 @@ describe("processEmail", () => {
|
||||
makeInput({ toAddress: "invalid@domain.com" }),
|
||||
env as any,
|
||||
);
|
||||
expect(res.status).toBe(400);
|
||||
expect(res).toMatchObject({ ok: false, reason: "invalid_address" });
|
||||
});
|
||||
|
||||
it("returns 404 when feed does not exist", async () => {
|
||||
const res = await processEmail(makeInput(), env as any);
|
||||
expect(res.status).toBe(404);
|
||||
expect(res).toMatchObject({ ok: false, reason: "feed_not_found" });
|
||||
});
|
||||
|
||||
it("returns 403 when sender is not in allowlist", async () => {
|
||||
@@ -56,7 +56,7 @@ describe("processEmail", () => {
|
||||
makeInput({ senders: ["other@example.com"] }),
|
||||
env as any,
|
||||
);
|
||||
expect(res.status).toBe(403);
|
||||
expect(res).toMatchObject({ ok: false, reason: "sender_blocked" });
|
||||
});
|
||||
|
||||
it("returns 200 and stores email when sender is allowed by exact match", async () => {
|
||||
@@ -65,7 +65,7 @@ describe("processEmail", () => {
|
||||
JSON.stringify({ allowed_senders: ["sender@example.com"] }),
|
||||
);
|
||||
const res = await processEmail(makeInput(), env as any);
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("returns 200 and stores email when sender matches by domain", async () => {
|
||||
@@ -77,7 +77,7 @@ describe("processEmail", () => {
|
||||
makeInput({ senders: ["anyone@example.com"] }),
|
||||
env as any,
|
||||
);
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("returns 200 when no allowlist is set", async () => {
|
||||
@@ -86,7 +86,7 @@ describe("processEmail", () => {
|
||||
JSON.stringify({ allowed_senders: [] }),
|
||||
);
|
||||
const res = await processEmail(makeInput(), env as any);
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("returns 403 when sender is in blocklist by exact address", async () => {
|
||||
@@ -95,7 +95,7 @@ describe("processEmail", () => {
|
||||
JSON.stringify({ blocked_senders: ["sender@example.com"] }),
|
||||
);
|
||||
const res = await processEmail(makeInput(), env as any);
|
||||
expect(res.status).toBe(403);
|
||||
expect(res).toMatchObject({ ok: false, reason: "sender_blocked" });
|
||||
});
|
||||
|
||||
it("returns 403 when sender is in blocklist by domain", async () => {
|
||||
@@ -104,7 +104,7 @@ describe("processEmail", () => {
|
||||
JSON.stringify({ blocked_senders: ["example.com"] }),
|
||||
);
|
||||
const res = await processEmail(makeInput(), env as any);
|
||||
expect(res.status).toBe(403);
|
||||
expect(res).toMatchObject({ ok: false, reason: "sender_blocked" });
|
||||
});
|
||||
|
||||
it("returns 200 when sender is not in blocklist", async () => {
|
||||
@@ -113,7 +113,7 @@ describe("processEmail", () => {
|
||||
JSON.stringify({ blocked_senders: ["other@example.com"] }),
|
||||
);
|
||||
const res = await processEmail(makeInput(), env as any);
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("exact block takes precedence over domain allow", async () => {
|
||||
@@ -125,7 +125,7 @@ describe("processEmail", () => {
|
||||
}),
|
||||
);
|
||||
const res = await processEmail(makeInput(), env as any);
|
||||
expect(res.status).toBe(403);
|
||||
expect(res).toMatchObject({ ok: false, reason: "sender_blocked" });
|
||||
});
|
||||
|
||||
it("exact allow overrides domain block (exception use case)", async () => {
|
||||
@@ -137,7 +137,7 @@ describe("processEmail", () => {
|
||||
}),
|
||||
);
|
||||
const res = await processEmail(makeInput(), env as any);
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("exact block takes precedence over exact allow", async () => {
|
||||
@@ -149,7 +149,7 @@ describe("processEmail", () => {
|
||||
}),
|
||||
);
|
||||
const res = await processEmail(makeInput(), env as any);
|
||||
expect(res.status).toBe(403);
|
||||
expect(res).toMatchObject({ ok: false, reason: "sender_blocked" });
|
||||
});
|
||||
|
||||
it("stores email data and updates metadata in KV", async () => {
|
||||
@@ -250,7 +250,7 @@ describe("processEmail", () => {
|
||||
makeInput({ subject: "New" }),
|
||||
tinyEnv as any,
|
||||
);
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.ok).toBe(true);
|
||||
|
||||
const metadata = await env.EMAIL_STORAGE.get(
|
||||
`feed:${VALID_FEED_ID}:metadata`,
|
||||
@@ -305,7 +305,7 @@ describe("processEmail", () => {
|
||||
|
||||
const res = await processEmail(makeInput(), env as any, ctx);
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.ok).toBe(true);
|
||||
expect(waitUntilCalled).toBe(true);
|
||||
});
|
||||
|
||||
@@ -326,7 +326,7 @@ describe("processEmail", () => {
|
||||
ctx,
|
||||
);
|
||||
|
||||
expect(res.status).toBe(404);
|
||||
expect(res).toMatchObject({ ok: false, reason: "feed_not_found" });
|
||||
expect(waitUntilCalled).toBe(false);
|
||||
});
|
||||
});
|
||||
@@ -351,7 +351,7 @@ describe("processEmail — attachments", () => {
|
||||
makeInput({ attachments: [pdfAttachment] }),
|
||||
env as any,
|
||||
);
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.ok).toBe(true);
|
||||
|
||||
const metadata = await env.EMAIL_STORAGE.get(
|
||||
`feed:${VALID_FEED_ID}:metadata`,
|
||||
@@ -376,7 +376,7 @@ describe("processEmail — attachments", () => {
|
||||
makeInput({ attachments: [pdfAttachment] }),
|
||||
env as any,
|
||||
);
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.ok).toBe(true);
|
||||
|
||||
const metadata = await env.EMAIL_STORAGE.get(
|
||||
`feed:${VALID_FEED_ID}:metadata`,
|
||||
@@ -401,7 +401,7 @@ describe("processEmail — attachments", () => {
|
||||
makeInput({ attachments: [pdfAttachment] }),
|
||||
env as any,
|
||||
);
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.ok).toBe(true);
|
||||
|
||||
const metadata = await env.EMAIL_STORAGE.get(
|
||||
`feed:${VALID_FEED_ID}:metadata`,
|
||||
@@ -490,7 +490,7 @@ describe("processEmail — attachments", () => {
|
||||
makeInput({ subject: "New" }),
|
||||
tinyEnv as any,
|
||||
);
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.ok).toBe(true);
|
||||
|
||||
// Old attachment should be deleted from R2
|
||||
expect(mockR2._has(oldAttachmentId)).toBe(false);
|
||||
|
||||
+24
-27
@@ -1,5 +1,5 @@
|
||||
import { EmailParser } from "../utils/email-parser";
|
||||
import { AttachmentData, EmailMetadata, Env, FeedConfig } from "../types";
|
||||
import { AttachmentData, EmailMetadata, Env } from "../types";
|
||||
import { notifySubscribers } from "../utils/websub";
|
||||
import { bumpCounters } from "../utils/stats";
|
||||
import {
|
||||
@@ -31,9 +31,20 @@ export interface ProcessEmailInput {
|
||||
attachments?: RawAttachment[];
|
||||
}
|
||||
|
||||
type ValidationSuccess = { ok: true; feedId: string; feedConfig: FeedConfig };
|
||||
type ValidationFailure = { ok: false; response: Response };
|
||||
type ValidationResult = ValidationSuccess | ValidationFailure;
|
||||
export type IngestRejectionReason =
|
||||
| "invalid_address"
|
||||
| "feed_not_found"
|
||||
| "feed_expired"
|
||||
| "sender_blocked";
|
||||
|
||||
/**
|
||||
* Outcome of ingesting an email — a domain result, not an HTTP concern. The edge
|
||||
* (forwardemail.ts) maps this to a status code; the Cloudflare email handler
|
||||
* logs the reason. Keeping HTTP out of the core keeps ingestion transport-agnostic.
|
||||
*/
|
||||
export type IngestResult =
|
||||
| { ok: true; feedId: string }
|
||||
| { ok: false; reason: IngestRejectionReason };
|
||||
|
||||
async function uploadAttachments(
|
||||
attachments: RawAttachment[],
|
||||
@@ -62,32 +73,23 @@ async function uploadAttachments(
|
||||
export async function validateEmail(
|
||||
input: ProcessEmailInput,
|
||||
env: Env,
|
||||
): Promise<ValidationResult> {
|
||||
): Promise<IngestResult> {
|
||||
const feedId = EmailParser.extractFeedId(input.toAddress);
|
||||
if (!feedId) {
|
||||
logger.error("Invalid email address format", {
|
||||
toAddress: input.toAddress,
|
||||
});
|
||||
return {
|
||||
ok: false,
|
||||
response: new Response("Invalid email address format", { status: 400 }),
|
||||
};
|
||||
return { ok: false, reason: "invalid_address" };
|
||||
}
|
||||
|
||||
const feedConfig = await FeedRepository.from(env).getConfig(feedId);
|
||||
if (!feedConfig) {
|
||||
logger.error("Feed not found", { feedId });
|
||||
return {
|
||||
ok: false,
|
||||
response: new Response("Feed does not exist", { status: 404 }),
|
||||
};
|
||||
return { ok: false, reason: "feed_not_found" };
|
||||
}
|
||||
if (isExpired(feedConfig)) {
|
||||
logger.warn("Rejected email: feed expired", { feedId });
|
||||
return {
|
||||
ok: false,
|
||||
response: new Response("Feed has expired", { status: 410 }),
|
||||
};
|
||||
return { ok: false, reason: "feed_expired" };
|
||||
}
|
||||
|
||||
if (applySenderPolicy(feedConfig, input.senders) === "blocked") {
|
||||
@@ -97,15 +99,10 @@ export async function validateEmail(
|
||||
allowedSenders: feedConfig.allowed_senders,
|
||||
blockedSenders: feedConfig.blocked_senders,
|
||||
});
|
||||
return {
|
||||
ok: false,
|
||||
response: new Response("Sender not allowed for this feed", {
|
||||
status: 403,
|
||||
}),
|
||||
};
|
||||
return { ok: false, reason: "sender_blocked" };
|
||||
}
|
||||
|
||||
return { ok: true, feedId, feedConfig };
|
||||
return { ok: true, feedId };
|
||||
}
|
||||
|
||||
export async function storeEmail(
|
||||
@@ -208,11 +205,11 @@ export async function processEmail(
|
||||
input: ProcessEmailInput,
|
||||
env: Env,
|
||||
ctx?: ExecutionContext,
|
||||
): Promise<Response> {
|
||||
): Promise<IngestResult> {
|
||||
const validation = await validateEmail(input, env);
|
||||
if (!validation.ok) {
|
||||
await bumpCounters(env.EMAIL_STORAGE, { emails_rejected: 1 });
|
||||
return validation.response;
|
||||
return validation;
|
||||
}
|
||||
|
||||
await storeEmail(validation.feedId, input, env, ctx);
|
||||
@@ -220,5 +217,5 @@ export async function processEmail(
|
||||
emails_received: 1,
|
||||
last_email_at: new Date().toISOString(),
|
||||
});
|
||||
return new Response("Email processed successfully", { status: 200 });
|
||||
return validation;
|
||||
}
|
||||
|
||||
+20
-2
@@ -1,8 +1,25 @@
|
||||
import { EmailParser } from "../utils/email-parser";
|
||||
import { Env } from "../types";
|
||||
import { processEmail, RawAttachment } from "./email-processor";
|
||||
import { processEmail, IngestResult, RawAttachment } from "./email-processor";
|
||||
import { normalizeCid } from "../utils/html-processor";
|
||||
|
||||
/** Map an ingestion result to the HTTP response ForwardEmail expects. */
|
||||
export function ingestResultToResponse(result: IngestResult): Response {
|
||||
if (result.ok) {
|
||||
return new Response("Email processed successfully", { status: 200 });
|
||||
}
|
||||
switch (result.reason) {
|
||||
case "invalid_address":
|
||||
return new Response("Invalid email address format", { status: 400 });
|
||||
case "feed_not_found":
|
||||
return new Response("Feed does not exist", { status: 404 });
|
||||
case "feed_expired":
|
||||
return new Response("Feed has expired", { status: 410 });
|
||||
case "sender_blocked":
|
||||
return new Response("Sender not allowed for this feed", { status: 403 });
|
||||
}
|
||||
}
|
||||
|
||||
export interface ForwardEmailAttachment {
|
||||
filename?: string;
|
||||
contentType?: string;
|
||||
@@ -88,7 +105,7 @@ export async function handleForwardEmail(
|
||||
})
|
||||
.filter((a): a is RawAttachment => a !== null);
|
||||
|
||||
return processEmail(
|
||||
const result = await processEmail(
|
||||
{
|
||||
toAddress: payload.recipients?.[0] || "",
|
||||
from: emailData.from,
|
||||
@@ -102,4 +119,5 @@ export async function handleForwardEmail(
|
||||
env,
|
||||
ctx,
|
||||
);
|
||||
return ingestResultToResponse(result);
|
||||
}
|
||||
|
||||
@@ -135,6 +135,15 @@ describe("POST /api/inbound — handler logic", () => {
|
||||
expect(res.status).toBe(403);
|
||||
});
|
||||
|
||||
it("returns 410 when the feed has expired", async () => {
|
||||
await env.EMAIL_STORAGE.put(
|
||||
`feed:${VALID_FEED_ID}:config`,
|
||||
JSON.stringify({ expires_at: Date.now() - 1000 }),
|
||||
);
|
||||
const res = await worker.fetch(makeRequest(makePayload()), env);
|
||||
expect(res.status).toBe(410);
|
||||
});
|
||||
|
||||
it("returns 200 when sender matches allowlist by exact address", async () => {
|
||||
await env.EMAIL_STORAGE.put(
|
||||
`feed:${VALID_FEED_ID}:config`,
|
||||
|
||||
Reference in New Issue
Block a user