import { EmailData, EmailMetadata, Env } from "../types";
import { logger } from "../infrastructure/logger";
import { getAttachmentBucket } from "../infrastructure/attachments";
import { FeedRepository } from "../infrastructure/feed-repository";
import { FeedId } from "../domain/value-objects/feed-id";

/** Number of KV deletes issued in parallel by a single purge step. */
const PURGE_DELETE_CONCURRENCY = 35;

/** Number of keys requested per listing page by `purgeExpiredFeeds`. */
const PURGE_PAGE_SIZE = 100;

/** The only part of the attachment bucket this module relies on. */
interface AttachmentDeleter {
  delete(id: string): unknown;
}

/**
 * Best-effort deletion of attachment objects from the bucket.
 *
 * Every id is attempted; a failing delete is logged and does not prevent the
 * remaining deletes nor reject the returned promise.
 */
async function deleteAttachmentObjects(
  bucket: AttachmentDeleter,
  attachmentIds: readonly string[],
): Promise<void> {
  if (attachmentIds.length === 0) return;

  await Promise.allSettled(
    attachmentIds.map(async (attachmentId) => {
      try {
        await bucket.delete(attachmentId);
      } catch (error) {
        // Deliberately not rethrown: cleanup must not block the deletion of
        // the emails themselves, but the orphaned object should be traceable.
        logger.error("Error deleting attachment", {
          attachmentId,
          error: String(error),
        });
      }
    }),
  );
}

/** Attachment ids recorded on a stored email body; empty when there are none. */
function storedAttachmentIds(email: EmailData | null | undefined): string[] {
  return email?.attachments?.map((a) => a.id) ?? [];
}

/**
 * All R2 object ids an email owns — both downloadable attachments and inline
 * images. Inline images are hidden from the user-facing lists but must still be
 * purged from the bucket when the email is deleted.
 *
 * @param e - Email metadata entry; either id list may be absent.
 * @returns Attachment ids followed by inline attachment ids.
 */
export function attachmentIdsForCleanup(e: EmailMetadata): string[] {
  return [...(e.attachmentIds ?? []), ...(e.inlineAttachmentIds ?? [])];
}

/**
 * Delete the R2 attachments belonging to the given email keys. Call before the
 * emails are removed from feed metadata, while `emails` still carries their
 * attachment ids.
 *
 * Does nothing when no matching email owns attachments or when no attachment
 * bucket is available for `env`. Individual delete failures are logged rather
 * than thrown.
 *
 * @param env - Environment used to resolve the attachment bucket.
 * @param emails - Feed metadata entries to search.
 * @param keys - Keys of the emails whose attachments should be removed.
 */
export async function deleteAttachmentsForEmails(
  env: Env,
  emails: readonly EmailMetadata[],
  keys: Iterable<string>,
): Promise<void> {
  const keySet = new Set(keys);
  const attachmentIds = emails
    .filter((e) => keySet.has(e.key))
    .flatMap((e) => attachmentIdsForCleanup(e));
  if (attachmentIds.length === 0) return;

  const bucket = getAttachmentBucket(env);
  if (!bucket) return;

  await deleteAttachmentObjects(bucket, attachmentIds);
}

/**
 * Delete KV keys in batches, reporting which deletes succeeded.
 *
 * Falsy and duplicate keys are dropped first. Keys are then deleted in batches
 * of `concurrency`; each batch is awaited before the next one starts. A failing
 * delete never rejects the returned promise — the key is reported in `failed`.
 *
 * @param emailStorage - KV namespace to delete from.
 * @param keys - Keys to delete.
 * @param concurrency - Batch size; floored, and treated as 1 when it is not a
 *   number or is below 1.
 * @returns The deleted keys and the keys whose delete failed, in input order.
 */
export async function deleteKeysWithConcurrency(
  emailStorage: KVNamespace,
  keys: string[],
  concurrency: number,
): Promise<{ ok: string[]; failed: string[] }> {
  const uniqueKeys = Array.from(new Set(keys.filter(Boolean)));
  const ok: string[] = [];
  const failed: string[] = [];
  // `|| 1` covers NaN and 0; Math.max covers negative values.
  const limit = Math.max(1, Math.floor(concurrency) || 1);

  for (let i = 0; i < uniqueKeys.length; i += limit) {
    const batch = uniqueKeys.slice(i, i + limit);
    // Pair every key with its own outcome so no positional lookup is needed
    // afterwards and a synchronous throw counts as a failed key.
    const outcomes = await Promise.all(
      batch.map(async (key) => {
        try {
          await emailStorage.delete(key);
          return { key, deleted: true };
        } catch {
          return { key, deleted: false };
        }
      }),
    );
    for (const { key, deleted } of outcomes) {
      (deleted ? ok : failed).push(key);
    }
  }

  return { ok, failed };
}

/**
 * Read a feed's stored RFC 8058 one-click unsubscribe URLs (one per sender).
 * Must be called before the feed metadata is deleted. Never throws.
 *
 * @param emailStorage - KV namespace holding the feed metadata.
 * @param feedId - Feed whose metadata is read.
 * @returns The stored URLs, or an empty array when there are none or the
 *   metadata could not be read (the error is logged).
 */
export async function collectUnsubscribeUrls(
  emailStorage: KVNamespace,
  feedId: FeedId,
): Promise<string[]> {
  try {
    const metadata = await new FeedRepository(emailStorage).getMetadata(feedId);
    return Object.values(metadata?.unsubscribe ?? {});
  } catch (error) {
    logger.error("Error reading unsubscribe URLs", {
      feedId: feedId.value,
      error: String(error),
    });
    return [];
  }
}

/**
 * Delete one page of a feed's KV keys.
 *
 * Lists a page of keys for `feedId`, and — when a bucket is supplied — first
 * loads the email entries on that page and deletes the attachments they
 * reference, then deletes the listed keys themselves. Emails that fail to load
 * are skipped for attachment cleanup but their keys are still deleted.
 *
 * @param emailStorage - KV namespace holding the feed.
 * @param feedId - Feed being purged.
 * @param options - `cursor`/`limit` are forwarded to the key listing; `bucket`
 *   enables attachment cleanup.
 * @returns The keys deleted and failed on this page, plus the listing's cursor
 *   and completion flag for requesting the next page.
 */
export async function purgeFeedKeysStep(
  emailStorage: KVNamespace,
  feedId: FeedId,
  options: { cursor?: string; limit?: number; bucket?: R2Bucket } = {},
): Promise<{
  deletedKeys: string[];
  failedKeys: string[];
  cursor: string;
  listComplete: boolean;
}> {
  const { cursor, limit, bucket } = options;
  const repo = new FeedRepository(emailStorage);
  const listed = await repo.listFeedKeys(feedId, { cursor, limit });
  const keys = listed.names;

  if (bucket && keys.length > 0) {
    const emailKeys = keys.filter((k) => repo.isEmailKey(feedId, k));
    if (emailKeys.length > 0) {
      // Attachment ids must be read before the email entries are deleted below.
      const emailDataResults = await Promise.allSettled(
        emailKeys.map((k) => repo.getEmail(k)),
      );
      // NOTE(review): only `attachments` is read here, whereas
      // attachmentIdsForCleanup also purges inline attachment ids from feed
      // metadata — confirm whether stored emails carry inline images that need
      // purging on this path too.
      const attachmentIds = emailDataResults.flatMap((r) =>
        r.status === "fulfilled" ? storedAttachmentIds(r.value) : [],
      );
      await deleteAttachmentObjects(bucket, attachmentIds);
    }
  }

  const { ok, failed } = await deleteKeysWithConcurrency(
    emailStorage,
    keys,
    PURGE_DELETE_CONCURRENCY,
  );

  return {
    deletedKeys: ok,
    failedKeys: failed,
    cursor: listed.cursor,
    listComplete: listed.listComplete,
  };
}

/**
 * Purge every KV key of a feed by repeatedly running `purgeFeedKeysStep` until
 * the listing reports completion (or returns no cursor to continue from).
 *
 * Keys that fail to delete are not retried here.
 *
 * @param emailStorage - KV namespace holding the feed.
 * @param feedId - Feed to purge.
 * @param bucket - When given, attachments of the purged emails are deleted too.
 */
export async function purgeExpiredFeeds(
  emailStorage: KVNamespace,
  feedId: FeedId,
  bucket?: R2Bucket,
): Promise<void> {
  let cursor: string | undefined;
  do {
    const step = await purgeFeedKeysStep(emailStorage, feedId, {
      bucket,
      limit: PURGE_PAGE_SIZE,
      cursor,
    });
    cursor = step.listComplete ? undefined : step.cursor;
  } while (cursor);
}