mirror of
https://github.com/juherr/kill-the-news.git
synced 2026-06-20 22:03:48 +00:00
refactor: move KV repositories to infrastructure (Track P — points 2, 6c)
Make the domain stop depending on infrastructure ("imports point inward").
- Point 2: relocate the four KV adapters (FeedRepository, IconRepository,
WebSubSubscriptionRepository, CountersRepository) from domain/ to
infrastructure/, where the logger import is legitimate. The domain now keeps
only the pure key schema (feed-keys.ts), the Feed aggregate and value objects;
it imports nothing outward. Deliberately no hand-rolled 24-method port
interface (YAGNI without DI) — relocation alone fixes the direction.
- Point 6c: EmailParser.extractFeedId now returns a validated FeedId value
object instead of a raw string, so the most untrusted input (an inbound
recipient address) is guarded at the parse boundary and no longer round-trips
through FeedId.fromTrusted in the ingest path.
All import paths updated; CLAUDE.md source layout/KV-schema notes updated.
351 tests pass; tsc --noEmit clean.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,21 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import { createMockEnv } from "../test/setup";
|
||||
import { CountersRepository } from "./counters-repository";
|
||||
import type { Env } from "../types";
|
||||
|
||||
const mockEnv = () => createMockEnv() as unknown as Env;
|
||||
|
||||
describe("CountersRepository", () => {
|
||||
it("round-trips the counters singleton", async () => {
|
||||
const repo = new CountersRepository(mockEnv().EMAIL_STORAGE);
|
||||
expect(await repo.getRaw()).toBeNull();
|
||||
await repo.put({
|
||||
feeds_created: 1,
|
||||
feeds_deleted: 0,
|
||||
emails_received: 2,
|
||||
emails_rejected: 0,
|
||||
unsubscribes_sent: 0,
|
||||
});
|
||||
expect(await repo.getRaw()).toMatchObject({ emails_received: 2 });
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,23 @@
|
||||
import { Counters, Env } from "../types";
|
||||
import { STATS_KEY } from "../domain/feed-keys";
|
||||
|
||||
/**
|
||||
* KV access for the monitoring counters singleton (`stats:counters`). The
|
||||
* increment policy lives in the application layer (utils/stats.ts); this
|
||||
* repository owns only the raw read/write of the blob.
|
||||
*/
|
||||
export class CountersRepository {
|
||||
constructor(private readonly kv: KVNamespace) {}
|
||||
|
||||
static from(env: Env): CountersRepository {
|
||||
return new CountersRepository(env.EMAIL_STORAGE);
|
||||
}
|
||||
|
||||
async getRaw(): Promise<Counters | null> {
|
||||
return (await this.kv.get(STATS_KEY, { type: "json" })) as Counters | null;
|
||||
}
|
||||
|
||||
async put(counters: Counters): Promise<void> {
|
||||
await this.kv.put(STATS_KEY, JSON.stringify(counters));
|
||||
}
|
||||
}
|
||||
@@ -4,9 +4,9 @@ import {
|
||||
ICON_TTL_SECONDS,
|
||||
MAX_ICON_BYTES,
|
||||
} from "../config/constants";
|
||||
import { IconRepository } from "../domain/icon-repository";
|
||||
import { IconRepository } from "./icon-repository";
|
||||
import { EmailAddress } from "../domain/value-objects/email-address";
|
||||
import { logger } from "../infrastructure/logger";
|
||||
import { logger } from "./logger";
|
||||
|
||||
interface IconRecord {
|
||||
data: string | null; // base64 icon bytes, or null for a negative cache entry
|
||||
|
||||
@@ -0,0 +1,132 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import { createMockEnv } from "../test/setup";
|
||||
import { FeedRepository } from "./feed-repository";
|
||||
import { FeedId } from "../domain/value-objects/feed-id";
|
||||
import type { Env, FeedConfig, FeedMetadata, EmailData } from "../types";
|
||||
|
||||
const mockEnv = () => createMockEnv() as unknown as Env;
|
||||
const fid = (value: string) => FeedId.fromTrusted(value);
|
||||
|
||||
const sampleConfig = (overrides: Partial<FeedConfig> = {}): FeedConfig => ({
|
||||
title: "Test Feed",
|
||||
language: "en",
|
||||
created_at: 1000,
|
||||
...overrides,
|
||||
});
|
||||
|
||||
const sampleEmail = (overrides: Partial<EmailData> = {}): EmailData => ({
|
||||
subject: "Hello",
|
||||
from: "news@example.com",
|
||||
content: "<p>hi</p>",
|
||||
receivedAt: 1234,
|
||||
headers: {},
|
||||
...overrides,
|
||||
});
|
||||
|
||||
describe("FeedRepository key schema", () => {
|
||||
it("builds the canonical KV keys via the public API", () => {
|
||||
const repo = new FeedRepository(mockEnv().EMAIL_STORAGE);
|
||||
expect(repo.feedKeyPrefix(fid("a.b.42"))).toBe("feed:a.b.42:");
|
||||
expect(repo.newEmailKey(fid("a.b.42"))).toMatch(/^feed:a\.b\.42:\d+$/);
|
||||
});
|
||||
|
||||
it("recognises email keys vs config/metadata keys", () => {
|
||||
const repo = new FeedRepository(mockEnv().EMAIL_STORAGE);
|
||||
expect(repo.isEmailKey(fid("a.b.42"), "feed:a.b.42:config")).toBe(false);
|
||||
expect(repo.isEmailKey(fid("a.b.42"), "feed:a.b.42:metadata")).toBe(false);
|
||||
expect(repo.isEmailKey(fid("a.b.42"), "feed:a.b.42:1700000000000")).toBe(
|
||||
true,
|
||||
);
|
||||
});
|
||||
|
||||
it("recovers the feed id from an email key", () => {
|
||||
const repo = new FeedRepository(mockEnv().EMAIL_STORAGE);
|
||||
expect(repo.feedIdFromEmailKey("feed:a.b.42:1700000000000")).toBe("a.b.42");
|
||||
});
|
||||
});
|
||||
|
||||
describe("FeedRepository config & metadata", () => {
|
||||
it("round-trips and deletes a feed config", async () => {
|
||||
const repo = new FeedRepository(mockEnv().EMAIL_STORAGE);
|
||||
expect(await repo.getConfig(fid("a.b.42"))).toBeNull();
|
||||
await repo.putConfig(fid("a.b.42"), sampleConfig());
|
||||
expect(await repo.getConfig(fid("a.b.42"))).toMatchObject({
|
||||
title: "Test Feed",
|
||||
});
|
||||
await repo.deleteConfig(fid("a.b.42"));
|
||||
expect(await repo.getConfig(fid("a.b.42"))).toBeNull();
|
||||
});
|
||||
|
||||
it("round-trips and deletes feed metadata", async () => {
|
||||
const repo = new FeedRepository(mockEnv().EMAIL_STORAGE);
|
||||
const meta: FeedMetadata = { emails: [] };
|
||||
await repo.putMetadata(fid("a.b.42"), meta);
|
||||
expect(await repo.getMetadata(fid("a.b.42"))).toEqual(meta);
|
||||
await repo.deleteMetadata(fid("a.b.42"));
|
||||
expect(await repo.getMetadata(fid("a.b.42"))).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe("FeedRepository emails", () => {
|
||||
it("stores and reads an email under a minted key", async () => {
|
||||
const repo = new FeedRepository(mockEnv().EMAIL_STORAGE);
|
||||
const key = repo.newEmailKey(fid("a.b.42"));
|
||||
await repo.putEmail(key, sampleEmail());
|
||||
expect(await repo.getEmail(key)).toMatchObject({ subject: "Hello" });
|
||||
await repo.deleteEmail(key);
|
||||
expect(await repo.getEmail(key)).toBeNull();
|
||||
});
|
||||
|
||||
it("lists every key under a feed prefix", async () => {
|
||||
const repo = new FeedRepository(mockEnv().EMAIL_STORAGE);
|
||||
await repo.putConfig(fid("a.b.42"), sampleConfig());
|
||||
await repo.putMetadata(fid("a.b.42"), { emails: [] });
|
||||
const emailKey = repo.newEmailKey(fid("a.b.42"));
|
||||
await repo.putEmail(emailKey, sampleEmail());
|
||||
|
||||
const listed = await repo.listFeedKeys(fid("a.b.42"));
|
||||
expect(listed.names).toContain("feed:a.b.42:config");
|
||||
expect(listed.names).toContain("feed:a.b.42:metadata");
|
||||
expect(listed.names).toContain(emailKey);
|
||||
expect(
|
||||
listed.names.filter((k) => repo.isEmailKey(fid("a.b.42"), k)),
|
||||
).toEqual([emailKey]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("FeedRepository feed list", () => {
|
||||
it("adds, updates, lists and removes feeds with expiry", async () => {
|
||||
const repo = new FeedRepository(mockEnv().EMAIL_STORAGE);
|
||||
await repo.addToList(fid("a.b.42"), "One", "desc", 5000);
|
||||
await repo.addToList(fid("c.d.99"), "Two");
|
||||
|
||||
let feeds = await repo.listFeeds();
|
||||
expect(feeds).toHaveLength(2);
|
||||
expect(feeds.find((f) => f.id === "a.b.42")).toMatchObject({
|
||||
title: "One",
|
||||
expires_at: 5000,
|
||||
});
|
||||
|
||||
await repo.updateInList(fid("a.b.42"), "One-updated", undefined, undefined);
|
||||
feeds = await repo.listFeeds();
|
||||
const updated = feeds.find((f) => f.id === "a.b.42");
|
||||
expect(updated).toMatchObject({ title: "One-updated" });
|
||||
expect(updated?.expires_at).toBeUndefined();
|
||||
|
||||
expect(await repo.removeFromList(fid("a.b.42"))).toBe(true);
|
||||
expect(await repo.removeFromList(fid("missing"))).toBe(false);
|
||||
feeds = await repo.listFeeds();
|
||||
expect(feeds.map((f) => f.id)).toEqual(["c.d.99"]);
|
||||
});
|
||||
|
||||
it("bulk-removes only the matching ids", async () => {
|
||||
const repo = new FeedRepository(mockEnv().EMAIL_STORAGE);
|
||||
await repo.addToList(fid("a.b.42"), "One");
|
||||
await repo.addToList(fid("c.d.99"), "Two");
|
||||
await repo.addToList(fid("e.f.10"), "Three");
|
||||
|
||||
const removed = await repo.removeFromListBulk(["a.b.42", "e.f.10", "nope"]);
|
||||
expect(removed.sort()).toEqual(["a.b.42", "e.f.10"]);
|
||||
expect((await repo.listFeeds()).map((f) => f.id)).toEqual(["c.d.99"]);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,276 @@
|
||||
import {
|
||||
EmailData,
|
||||
Env,
|
||||
FeedConfig,
|
||||
FeedList,
|
||||
FeedListItem,
|
||||
FeedMetadata,
|
||||
} from "../types";
|
||||
import { FEEDS_LIST_KEY } from "../config/constants";
|
||||
import { feedKeys } from "../domain/feed-keys";
|
||||
import { Feed } from "../domain/feed.aggregate";
|
||||
import { FeedId } from "../domain/value-objects/feed-id";
|
||||
import { logger } from "./logger";
|
||||
|
||||
/**
|
||||
* Single source of truth for KV access to the Feed aggregate. The key schema
|
||||
* itself lives in `feed-keys.ts`; this repository owns the get/put operations.
|
||||
* No other module should build a `feed:`/`feeds:list`/`websub:`/`icon:`/
|
||||
* `stats:counters` key string — go through `feed-keys` or a repository method.
|
||||
*
|
||||
* Wraps one `KVNamespace`; construct per request via `FeedRepository.from(env)`.
|
||||
*/
|
||||
export class FeedRepository {
|
||||
constructor(private readonly kv: KVNamespace) {}
|
||||
|
||||
static from(env: Env): FeedRepository {
|
||||
return new FeedRepository(env.EMAIL_STORAGE);
|
||||
}
|
||||
|
||||
// ── Key schema (delegates to feed-keys) ───────────────────────────────────
|
||||
|
||||
private configKey(feedId: FeedId): string {
|
||||
return feedKeys.config(feedId.value);
|
||||
}
|
||||
|
||||
private metadataKey(feedId: FeedId): string {
|
||||
return feedKeys.metadata(feedId.value);
|
||||
}
|
||||
|
||||
/** Prefix covering every key owned by a feed (config, metadata, emails). */
|
||||
feedKeyPrefix(feedId: FeedId): string {
|
||||
return feedKeys.feedPrefix(feedId.value);
|
||||
}
|
||||
|
||||
/** Mint a fresh, time-ordered email key. Call once and reuse the result. */
|
||||
newEmailKey(feedId: FeedId): string {
|
||||
return feedKeys.newEmail(feedId.value);
|
||||
}
|
||||
|
||||
/** True when `key` is an email entry (not the feed's config/metadata key). */
|
||||
isEmailKey(feedId: FeedId, key: string): boolean {
|
||||
return feedKeys.isEmail(feedId.value, key);
|
||||
}
|
||||
|
||||
/** Recover the feed id embedded in an email key (`feed:<id>:<ts>`). */
|
||||
feedIdFromEmailKey(key: string): string {
|
||||
return feedKeys.feedIdFromEmail(key);
|
||||
}
|
||||
|
||||
// ── Feed aggregate ────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Load the aggregate (config + email index). A feed exists iff it has a
|
||||
* config; metadata defaults to empty so a freshly-created feed still loads.
|
||||
*/
|
||||
async load(feedId: FeedId): Promise<Feed | null> {
|
||||
const [config, metadata] = await Promise.all([
|
||||
this.getConfig(feedId),
|
||||
this.getMetadata(feedId),
|
||||
]);
|
||||
if (!config) return null;
|
||||
return Feed.reconstitute(feedId, config, metadata ?? { emails: [] });
|
||||
}
|
||||
|
||||
/** Persist both keys the aggregate owns (config + metadata). */
|
||||
async save(feed: Feed): Promise<void> {
|
||||
await Promise.all([
|
||||
this.putConfig(feed.id, feed.config),
|
||||
this.putMetadata(feed.id, feed.metadata),
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist only the email index. Used by the ingest/delete paths where config
|
||||
* is unchanged — avoids a redundant config write on the hot path.
|
||||
*/
|
||||
async saveMetadata(feed: Feed): Promise<void> {
|
||||
await this.putMetadata(feed.id, feed.metadata);
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist only the config. Used by the rename/edit paths where metadata is
|
||||
* unchanged — avoids re-writing (and risking clobbering) the email index.
|
||||
*/
|
||||
async saveConfig(feed: Feed): Promise<void> {
|
||||
await this.putConfig(feed.id, feed.config);
|
||||
}
|
||||
|
||||
// ── Feed config ───────────────────────────────────────────────────────────
|
||||
|
||||
async getConfig(feedId: FeedId): Promise<FeedConfig | null> {
|
||||
return (await this.kv.get(this.configKey(feedId), {
|
||||
type: "json",
|
||||
})) as FeedConfig | null;
|
||||
}
|
||||
|
||||
async putConfig(feedId: FeedId, config: FeedConfig): Promise<void> {
|
||||
await this.kv.put(this.configKey(feedId), JSON.stringify(config));
|
||||
}
|
||||
|
||||
async deleteConfig(feedId: FeedId): Promise<void> {
|
||||
await this.kv.delete(this.configKey(feedId));
|
||||
}
|
||||
|
||||
// ── Feed metadata ─────────────────────────────────────────────────────────
|
||||
|
||||
async getMetadata(feedId: FeedId): Promise<FeedMetadata | null> {
|
||||
return (await this.kv.get(this.metadataKey(feedId), {
|
||||
type: "json",
|
||||
})) as FeedMetadata | null;
|
||||
}
|
||||
|
||||
async putMetadata(feedId: FeedId, metadata: FeedMetadata): Promise<void> {
|
||||
await this.kv.put(this.metadataKey(feedId), JSON.stringify(metadata));
|
||||
}
|
||||
|
||||
async deleteMetadata(feedId: FeedId): Promise<void> {
|
||||
await this.kv.delete(this.metadataKey(feedId));
|
||||
}
|
||||
|
||||
// ── Emails ────────────────────────────────────────────────────────────────
|
||||
|
||||
async putEmail(key: string, data: EmailData): Promise<void> {
|
||||
await this.kv.put(key, JSON.stringify(data));
|
||||
}
|
||||
|
||||
async getEmail(key: string): Promise<EmailData | null> {
|
||||
return (await this.kv.get(key, { type: "json" })) as EmailData | null;
|
||||
}
|
||||
|
||||
async deleteEmail(key: string): Promise<void> {
|
||||
await this.kv.delete(key);
|
||||
}
|
||||
|
||||
// ── Global feed list ──────────────────────────────────────────────────────
|
||||
|
||||
async listFeeds(): Promise<FeedListItem[]> {
|
||||
try {
|
||||
const feedList = (await this.kv.get(FEEDS_LIST_KEY, {
|
||||
type: "json",
|
||||
})) as FeedList | null;
|
||||
return feedList?.feeds || [];
|
||||
} catch (error) {
|
||||
logger.error("Error listing feeds", { error: String(error) });
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async addToList(
|
||||
feedId: FeedId,
|
||||
title: string,
|
||||
description?: string,
|
||||
expires_at?: number,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const feedList = ((await this.kv.get(FEEDS_LIST_KEY, {
|
||||
type: "json",
|
||||
})) as FeedList | null) || { feeds: [] };
|
||||
|
||||
feedList.feeds.push({ id: feedId.value, title, description, expires_at });
|
||||
await this.kv.put(FEEDS_LIST_KEY, JSON.stringify(feedList));
|
||||
} catch (error) {
|
||||
logger.error("Error adding feed to list", {
|
||||
feedId: feedId.value,
|
||||
error: String(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async updateInList(
|
||||
feedId: FeedId,
|
||||
title: string,
|
||||
description?: string,
|
||||
expires_at?: number,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const feedList = ((await this.kv.get(FEEDS_LIST_KEY, {
|
||||
type: "json",
|
||||
})) as FeedList | null) || { feeds: [] };
|
||||
|
||||
const feedIndex = feedList.feeds.findIndex(
|
||||
(feed) => feed.id === feedId.value,
|
||||
);
|
||||
if (feedIndex !== -1) {
|
||||
feedList.feeds[feedIndex].title = title;
|
||||
feedList.feeds[feedIndex].description = description;
|
||||
feedList.feeds[feedIndex].expires_at = expires_at;
|
||||
await this.kv.put(FEEDS_LIST_KEY, JSON.stringify(feedList));
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error("Error updating feed in list", {
|
||||
feedId: feedId.value,
|
||||
error: String(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async removeFromListBulk(feedIds: string[]): Promise<string[]> {
|
||||
try {
|
||||
const feedList = ((await this.kv.get(FEEDS_LIST_KEY, {
|
||||
type: "json",
|
||||
})) as FeedList | null) || { feeds: [] };
|
||||
|
||||
const toRemove = new Set(feedIds.filter(Boolean));
|
||||
if (toRemove.size === 0) return [];
|
||||
|
||||
const removed: string[] = [];
|
||||
const nextFeeds: FeedListItem[] = [];
|
||||
|
||||
for (const feed of feedList.feeds) {
|
||||
if (toRemove.has(feed.id)) {
|
||||
removed.push(feed.id);
|
||||
continue;
|
||||
}
|
||||
nextFeeds.push(feed);
|
||||
}
|
||||
|
||||
if (removed.length === 0) return [];
|
||||
|
||||
feedList.feeds = nextFeeds;
|
||||
await this.kv.put(FEEDS_LIST_KEY, JSON.stringify(feedList));
|
||||
return removed;
|
||||
} catch (error) {
|
||||
logger.error("Error removing feeds from list", { error: String(error) });
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async removeFromList(feedId: FeedId): Promise<boolean> {
|
||||
const removed = await this.removeFromListBulk([feedId.value]);
|
||||
return removed.includes(feedId.value);
|
||||
}
|
||||
|
||||
// ── Key listing / counting ────────────────────────────────────────────────
|
||||
|
||||
async listFeedKeys(
|
||||
feedId: FeedId,
|
||||
options: { cursor?: string; limit?: number } = {},
|
||||
): Promise<{ names: string[]; cursor: string; listComplete: boolean }> {
|
||||
const prefix = this.feedKeyPrefix(feedId);
|
||||
const limit = Math.min(1000, Math.max(1, Math.floor(options.limit || 100)));
|
||||
const cursor = options.cursor || undefined;
|
||||
|
||||
const listed = await this.kv.list({ prefix, cursor, limit });
|
||||
return {
|
||||
names: (listed.keys || []).map((k) => k.name),
|
||||
cursor: listed.cursor || "",
|
||||
listComplete: !!listed.list_complete,
|
||||
};
|
||||
}
|
||||
|
||||
async countKeysByPrefix(prefix: string): Promise<number> {
|
||||
let total = 0;
|
||||
let cursor: string | undefined;
|
||||
try {
|
||||
do {
|
||||
const listed = await this.kv.list({ prefix, cursor, limit: 1000 });
|
||||
total += listed.keys.length;
|
||||
cursor = listed.list_complete ? undefined : listed.cursor;
|
||||
} while (cursor);
|
||||
} catch (error) {
|
||||
logger.error("Error counting keys", { prefix, error: String(error) });
|
||||
}
|
||||
return total;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import { createMockEnv } from "../test/setup";
|
||||
import { IconRepository } from "./icon-repository";
|
||||
import type { Env } from "../types";
|
||||
|
||||
const mockEnv = () => createMockEnv() as unknown as Env;
|
||||
|
||||
describe("IconRepository", () => {
|
||||
it("stores and reads favicons as text or json under the icon: key", async () => {
|
||||
const repo = new IconRepository(mockEnv().EMAIL_STORAGE);
|
||||
expect(await repo.getText("example.com")).toBeNull();
|
||||
await repo.put("example.com", JSON.stringify({ data: null }), 60);
|
||||
expect(await repo.getText("example.com")).toBe('{"data":null}');
|
||||
expect(await repo.getJson<{ data: null }>("example.com")).toEqual({
|
||||
data: null,
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,31 @@
|
||||
import { Env } from "../types";
|
||||
import { feedKeys } from "../domain/feed-keys";
|
||||
|
||||
/**
|
||||
* KV access for cached per-domain favicons (`icon:<domain>`). Entries may be
|
||||
* positive (base64 bytes) or negative (a sentinel marking a failed fetch), and
|
||||
* always carry a TTL — the cache's sole expiry mechanism.
|
||||
*/
|
||||
export class IconRepository {
|
||||
constructor(private readonly kv: KVNamespace) {}
|
||||
|
||||
static from(env: Env): IconRepository {
|
||||
return new IconRepository(env.EMAIL_STORAGE);
|
||||
}
|
||||
|
||||
getText(domain: string): Promise<string | null> {
|
||||
return this.kv.get(feedKeys.icon(domain), "text");
|
||||
}
|
||||
|
||||
async getJson<T>(domain: string): Promise<T | null> {
|
||||
return (await this.kv.get(feedKeys.icon(domain), {
|
||||
type: "json",
|
||||
})) as T | null;
|
||||
}
|
||||
|
||||
async put(domain: string, value: string, ttlSeconds: number): Promise<void> {
|
||||
await this.kv.put(feedKeys.icon(domain), value, {
|
||||
expirationTtl: ttlSeconds,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import { createMockEnv } from "../test/setup";
|
||||
import { WebSubSubscriptionRepository } from "./websub-subscription-repository";
|
||||
import type { Env, WebSubSubscription } from "../types";
|
||||
|
||||
const mockEnv = () => createMockEnv() as unknown as Env;
|
||||
|
||||
describe("WebSubSubscriptionRepository", () => {
|
||||
it("round-trips subscriptions and counts feeds with subscribers", async () => {
|
||||
const repo = new WebSubSubscriptionRepository(mockEnv().EMAIL_STORAGE);
|
||||
expect(await repo.get("a.b.42")).toEqual([]);
|
||||
const subs: WebSubSubscription[] = [
|
||||
{ callbackUrl: "https://r.example/cb", expiresAt: 9999 },
|
||||
];
|
||||
await repo.save("a.b.42", subs);
|
||||
expect(await repo.get("a.b.42")).toEqual(subs);
|
||||
expect(await repo.countKeys()).toBe(1);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,45 @@
|
||||
import { Env, WebSubSubscription } from "../types";
|
||||
import { feedKeys } from "../domain/feed-keys";
|
||||
import { logger } from "./logger";
|
||||
|
||||
/**
|
||||
* KV access for per-feed WebSub subscriber lists (`websub:subs:<feedId>`).
|
||||
*/
|
||||
export class WebSubSubscriptionRepository {
|
||||
constructor(private readonly kv: KVNamespace) {}
|
||||
|
||||
static from(env: Env): WebSubSubscriptionRepository {
|
||||
return new WebSubSubscriptionRepository(env.EMAIL_STORAGE);
|
||||
}
|
||||
|
||||
async get(feedId: string): Promise<WebSubSubscription[]> {
|
||||
const raw = await this.kv.get(feedKeys.websub(feedId), "json");
|
||||
return (raw as WebSubSubscription[] | null) ?? [];
|
||||
}
|
||||
|
||||
async save(
|
||||
feedId: string,
|
||||
subscriptions: WebSubSubscription[],
|
||||
): Promise<void> {
|
||||
await this.kv.put(feedKeys.websub(feedId), JSON.stringify(subscriptions));
|
||||
}
|
||||
|
||||
/** Number of feeds that currently hold at least one WebSub subscription. */
|
||||
async countKeys(): Promise<number> {
|
||||
const prefix = feedKeys.websubPrefix();
|
||||
let total = 0;
|
||||
let cursor: string | undefined;
|
||||
try {
|
||||
do {
|
||||
const listed = await this.kv.list({ prefix, cursor, limit: 1000 });
|
||||
total += listed.keys.length;
|
||||
cursor = listed.list_complete ? undefined : listed.cursor;
|
||||
} while (cursor);
|
||||
} catch (error) {
|
||||
logger.error("Error counting subscription keys", {
|
||||
error: String(error),
|
||||
});
|
||||
}
|
||||
return total;
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,8 @@
|
||||
import { Env, FeedConfig, EmailData, WebSubSubscription } from "../types";
|
||||
import { generateRssFeed, generateAtomFeed } from "./feed-generator";
|
||||
import { baseUrl, feedRssUrl, feedAtomUrl, feedUrl } from "./urls";
|
||||
import { FeedRepository } from "../domain/feed-repository";
|
||||
import { WebSubSubscriptionRepository } from "../domain/websub-subscription-repository";
|
||||
import { FeedRepository } from "./feed-repository";
|
||||
import { WebSubSubscriptionRepository } from "./websub-subscription-repository";
|
||||
import { FeedId } from "../domain/value-objects/feed-id";
|
||||
|
||||
export async function getSubscriptions(
|
||||
|
||||
Reference in New Issue
Block a user