From e7d28bc62be7c4a89bf0cfd258b8d61ebbcd08c2 Mon Sep 17 00:00:00 2001 From: OpenClaw Date: Mon, 16 Feb 2026 20:07:11 +0000 Subject: [PATCH] fix: batch usage writes (#10), retry divergence (#12), per-key queue fairness (#15) --- src/middleware/pdfRateLimit.ts | 27 +++++++++---- src/middleware/usage.ts | 70 ++++++++++++++++++++++++++++------ 2 files changed, 79 insertions(+), 18 deletions(-) diff --git a/src/middleware/pdfRateLimit.ts b/src/middleware/pdfRateLimit.ts index b17dec9..24fd6bb 100644 --- a/src/middleware/pdfRateLimit.ts +++ b/src/middleware/pdfRateLimit.ts @@ -1,5 +1,6 @@ import { Request, Response, NextFunction } from "express"; import { isProKey } from "../services/keys.js"; +import logger from "../services/logger.js"; interface RateLimitEntry { count: number; @@ -15,9 +16,12 @@ const RATE_WINDOW_MS = 60_000; // 1 minute const MAX_CONCURRENT_PDFS = 3; const MAX_QUEUE_SIZE = 10; +// Per-key queue fairness (Audit #15) +const MAX_QUEUED_PER_KEY = 3; + const rateLimitStore = new Map(); let activePdfCount = 0; -const pdfQueue: Array<{ resolve: () => void; reject: (error: Error) => void }> = []; +const pdfQueue: Array<{ resolve: () => void; reject: (error: Error) => void; apiKey: string }> = []; function cleanupExpiredEntries(): void { const now = Date.now(); @@ -40,7 +44,6 @@ function checkRateLimit(apiKey: string): boolean { const entry = rateLimitStore.get(apiKey); if (!entry || now >= entry.resetTime) { - // Create new window rateLimitStore.set(apiKey, { count: 1, resetTime: now + RATE_WINDOW_MS @@ -56,7 +59,11 @@ function checkRateLimit(apiKey: string): boolean { return true; } -async function acquireConcurrencySlot(): Promise { +function getQueuedCountForKey(apiKey: string): number { + return pdfQueue.filter(w => w.apiKey === apiKey).length; +} + +async function acquireConcurrencySlot(apiKey: string): Promise { if (activePdfCount < MAX_CONCURRENT_PDFS) { activePdfCount++; return; @@ -66,8 +73,14 @@ async function acquireConcurrencySlot(): Promise { throw new Error("QUEUE_FULL"); } + // Audit #15: Per-key fairness — reject if this key already has too many queued + if (getQueuedCountForKey(apiKey) >= MAX_QUEUED_PER_KEY) { + logger.warn({ apiKey: apiKey.slice(0, 8) + "..." }, "Per-key queue limit reached"); + throw new Error("QUEUE_FULL"); + } + return new Promise((resolve, reject) => { - pdfQueue.push({ resolve, reject }); + pdfQueue.push({ resolve, reject, apiKey }); }); } @@ -98,8 +111,8 @@ export function pdfRateLimitMiddleware(req: Request & { apiKeyInfo?: any }, res: return; } - // Add concurrency control to the request - (req as any).acquirePdfSlot = acquireConcurrencySlot; + // Add concurrency control to the request (pass apiKey for fairness) + (req as any).acquirePdfSlot = () => acquireConcurrencySlot(apiKey); (req as any).releasePdfSlot = releaseConcurrencySlot; next(); @@ -115,4 +128,4 @@ export function getConcurrencyStats() { } // Proactive cleanup every 60s -setInterval(cleanupExpiredEntries, 60_000); \ No newline at end of file +setInterval(cleanupExpiredEntries, 60_000); diff --git a/src/middleware/usage.ts b/src/middleware/usage.ts index 94d48a2..0644714 100644 --- a/src/middleware/usage.ts +++ b/src/middleware/usage.ts @@ -8,6 +8,13 @@ const PRO_TIER_LIMIT = 5000; // In-memory cache, periodically synced to PostgreSQL let usage = new Map(); +// Write-behind buffer for batching DB writes (Audit #10) +const dirtyKeys = new Set(); +const retryCount = new Map(); +const MAX_RETRIES = 3; +const FLUSH_INTERVAL_MS = 5000; +const FLUSH_THRESHOLD = 50; + function getMonthKey(): string { const d = new Date(); return `${d.getFullYear()}-${String(d.getMonth() + 1).padStart(2, "0")}`; @@ -27,18 +34,56 @@ export async function loadUsageData(): Promise { } } -async function saveUsageEntry(key: string, record: { count: number; monthKey: string }): Promise { +// Batch flush dirty entries to DB (Audit #10 + #12) +async function flushDirtyEntries(): Promise { + if (dirtyKeys.size === 0) return; + + const keysToFlush = [...dirtyKeys]; + + const client = await pool.connect(); try { - await pool.query( - `INSERT INTO usage (key, count, month_key) VALUES ($1, $2, $3) - ON CONFLICT (key) DO UPDATE SET count = $2, month_key = $3`, - [key, record.count, record.monthKey] - ); + await client.query("BEGIN"); + for (const key of keysToFlush) { + const record = usage.get(key); + if (!record) continue; + try { + await client.query( + `INSERT INTO usage (key, count, month_key) VALUES ($1, $2, $3) + ON CONFLICT (key) DO UPDATE SET count = $2, month_key = $3`, + [key, record.count, record.monthKey] + ); + dirtyKeys.delete(key); + retryCount.delete(key); + } catch (error) { + // Audit #12: retry logic for failed writes + const retries = (retryCount.get(key) || 0) + 1; + if (retries >= MAX_RETRIES) { + logger.error({ key: key.slice(0, 8) + "...", retries }, "CRITICAL: Usage write failed after max retries, data may diverge"); + dirtyKeys.delete(key); + retryCount.delete(key); + } else { + retryCount.set(key, retries); + logger.warn({ key: key.slice(0, 8) + "...", retries }, "Usage write failed, will retry"); + } + } + } + await client.query("COMMIT"); } catch (error) { - logger.error({ err: error }, "Failed to save usage data"); + await client.query("ROLLBACK").catch(() => {}); + logger.error({ err: error }, "Failed to flush usage batch"); + // Keep all keys dirty for retry + } finally { + client.release(); } } +// Periodic flush +setInterval(flushDirtyEntries, FLUSH_INTERVAL_MS); + +// Flush on process exit +process.on("SIGTERM", () => { flushDirtyEntries().catch(() => {}); }); +process.on("SIGINT", () => { flushDirtyEntries().catch(() => {}); }); + export function usageMiddleware(req: any, res: any, next: any): void { const keyInfo = req.apiKeyInfo; const key = keyInfo?.key || "unknown"; @@ -77,12 +122,15 @@ export function usageMiddleware(req: any, res: any, next: any): void { function trackUsage(key: string, monthKey: string): void { const record = usage.get(key); if (!record || record.monthKey !== monthKey) { - const newRecord = { count: 1, monthKey }; - usage.set(key, newRecord); - saveUsageEntry(key, newRecord).catch((err) => logger.error({ err }, "Failed to save usage entry")); + usage.set(key, { count: 1, monthKey }); } else { record.count++; - saveUsageEntry(key, record).catch((err) => logger.error({ err }, "Failed to save usage entry")); + } + dirtyKeys.add(key); + + // Flush immediately if threshold reached + if (dirtyKeys.size >= FLUSH_THRESHOLD) { + flushDirtyEntries().catch((err) => logger.error({ err }, "Threshold flush failed")); } }