diff --git a/src/middleware/usage.ts b/src/middleware/usage.ts index 79a164f..df11d79 100644 --- a/src/middleware/usage.ts +++ b/src/middleware/usage.ts @@ -1,6 +1,7 @@ import { isProKey } from "../services/keys.js"; import logger from "../services/logger.js"; import pool from "../services/db.js"; +import { queryWithRetry, connectWithRetry } from "../services/db.js"; const FREE_TIER_LIMIT = 100; const PRO_TIER_LIMIT = 5000; @@ -22,7 +23,7 @@ function getMonthKey(): string { export async function loadUsageData(): Promise { try { - const result = await pool.query("SELECT key, count, month_key FROM usage"); + const result = await queryWithRetry("SELECT key, count, month_key FROM usage"); usage = new Map(); for (const row of result.rows) { usage.set(row.key, { count: row.count, monthKey: row.month_key }); @@ -40,7 +41,7 @@ async function flushDirtyEntries(): Promise { const keysToFlush = [...dirtyKeys]; - const client = await pool.connect(); + const client = await connectWithRetry(); try { await client.query("BEGIN"); for (const key of keysToFlush) { diff --git a/src/routes/health.ts b/src/routes/health.ts index 82079e5..90eec04 100644 --- a/src/routes/health.ts +++ b/src/routes/health.ts @@ -8,29 +8,34 @@ const { version: APP_VERSION } = require("../../package.json"); export const healthRouter = Router(); +const HEALTH_CHECK_TIMEOUT_MS = 3000; + healthRouter.get("/", async (_req, res) => { const poolStats = getPoolStats(); let databaseStatus: any; let overallStatus = "ok"; let httpStatus = 200; - // Check database connectivity + // Check database connectivity with a timeout try { - const client = await pool.connect(); - try { - const result = await client.query('SELECT version()'); - const version = result.rows[0]?.version || 'Unknown'; - // Extract just the PostgreSQL version number (e.g., "PostgreSQL 15.4") - const versionMatch = version.match(/PostgreSQL ([\d.]+)/); - const shortVersion = versionMatch ? `PostgreSQL ${versionMatch[1]}` : 'PostgreSQL'; - - databaseStatus = { - status: "ok", - version: shortVersion - }; - } finally { - client.release(); - } + const dbCheck = async () => { + const client = await pool.connect(); + try { + const result = await client.query('SELECT version()'); + const version = result.rows[0]?.version || 'Unknown'; + const versionMatch = version.match(/PostgreSQL ([\d.]+)/); + const shortVersion = versionMatch ? `PostgreSQL ${versionMatch[1]}` : 'PostgreSQL'; + return { status: "ok", version: shortVersion }; + } finally { + client.release(); + } + }; + + const timeout = new Promise((_resolve, reject) => + setTimeout(() => reject(new Error("Database health check timed out")), HEALTH_CHECK_TIMEOUT_MS) + ); + + databaseStatus = await Promise.race([dbCheck(), timeout]); } catch (error: any) { databaseStatus = { status: "error", @@ -56,4 +61,4 @@ healthRouter.get("/", async (_req, res) => { }; res.status(httpStatus).json(response); -}); \ No newline at end of file +}); diff --git a/src/services/db.ts b/src/services/db.ts index 48151ad..ea9864c 100644 --- a/src/services/db.ts +++ b/src/services/db.ts @@ -3,6 +3,21 @@ import pg from "pg"; import logger from "./logger.js"; const { Pool } = pg; +// Transient error codes from PgBouncer / PostgreSQL that warrant retry +const TRANSIENT_ERRORS = new Set([ + "ECONNRESET", + "ECONNREFUSED", + "EPIPE", + "ETIMEDOUT", + "CONNECTION_LOST", + "57P01", // admin_shutdown + "57P02", // crash_shutdown + "57P03", // cannot_connect_now + "08006", // connection_failure + "08003", // connection_does_not_exist + "08001", // sqlclient_unable_to_establish_sqlconnection +]); + const pool = new Pool({ host: process.env.DATABASE_HOST || "172.17.0.1", port: parseInt(process.env.DATABASE_PORT || "5432", 10), @@ -11,14 +26,97 @@ const pool = new Pool({ password: process.env.DATABASE_PASSWORD || "docfast", max: 10, idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, // Don't wait forever for a connection + allowExitOnIdle: false, + keepAlive: true, // TCP keepalive to detect dead connections + keepAliveInitialDelayMillis: 10000, // Start keepalive probes after 10s idle }); pool.on("error", (err) => { - logger.error({ err }, "Unexpected PostgreSQL pool error"); + logger.error({ err }, "Unexpected PostgreSQL pool error — connection will be removed from pool"); }); +/** + * Determine if an error is transient (PgBouncer failover, network blip) + */ +export function isTransientError(err: any): boolean { + if (!err) return false; + const code = err.code || ""; + const msg = (err.message || "").toLowerCase(); + + if (TRANSIENT_ERRORS.has(code)) return true; + if (msg.includes("no available server")) return true; // PgBouncer specific + if (msg.includes("connection terminated")) return true; + if (msg.includes("connection refused")) return true; + if (msg.includes("server closed the connection")) return true; + if (msg.includes("timeout expired")) return true; + + return false; +} + +/** + * Execute a query with automatic retry on transient errors. + * Retries up to `maxRetries` times with exponential backoff. + */ +export async function queryWithRetry( + queryText: string, + params?: any[], + maxRetries = 3 +): Promise { + let lastError: any; + + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + return await pool.query(queryText, params); + } catch (err: any) { + lastError = err; + + if (!isTransientError(err) || attempt === maxRetries) { + throw err; + } + + const delayMs = Math.min(1000 * Math.pow(2, attempt), 5000); // 1s, 2s, 4s (capped at 5s) + logger.warn( + { err: err.message, code: err.code, attempt: attempt + 1, maxRetries, delayMs }, + "Transient DB error, retrying..." + ); + await new Promise(resolve => setTimeout(resolve, delayMs)); + } + } + + throw lastError; +} + +/** + * Connect with retry — for operations that need a client (transactions). + */ +export async function connectWithRetry(maxRetries = 3): Promise { + let lastError: any; + + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + return await pool.connect(); + } catch (err: any) { + lastError = err; + + if (!isTransientError(err) || attempt === maxRetries) { + throw err; + } + + const delayMs = Math.min(1000 * Math.pow(2, attempt), 5000); + logger.warn( + { err: err.message, code: err.code, attempt: attempt + 1, maxRetries, delayMs }, + "Transient DB connect error, retrying..." + ); + await new Promise(resolve => setTimeout(resolve, delayMs)); + } + } + + throw lastError; +} + export async function initDatabase(): Promise { - const client = await pool.connect(); + const client = await connectWithRetry(); try { await client.query(` CREATE TABLE IF NOT EXISTS api_keys ( diff --git a/src/services/keys.ts b/src/services/keys.ts index 2fc3fb2..e0f3b10 100644 --- a/src/services/keys.ts +++ b/src/services/keys.ts @@ -1,6 +1,7 @@ import { randomBytes } from "crypto"; import logger from "./logger.js"; import pool from "./db.js"; +import { queryWithRetry, connectWithRetry } from "./db.js"; export interface ApiKey { key: string; @@ -15,7 +16,7 @@ let keysCache: ApiKey[] = []; export async function loadKeys(): Promise { try { - const result = await pool.query( + const result = await queryWithRetry( "SELECT key, tier, email, created_at, stripe_customer_id FROM api_keys" ); keysCache = result.rows.map((r) => ({ @@ -37,7 +38,7 @@ export async function loadKeys(): Promise { const entry: ApiKey = { key: k, tier: "pro", email: "seed@docfast.dev", createdAt: new Date().toISOString() }; keysCache.push(entry); // Upsert into DB - await pool.query( + await queryWithRetry( `INSERT INTO api_keys (key, tier, email, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT (key) DO NOTHING`, [k, "pro", "seed@docfast.dev", new Date().toISOString()] @@ -76,7 +77,7 @@ export async function createFreeKey(email?: string): Promise { createdAt: new Date().toISOString(), }; - await pool.query( + await queryWithRetry( "INSERT INTO api_keys (key, tier, email, created_at) VALUES ($1, $2, $3, $4)", [entry.key, entry.tier, entry.email, entry.createdAt] ); @@ -88,7 +89,7 @@ export async function createProKey(email: string, stripeCustomerId: string): Pro const existing = keysCache.find((k) => k.stripeCustomerId === stripeCustomerId); if (existing) { existing.tier = "pro"; - await pool.query("UPDATE api_keys SET tier = 'pro' WHERE key = $1", [existing.key]); + await queryWithRetry("UPDATE api_keys SET tier = 'pro' WHERE key = $1", [existing.key]); return existing; } @@ -100,7 +101,7 @@ export async function createProKey(email: string, stripeCustomerId: string): Pro stripeCustomerId, }; - await pool.query( + await queryWithRetry( "INSERT INTO api_keys (key, tier, email, created_at, stripe_customer_id) VALUES ($1, $2, $3, $4, $5)", [entry.key, entry.tier, entry.email, entry.createdAt, entry.stripeCustomerId] ); @@ -112,7 +113,7 @@ export async function downgradeByCustomer(stripeCustomerId: string): Promise k.stripeCustomerId === stripeCustomerId); if (entry) { entry.tier = "free"; - await pool.query("UPDATE api_keys SET tier = 'free' WHERE stripe_customer_id = $1", [stripeCustomerId]); + await queryWithRetry("UPDATE api_keys SET tier = 'free' WHERE stripe_customer_id = $1", [stripeCustomerId]); return true; } return false; @@ -126,7 +127,7 @@ export async function updateKeyEmail(apiKey: string, newEmail: string): Promise< const entry = keysCache.find((k) => k.key === apiKey); if (!entry) return false; entry.email = newEmail; - await pool.query("UPDATE api_keys SET email = $1 WHERE key = $2", [newEmail, apiKey]); + await queryWithRetry("UPDATE api_keys SET email = $1 WHERE key = $2", [newEmail, apiKey]); return true; } @@ -134,6 +135,6 @@ export async function updateEmailByCustomer(stripeCustomerId: string, newEmail: const entry = keysCache.find(k => k.stripeCustomerId === stripeCustomerId); if (!entry) return false; entry.email = newEmail; - await pool.query("UPDATE api_keys SET email = $1 WHERE stripe_customer_id = $2", [newEmail, stripeCustomerId]); + await queryWithRetry("UPDATE api_keys SET email = $1 WHERE stripe_customer_id = $2", [newEmail, stripeCustomerId]); return true; } diff --git a/src/services/verification.ts b/src/services/verification.ts index b121fbc..19df24f 100644 --- a/src/services/verification.ts +++ b/src/services/verification.ts @@ -1,6 +1,7 @@ import { randomBytes, randomInt, timingSafeEqual } from "crypto"; import logger from "./logger.js"; import pool from "./db.js"; +import { queryWithRetry, connectWithRetry } from "./db.js"; export interface Verification { email: string; @@ -24,7 +25,7 @@ const MAX_ATTEMPTS = 3; export async function createVerification(email: string, apiKey: string): Promise { // Check for existing unexpired, unverified - const existing = await pool.query( + const existing = await queryWithRetry( "SELECT * FROM verifications WHERE email = $1 AND verified_at IS NULL AND created_at > NOW() - INTERVAL '24 hours' LIMIT 1", [email] ); @@ -34,11 +35,11 @@ export async function createVerification(email: string, apiKey: string): Promise } // Remove old unverified - await pool.query("DELETE FROM verifications WHERE email = $1 AND verified_at IS NULL", [email]); + await queryWithRetry("DELETE FROM verifications WHERE email = $1 AND verified_at IS NULL", [email]); const token = randomBytes(32).toString("hex"); const now = new Date().toISOString(); - await pool.query( + await queryWithRetry( "INSERT INTO verifications (email, token, api_key, created_at) VALUES ($1, $2, $3, $4)", [email, token, apiKey, now] ); @@ -56,7 +57,7 @@ export function verifyToken(token: string): { status: "ok"; verification: Verifi let verificationsCache: Verification[] = []; export async function loadVerifications(): Promise { - const result = await pool.query("SELECT * FROM verifications"); + const result = await queryWithRetry("SELECT * FROM verifications"); verificationsCache = result.rows.map((r) => ({ email: r.email, token: r.token, @@ -85,12 +86,12 @@ function verifyTokenSync(token: string): { status: "ok"; verification: Verificat if (age > TOKEN_EXPIRY_MS) return { status: "expired" }; v.verifiedAt = new Date().toISOString(); // Update DB async - pool.query("UPDATE verifications SET verified_at = $1 WHERE token = $2", [v.verifiedAt, token]).catch((err) => logger.error({ err }, "Failed to update verification")); + queryWithRetry("UPDATE verifications SET verified_at = $1 WHERE token = $2", [v.verifiedAt, token]).catch((err) => logger.error({ err }, "Failed to update verification")); return { status: "ok", verification: v }; } export async function createPendingVerification(email: string): Promise { - await pool.query("DELETE FROM pending_verifications WHERE email = $1", [email]); + await queryWithRetry("DELETE FROM pending_verifications WHERE email = $1", [email]); const now = new Date(); const pending: PendingVerification = { @@ -101,7 +102,7 @@ export async function createPendingVerification(email: string): Promise { const cleanEmail = email.trim().toLowerCase(); - const result = await pool.query("SELECT * FROM pending_verifications WHERE email = $1", [cleanEmail]); + const result = await queryWithRetry("SELECT * FROM pending_verifications WHERE email = $1", [cleanEmail]); const pending = result.rows[0]; if (!pending) return { status: "invalid" }; if (new Date() > new Date(pending.expires_at)) { - await pool.query("DELETE FROM pending_verifications WHERE email = $1", [cleanEmail]); + await queryWithRetry("DELETE FROM pending_verifications WHERE email = $1", [cleanEmail]); return { status: "expired" }; } if (pending.attempts >= MAX_ATTEMPTS) { - await pool.query("DELETE FROM pending_verifications WHERE email = $1", [cleanEmail]); + await queryWithRetry("DELETE FROM pending_verifications WHERE email = $1", [cleanEmail]); return { status: "max_attempts" }; } - await pool.query("UPDATE pending_verifications SET attempts = attempts + 1 WHERE email = $1", [cleanEmail]); + await queryWithRetry("UPDATE pending_verifications SET attempts = attempts + 1 WHERE email = $1", [cleanEmail]); const a = Buffer.from(pending.code, "utf8"); const b = Buffer.from(code, "utf8"); const codeMatch = a.length === b.length && timingSafeEqual(a, b); if (!codeMatch) { return { status: "invalid" }; } - await pool.query("DELETE FROM pending_verifications WHERE email = $1", [cleanEmail]); + await queryWithRetry("DELETE FROM pending_verifications WHERE email = $1", [cleanEmail]); return { status: "ok" }; } export async function isEmailVerified(email: string): Promise { - const result = await pool.query( + const result = await queryWithRetry( "SELECT 1 FROM verifications WHERE email = $1 AND verified_at IS NOT NULL LIMIT 1", [email] ); @@ -145,7 +146,7 @@ export async function isEmailVerified(email: string): Promise { } export async function getVerifiedApiKey(email: string): Promise { - const result = await pool.query( + const result = await queryWithRetry( "SELECT api_key FROM verifications WHERE email = $1 AND verified_at IS NOT NULL LIMIT 1", [email] );