fix: batch usage writes (#10), retry divergence (#12), per-key queue fairness (#15)
All checks were successful
Deploy to Production / Deploy to Server (push) Successful in 3m9s
All checks were successful
Deploy to Production / Deploy to Server (push) Successful in 3m9s
This commit is contained in:
parent
09c6feb06e
commit
e7d28bc62b
2 changed files with 79 additions and 18 deletions
|
|
@ -1,5 +1,6 @@
|
||||||
import { Request, Response, NextFunction } from "express";
|
import { Request, Response, NextFunction } from "express";
|
||||||
import { isProKey } from "../services/keys.js";
|
import { isProKey } from "../services/keys.js";
|
||||||
|
import logger from "../services/logger.js";
|
||||||
|
|
||||||
interface RateLimitEntry {
|
interface RateLimitEntry {
|
||||||
count: number;
|
count: number;
|
||||||
|
|
@ -15,9 +16,12 @@ const RATE_WINDOW_MS = 60_000; // 1 minute
|
||||||
const MAX_CONCURRENT_PDFS = 3;
|
const MAX_CONCURRENT_PDFS = 3;
|
||||||
const MAX_QUEUE_SIZE = 10;
|
const MAX_QUEUE_SIZE = 10;
|
||||||
|
|
||||||
|
// Per-key queue fairness (Audit #15)
|
||||||
|
const MAX_QUEUED_PER_KEY = 3;
|
||||||
|
|
||||||
const rateLimitStore = new Map<string, RateLimitEntry>();
|
const rateLimitStore = new Map<string, RateLimitEntry>();
|
||||||
let activePdfCount = 0;
|
let activePdfCount = 0;
|
||||||
const pdfQueue: Array<{ resolve: () => void; reject: (error: Error) => void }> = [];
|
const pdfQueue: Array<{ resolve: () => void; reject: (error: Error) => void; apiKey: string }> = [];
|
||||||
|
|
||||||
function cleanupExpiredEntries(): void {
|
function cleanupExpiredEntries(): void {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
|
|
@ -40,7 +44,6 @@ function checkRateLimit(apiKey: string): boolean {
|
||||||
const entry = rateLimitStore.get(apiKey);
|
const entry = rateLimitStore.get(apiKey);
|
||||||
|
|
||||||
if (!entry || now >= entry.resetTime) {
|
if (!entry || now >= entry.resetTime) {
|
||||||
// Create new window
|
|
||||||
rateLimitStore.set(apiKey, {
|
rateLimitStore.set(apiKey, {
|
||||||
count: 1,
|
count: 1,
|
||||||
resetTime: now + RATE_WINDOW_MS
|
resetTime: now + RATE_WINDOW_MS
|
||||||
|
|
@ -56,7 +59,11 @@ function checkRateLimit(apiKey: string): boolean {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
async function acquireConcurrencySlot(): Promise<void> {
|
function getQueuedCountForKey(apiKey: string): number {
|
||||||
|
return pdfQueue.filter(w => w.apiKey === apiKey).length;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function acquireConcurrencySlot(apiKey: string): Promise<void> {
|
||||||
if (activePdfCount < MAX_CONCURRENT_PDFS) {
|
if (activePdfCount < MAX_CONCURRENT_PDFS) {
|
||||||
activePdfCount++;
|
activePdfCount++;
|
||||||
return;
|
return;
|
||||||
|
|
@ -66,8 +73,14 @@ async function acquireConcurrencySlot(): Promise<void> {
|
||||||
throw new Error("QUEUE_FULL");
|
throw new Error("QUEUE_FULL");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Audit #15: Per-key fairness — reject if this key already has too many queued
|
||||||
|
if (getQueuedCountForKey(apiKey) >= MAX_QUEUED_PER_KEY) {
|
||||||
|
logger.warn({ apiKey: apiKey.slice(0, 8) + "..." }, "Per-key queue limit reached");
|
||||||
|
throw new Error("QUEUE_FULL");
|
||||||
|
}
|
||||||
|
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
pdfQueue.push({ resolve, reject });
|
pdfQueue.push({ resolve, reject, apiKey });
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -98,8 +111,8 @@ export function pdfRateLimitMiddleware(req: Request & { apiKeyInfo?: any }, res:
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add concurrency control to the request
|
// Add concurrency control to the request (pass apiKey for fairness)
|
||||||
(req as any).acquirePdfSlot = acquireConcurrencySlot;
|
(req as any).acquirePdfSlot = () => acquireConcurrencySlot(apiKey);
|
||||||
(req as any).releasePdfSlot = releaseConcurrencySlot;
|
(req as any).releasePdfSlot = releaseConcurrencySlot;
|
||||||
|
|
||||||
next();
|
next();
|
||||||
|
|
@ -115,4 +128,4 @@ export function getConcurrencyStats() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Proactive cleanup every 60s
|
// Proactive cleanup every 60s
|
||||||
setInterval(cleanupExpiredEntries, 60_000);
|
setInterval(cleanupExpiredEntries, 60_000);
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,13 @@ const PRO_TIER_LIMIT = 5000;
|
||||||
// In-memory cache, periodically synced to PostgreSQL
|
// In-memory cache, periodically synced to PostgreSQL
|
||||||
let usage = new Map<string, { count: number; monthKey: string }>();
|
let usage = new Map<string, { count: number; monthKey: string }>();
|
||||||
|
|
||||||
|
// Write-behind buffer for batching DB writes (Audit #10)
|
||||||
|
const dirtyKeys = new Set<string>();
|
||||||
|
const retryCount = new Map<string, number>();
|
||||||
|
const MAX_RETRIES = 3;
|
||||||
|
const FLUSH_INTERVAL_MS = 5000;
|
||||||
|
const FLUSH_THRESHOLD = 50;
|
||||||
|
|
||||||
function getMonthKey(): string {
|
function getMonthKey(): string {
|
||||||
const d = new Date();
|
const d = new Date();
|
||||||
return `${d.getFullYear()}-${String(d.getMonth() + 1).padStart(2, "0")}`;
|
return `${d.getFullYear()}-${String(d.getMonth() + 1).padStart(2, "0")}`;
|
||||||
|
|
@ -27,18 +34,56 @@ export async function loadUsageData(): Promise<void> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function saveUsageEntry(key: string, record: { count: number; monthKey: string }): Promise<void> {
|
// Batch flush dirty entries to DB (Audit #10 + #12)
|
||||||
|
async function flushDirtyEntries(): Promise<void> {
|
||||||
|
if (dirtyKeys.size === 0) return;
|
||||||
|
|
||||||
|
const keysToFlush = [...dirtyKeys];
|
||||||
|
|
||||||
|
const client = await pool.connect();
|
||||||
try {
|
try {
|
||||||
await pool.query(
|
await client.query("BEGIN");
|
||||||
`INSERT INTO usage (key, count, month_key) VALUES ($1, $2, $3)
|
for (const key of keysToFlush) {
|
||||||
ON CONFLICT (key) DO UPDATE SET count = $2, month_key = $3`,
|
const record = usage.get(key);
|
||||||
[key, record.count, record.monthKey]
|
if (!record) continue;
|
||||||
);
|
try {
|
||||||
|
await client.query(
|
||||||
|
`INSERT INTO usage (key, count, month_key) VALUES ($1, $2, $3)
|
||||||
|
ON CONFLICT (key) DO UPDATE SET count = $2, month_key = $3`,
|
||||||
|
[key, record.count, record.monthKey]
|
||||||
|
);
|
||||||
|
dirtyKeys.delete(key);
|
||||||
|
retryCount.delete(key);
|
||||||
|
} catch (error) {
|
||||||
|
// Audit #12: retry logic for failed writes
|
||||||
|
const retries = (retryCount.get(key) || 0) + 1;
|
||||||
|
if (retries >= MAX_RETRIES) {
|
||||||
|
logger.error({ key: key.slice(0, 8) + "...", retries }, "CRITICAL: Usage write failed after max retries, data may diverge");
|
||||||
|
dirtyKeys.delete(key);
|
||||||
|
retryCount.delete(key);
|
||||||
|
} else {
|
||||||
|
retryCount.set(key, retries);
|
||||||
|
logger.warn({ key: key.slice(0, 8) + "...", retries }, "Usage write failed, will retry");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await client.query("COMMIT");
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error({ err: error }, "Failed to save usage data");
|
await client.query("ROLLBACK").catch(() => {});
|
||||||
|
logger.error({ err: error }, "Failed to flush usage batch");
|
||||||
|
// Keep all keys dirty for retry
|
||||||
|
} finally {
|
||||||
|
client.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Periodic flush
|
||||||
|
setInterval(flushDirtyEntries, FLUSH_INTERVAL_MS);
|
||||||
|
|
||||||
|
// Flush on process exit
|
||||||
|
process.on("SIGTERM", () => { flushDirtyEntries().catch(() => {}); });
|
||||||
|
process.on("SIGINT", () => { flushDirtyEntries().catch(() => {}); });
|
||||||
|
|
||||||
export function usageMiddleware(req: any, res: any, next: any): void {
|
export function usageMiddleware(req: any, res: any, next: any): void {
|
||||||
const keyInfo = req.apiKeyInfo;
|
const keyInfo = req.apiKeyInfo;
|
||||||
const key = keyInfo?.key || "unknown";
|
const key = keyInfo?.key || "unknown";
|
||||||
|
|
@ -77,12 +122,15 @@ export function usageMiddleware(req: any, res: any, next: any): void {
|
||||||
function trackUsage(key: string, monthKey: string): void {
|
function trackUsage(key: string, monthKey: string): void {
|
||||||
const record = usage.get(key);
|
const record = usage.get(key);
|
||||||
if (!record || record.monthKey !== monthKey) {
|
if (!record || record.monthKey !== monthKey) {
|
||||||
const newRecord = { count: 1, monthKey };
|
usage.set(key, { count: 1, monthKey });
|
||||||
usage.set(key, newRecord);
|
|
||||||
saveUsageEntry(key, newRecord).catch((err) => logger.error({ err }, "Failed to save usage entry"));
|
|
||||||
} else {
|
} else {
|
||||||
record.count++;
|
record.count++;
|
||||||
saveUsageEntry(key, record).catch((err) => logger.error({ err }, "Failed to save usage entry"));
|
}
|
||||||
|
dirtyKeys.add(key);
|
||||||
|
|
||||||
|
// Flush immediately if threshold reached
|
||||||
|
if (dirtyKeys.size >= FLUSH_THRESHOLD) {
|
||||||
|
flushDirtyEntries().catch((err) => logger.error({ err }, "Threshold flush failed"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue