feat(attachments): R2 toggle, storage metrics, and demo R2 config

Add an ATTACHMENTS_ENABLED switch (default on when R2 is bound) via a
central getAttachmentBucket helper, surface R2 + estimated KV usage
against the free tier on the status page and /api/stats (refreshed by the
hourly cron), let setup.sh create and wire the R2 bucket, and bind the
demo bucket so the deployed demo has attachments.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Julien Herr
2026-05-23 17:33:50 +02:00
parent 7226e718f7
commit f150d40c45
19 changed files with 387 additions and 23 deletions
+6
View File
@@ -1,6 +1,12 @@
/** Maximum total size of emails stored per feed (bytes). */
export const FEED_MAX_BYTES = 524288; // 512 KB
/** Cloudflare R2 free tier storage allowance (bytes). */
export const R2_FREE_TIER_BYTES = 10 * 1024 ** 3; // 10 GB
/** Cloudflare KV free tier storage allowance (bytes). */
export const KV_FREE_TIER_BYTES = 1 * 1024 ** 3; // 1 GB
/** Cache TTL for ForwardEmail.net IP list (milliseconds). */
export const FORWARD_EMAIL_IPS_CACHE_TTL_MS = 24 * 60 * 60 * 1000; // 24 hours
+26 -2
View File
@@ -18,7 +18,13 @@ import {
purgeExpiredFeeds,
removeFeedsFromListBulk,
} from "./routes/admin/helpers";
import { bumpCounters } from "./utils/stats";
import {
bumpCounters,
scanR2Usage,
scanKvUsage,
setStorageSnapshot,
} from "./utils/stats";
import { getAttachmentBucket } from "./utils/attachments";
import { FORWARD_EMAIL_IPS_CACHE_TTL_MS } from "./config/constants";
type AppEnv = { Bindings: Env };
@@ -196,6 +202,7 @@ export default {
await handleCloudflareEmail(message, env, ctx);
},
async scheduled(_event: ScheduledEvent, env: Env, _ctx: ExecutionContext) {
const attachmentBucket = getAttachmentBucket(env);
const feeds = await listAllFeeds(env.EMAIL_STORAGE);
const now = Date.now();
const expiredIds = feeds
@@ -203,7 +210,7 @@ export default {
.map((f) => f.id);
for (const feedId of expiredIds) {
await purgeExpiredFeeds(env.EMAIL_STORAGE, feedId, env.ATTACHMENT_BUCKET);
await purgeExpiredFeeds(env.EMAIL_STORAGE, feedId, attachmentBucket);
}
if (expiredIds.length > 0) {
await removeFeedsFromListBulk(env.EMAIL_STORAGE, expiredIds);
@@ -212,5 +219,22 @@ export default {
});
logger.info("Feed TTL cleanup", { deleted: expiredIds.length });
}
// Refresh the cached storage-usage snapshot for the status page / /api/stats.
try {
const r2 = attachmentBucket
? await scanR2Usage(attachmentBucket)
: { bytes: 0, count: 0 };
const kv = await scanKvUsage(env.EMAIL_STORAGE);
await setStorageSnapshot(env.EMAIL_STORAGE, {
attachments_bytes: r2.bytes,
attachments_count: r2.count,
kv_bytes_estimated: kv.bytes,
});
} catch (error) {
logger.error("Error refreshing storage snapshot", {
error: String(error),
});
}
},
};
+26
View File
@@ -363,6 +363,32 @@ describe("processEmail — attachments", () => {
expect(emailData.attachments).toBeUndefined();
});
it("skips R2 upload when ATTACHMENTS_ENABLED is 'false' even with R2 bound", async () => {
const env = createMockEnv({ withR2: true });
(env as any).ATTACHMENTS_ENABLED = "false";
const mockR2 = (env as any).ATTACHMENT_BUCKET as unknown as MockR2;
await env.EMAIL_STORAGE.put(
`feed:${VALID_FEED_ID}:config`,
JSON.stringify({}),
);
const res = await processEmail(
makeInput({ attachments: [pdfAttachment] }),
env as any,
);
expect(res.status).toBe(200);
const metadata = await env.EMAIL_STORAGE.get(
`feed:${VALID_FEED_ID}:metadata`,
"json",
);
const emailData = await env.EMAIL_STORAGE.get(
metadata.emails[0].key,
"json",
);
expect(emailData.attachments).toBeUndefined();
expect((await mockR2.list()).objects).toHaveLength(0);
});
it("uploads attachments to R2 and stores AttachmentData in emailData", async () => {
const env = createMockEnv({ withR2: true });
const mockR2 = (env as any).ATTACHMENT_BUCKET as unknown as MockR2;
+6 -4
View File
@@ -13,6 +13,7 @@ import {
extractEmailDomain,
} from "../utils/favicon-fetcher";
import { parseOneClickUnsubscribe } from "../utils/unsubscribe";
import { getAttachmentBucket } from "../utils/attachments";
import { logger } from "./logger";
import { FEED_MAX_BYTES } from "../config/constants";
@@ -170,9 +171,10 @@ export async function storeEmail(
env: Env,
ctx?: ExecutionContext,
): Promise<void> {
const attachmentBucket = getAttachmentBucket(env);
const storedAttachments: AttachmentData[] =
env.ATTACHMENT_BUCKET && input.attachments?.length
? await uploadAttachments(input.attachments, env.ATTACHMENT_BUCKET)
attachmentBucket && input.attachments?.length
? await uploadAttachments(input.attachments, attachmentBucket)
: [];
const emailData = {
@@ -249,10 +251,10 @@ export async function storeEmail(
}
const r2Deletions =
env.ATTACHMENT_BUCKET && toDelete.length > 0
attachmentBucket && toDelete.length > 0
? toDelete
.flatMap((e) => e.attachmentIds ?? [])
.map((id) => env.ATTACHMENT_BUCKET!.delete(id))
.map((id) => attachmentBucket.delete(id))
: [];
await Promise.all([
+7 -4
View File
@@ -10,6 +10,7 @@ import { logger } from "../../lib/logger";
import { Layout, clampText } from "./ui";
import { deleteKeysWithConcurrency } from "./helpers";
import { feedRssUrl, feedAtomUrl, feedEmailAddress } from "../../utils/urls";
import { getAttachmentBucket } from "../../utils/attachments";
import { emailsPageScript } from "../../scripts/generated/emails-page";
type AppEnv = { Bindings: Env };
@@ -643,9 +644,10 @@ emailsRouter.post("/emails/:emailKey/delete", async (c) => {
await emailStorage.put(feedMetadataKey, JSON.stringify(feedMetadata));
}
if (env.ATTACHMENT_BUCKET && attachmentIds.length > 0) {
const attachmentBucket = getAttachmentBucket(env);
if (attachmentBucket && attachmentIds.length > 0) {
await Promise.allSettled(
attachmentIds.map((id) => env.ATTACHMENT_BUCKET!.delete(id)),
attachmentIds.map((id) => attachmentBucket.delete(id)),
);
}
@@ -726,9 +728,10 @@ emailsRouter.post("/feeds/:feedId/emails/bulk-delete", async (c) => {
);
await emailStorage.put(feedMetadataKey, JSON.stringify(feedMetadata));
if (env.ATTACHMENT_BUCKET && r2AttachmentIds.length > 0) {
const attachmentBucket = getAttachmentBucket(env);
if (attachmentBucket && r2AttachmentIds.length > 0) {
await Promise.allSettled(
r2AttachmentIds.map((id) => env.ATTACHMENT_BUCKET!.delete(id)),
r2AttachmentIds.map((id) => attachmentBucket.delete(id)),
);
}
+3 -2
View File
@@ -7,6 +7,7 @@ import { waitUntilSafe } from "../../utils/worker";
import { feedRssUrl, feedEmailAddress } from "../../utils/urls";
import { logger } from "../../lib/logger";
import { sendUnsubscribes } from "../../utils/unsubscribe";
import { getAttachmentBucket } from "../../utils/attachments";
import { Layout } from "./ui";
import {
addFeedToList,
@@ -555,7 +556,7 @@ feedsRouter.post("/:feedId/delete", async (c) => {
waitUntilSafe(
c,
purgeFeedKeysStep(emailStorage, feedId, {
bucket: env.ATTACHMENT_BUCKET,
bucket: getAttachmentBucket(env),
}),
);
@@ -594,7 +595,7 @@ feedsRouter.post("/:feedId/purge", async (c) => {
const step = await purgeFeedKeysStep(emailStorage, feedId, {
cursor,
limit,
bucket: env.ATTACHMENT_BUCKET,
bucket: getAttachmentBucket(env),
});
return c.json({
+1 -6
View File
@@ -2,12 +2,7 @@ import { Context } from "hono";
import { html, raw } from "hono/html";
import { Env, FeedConfig, FeedMetadata, EmailData } from "../types";
import { processEmailContent } from "../utils/html-processor";
function formatBytes(bytes: number): string {
if (bytes < 1024) return `${bytes} B`;
if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`;
return `${(bytes / (1024 * 1024)).toFixed(1)} MB`;
}
import { formatBytes } from "../utils/format";
export async function handle(c: Context<{ Bindings: Env }>): Promise<Response> {
const feedId = c.req.param("feedId");
+4 -2
View File
@@ -1,8 +1,10 @@
import { Context } from "hono";
import { Env } from "../types";
import { getAttachmentBucket } from "../utils/attachments";
export async function handle(c: Context<{ Bindings: Env }>): Promise<Response> {
if (!c.env.ATTACHMENT_BUCKET) {
const bucket = getAttachmentBucket(c.env);
if (!bucket) {
return new Response("Attachment storage not configured", { status: 404 });
}
@@ -13,7 +15,7 @@ export async function handle(c: Context<{ Bindings: Env }>): Promise<Response> {
return new Response("Not found", { status: 404 });
}
const object = await c.env.ATTACHMENT_BUCKET.get(attachmentId);
const object = await bucket.get(attachmentId);
if (!object) {
return new Response("Not found", { status: 404 });
+40
View File
@@ -1,6 +1,8 @@
import { Context } from "hono";
import { Env } from "../types";
import { getStats } from "../utils/stats";
import { formatBytes } from "../utils/format";
import { R2_FREE_TIER_BYTES, KV_FREE_TIER_BYTES } from "../config/constants";
import { Layout } from "./admin/ui";
function formatDateTime(iso?: string): string {
@@ -43,6 +45,11 @@ function formatUptime(iso?: string): string {
return `${days} ${days === 1 ? "day" : "days"}`;
}
function tierPercent(used: number, total: number): number {
if (total <= 0) return 0;
return Math.round((used / total) * 100);
}
type Tone = "success" | "danger";
type StatProps = {
@@ -83,6 +90,11 @@ export async function handle(c: Context<{ Bindings: Env }>): Promise<Response> {
? (stats.emails_received / stats.feeds_created).toFixed(1)
: "—";
const kvBytes = stats.kv_bytes_estimated;
const kvPercent = tierPercent(kvBytes ?? 0, KV_FREE_TIER_BYTES);
const r2Bytes = stats.attachments_bytes;
const r2Percent = tierPercent(r2Bytes ?? 0, R2_FREE_TIER_BYTES);
return c.html(
<Layout title="Status" label="status">
<div class="container fade-in">
@@ -167,6 +179,34 @@ export async function handle(c: Context<{ Bindings: Env }>): Promise<Response> {
</div>
</section>
<section class="stat-section">
<h2 class="stat-section-title">Storage</h2>
<div class="stats-grid">
<Stat
label="KV space used (est.)"
value={kvBytes === undefined ? "—" : formatBytes(kvBytes)}
title={`${kvPercent}% of 1 GB free tier — estimate`}
tone={kvPercent >= 80 ? "danger" : undefined}
/>
{stats.attachments_enabled ? (
<>
<Stat
label="Attachments stored"
value={stats.attachments_count ?? "—"}
/>
<Stat
label="R2 space used"
value={r2Bytes === undefined ? "—" : formatBytes(r2Bytes)}
title={`${r2Percent}% of 10 GB free tier`}
tone={r2Percent >= 80 ? "danger" : undefined}
/>
</>
) : (
<Stat label="Attachments (R2)" value="Off" />
)}
</div>
</section>
<section class="stat-section">
<h2 class="stat-section-title">Instance</h2>
<div class="stats-grid">
+8
View File
@@ -237,6 +237,14 @@ export class MockR2 {
}
}
async list(_options?: { cursor?: string }) {
const objects = Array.from(this.store.entries()).map(([key, entry]) => ({
key,
size: entry.body.byteLength,
}));
return { objects, truncated: false as const, cursor: undefined };
}
_has(key: string) {
return this.store.has(key);
}
+7
View File
@@ -5,6 +5,7 @@ export interface Env {
DOMAIN: string;
EMAIL_DOMAIN?: string;
ATTACHMENT_BUCKET?: R2Bucket;
ATTACHMENTS_ENABLED?: string; // "false" disables attachments even when R2 is bound
FEED_MAX_SIZE_BYTES?: string;
PROXY_TRUSTED_IPS?: string;
PROXY_AUTH_SECRET?: string;
@@ -83,12 +84,18 @@ export interface Counters {
last_email_at?: string; // ISO 8601
last_feed_created_at?: string; // ISO 8601
first_seen?: string; // ISO 8601 — first time counters were written (instance start)
// Storage usage snapshot, refreshed by the hourly cron (overwritten, not incremented).
attachments_bytes?: number; // Total R2 bytes used by attachments
attachments_count?: number; // Number of R2 objects
kv_bytes_estimated?: number; // Estimated KV bytes (sum of stored email sizes)
storage_scanned_at?: string; // ISO 8601 — last storage scan
}
// Monitoring API response: persisted counters + live-computed values
export interface StatsResponse extends Counters {
active_feeds: number;
websub_subscriptions_active: number;
attachments_enabled: boolean;
}
// WebSub (PubSubHubbub) subscription configuration
+9
View File
@@ -0,0 +1,9 @@
import { Env } from "../types";
// Returns the attachment bucket only when the feature is enabled, so callers can
// narrow cleanly. Attachments are on whenever R2 is bound, unless explicitly
// turned off with ATTACHMENTS_ENABLED="false".
export function getAttachmentBucket(env: Env): R2Bucket | undefined {
if (env.ATTACHMENTS_ENABLED === "false") return undefined;
return env.ATTACHMENT_BUCKET;
}
+8
View File
@@ -0,0 +1,8 @@
/** Human-readable byte size (B / KB / MB / GB). */
export function formatBytes(bytes: number): string {
if (bytes < 1024) return `${bytes} B`;
if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`;
if (bytes < 1024 * 1024 * 1024)
return `${(bytes / (1024 * 1024)).toFixed(1)} MB`;
return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)} GB`;
}
+96 -1
View File
@@ -1,11 +1,15 @@
import { describe, it, expect } from "vitest";
import { createMockEnv } from "../test/setup";
import { createMockEnv, MockR2 } from "../test/setup";
import {
getCounters,
bumpCounters,
countKeysByPrefix,
getStats,
scanR2Usage,
scanKvUsage,
setStorageSnapshot,
} from "./stats";
import { getAttachmentBucket } from "./attachments";
import { STATS_KEY, FEEDS_LIST_KEY } from "../config/constants";
import { Env } from "../types";
@@ -119,4 +123,95 @@ describe("stats helper", () => {
};
expect(raw.feeds_created).toBe(1);
});
it("getStats reports attachments_enabled based on the toggle", async () => {
const off = createMockEnv() as unknown as Env;
expect((await getStats(off)).attachments_enabled).toBe(false);
const on = createMockEnv({ withR2: true }) as unknown as Env;
expect((await getStats(on)).attachments_enabled).toBe(true);
const disabled = createMockEnv({ withR2: true }) as unknown as Env;
(disabled as any).ATTACHMENTS_ENABLED = "false";
expect((await getStats(disabled)).attachments_enabled).toBe(false);
});
});
describe("getAttachmentBucket", () => {
it("returns the bucket when bound and not disabled", () => {
const env = createMockEnv({ withR2: true }) as unknown as Env;
expect(getAttachmentBucket(env)).toBeDefined();
});
it("returns undefined when no bucket is bound", () => {
const env = createMockEnv() as unknown as Env;
expect(getAttachmentBucket(env)).toBeUndefined();
});
it("returns undefined when explicitly disabled", () => {
const env = createMockEnv({ withR2: true }) as unknown as Env;
(env as any).ATTACHMENTS_ENABLED = "false";
expect(getAttachmentBucket(env)).toBeUndefined();
});
});
describe("storage usage scans", () => {
it("scanR2Usage sums object sizes and counts", async () => {
const bucket = new MockR2();
await bucket.put("a", new Uint8Array(100));
await bucket.put("b", new Uint8Array(250));
const usage = await scanR2Usage(bucket as unknown as R2Bucket);
expect(usage.count).toBe(2);
expect(usage.bytes).toBe(350);
});
it("scanR2Usage returns zeros for an empty bucket", async () => {
const usage = await scanR2Usage(new MockR2() as unknown as R2Bucket);
expect(usage).toEqual({ bytes: 0, count: 0 });
});
it("scanKvUsage estimates KV bytes from stored email sizes", async () => {
const env = createMockEnv() as unknown as Env;
const kv = env.EMAIL_STORAGE;
await kv.put(
FEEDS_LIST_KEY,
JSON.stringify({
feeds: [
{ id: "a", title: "A" },
{ id: "b", title: "B" },
],
}),
);
await kv.put(
"feed:a:metadata",
JSON.stringify({
emails: [
{ key: "k1", size: 100 },
{ key: "k2", size: 50 },
],
}),
);
await kv.put(
"feed:b:metadata",
JSON.stringify({ emails: [{ key: "k3", size: 25 }] }),
);
const usage = await scanKvUsage(kv);
expect(usage.bytes).toBe(175);
});
it("setStorageSnapshot writes the snapshot fields", async () => {
const env = createMockEnv() as unknown as Env;
const kv = env.EMAIL_STORAGE;
await setStorageSnapshot(kv, {
attachments_bytes: 1234,
attachments_count: 5,
kv_bytes_estimated: 678,
});
const counters = await getCounters(kv);
expect(counters.attachments_bytes).toBe(1234);
expect(counters.attachments_count).toBe(5);
expect(counters.kv_bytes_estimated).toBe(678);
expect(counters.storage_scanned_at).toBeDefined();
});
});
+73
View File
@@ -2,6 +2,8 @@ import { Counters, Env, StatsResponse } from "../types";
import { STATS_KEY } from "../config/constants";
import { logger } from "../lib/logger";
import { listAllFeeds } from "../routes/admin/helpers";
import { getFeedMetadata } from "./storage";
import { getAttachmentBucket } from "./attachments";
const EMPTY_COUNTERS: Counters = {
feeds_created: 0,
@@ -82,5 +84,76 @@ export async function getStats(env: Env): Promise<StatsResponse> {
...counters,
active_feeds: feeds.length,
websub_subscriptions_active: websubCount,
attachments_enabled: !!getAttachmentBucket(env),
};
}
/** Sum the byte size and object count of every attachment stored in R2. */
export async function scanR2Usage(
bucket: R2Bucket,
): Promise<{ bytes: number; count: number }> {
let bytes = 0;
let count = 0;
let cursor: string | undefined;
try {
do {
const listed = await bucket.list({ cursor });
for (const obj of listed.objects) {
bytes += obj.size;
count += 1;
}
cursor = listed.truncated ? listed.cursor : undefined;
} while (cursor);
} catch (error) {
logger.error("Error scanning R2 usage", { error: String(error) });
}
return { bytes, count };
}
/**
* Estimate KV storage used. KV exposes no size API, so we sum the per-email
* sizes already recorded in each feed's metadata — email bodies dominate KV
* usage. Feed config/websub/stats keys are excluded, so this is a lower-bound
* estimate.
*/
export async function scanKvUsage(kv: KVNamespace): Promise<{ bytes: number }> {
let bytes = 0;
try {
const feeds = await listAllFeeds(kv);
for (const feed of feeds) {
const metadata = await getFeedMetadata(kv, feed.id);
if (!metadata) continue;
for (const email of metadata.emails) {
bytes += email.size ?? 0;
}
}
} catch (error) {
logger.error("Error estimating KV usage", { error: String(error) });
}
return { bytes };
}
/**
* Overwrite the storage-usage snapshot fields on the counters singleton.
* Unlike bumpCounters these are set (not incremented). Never throws.
*/
export async function setStorageSnapshot(
kv: KVNamespace,
snapshot: {
attachments_bytes: number;
attachments_count: number;
kv_bytes_estimated: number;
},
): Promise<void> {
try {
const current = await getCounters(kv);
current.attachments_bytes = snapshot.attachments_bytes;
current.attachments_count = snapshot.attachments_count;
current.kv_bytes_estimated = snapshot.kv_bytes_estimated;
current.storage_scanned_at = new Date().toISOString();
if (!current.first_seen) current.first_seen = new Date().toISOString();
await kv.put(STATS_KEY, JSON.stringify(current));
} catch (error) {
logger.error("Error writing storage snapshot", { error: String(error) });
}
}