import pg from "pg"; import logger from "./logger.js"; import { isTransientError } from "../utils/errors.js"; const { Pool } = pg; const pool = new Pool({ host: process.env.DATABASE_HOST || "172.17.0.1", port: parseInt(process.env.DATABASE_PORT || "5432", 10), database: process.env.DATABASE_NAME || "docfast", user: process.env.DATABASE_USER || "docfast", password: process.env.DATABASE_PASSWORD || "docfast", max: 10, idleTimeoutMillis: 10000, // Evict idle connections after 10s (was 30s) — faster cleanup of stale sockets connectionTimeoutMillis: 5000, // Don't wait forever for a connection allowExitOnIdle: false, keepAlive: true, // TCP keepalive to detect dead connections keepAliveInitialDelayMillis: 10000, // Start keepalive probes after 10s idle }); // Handle errors on idle clients — pg.Pool automatically removes the client // after emitting this event, so we just log it. pool.on("error", (err, client) => { logger.error({ err }, "Unexpected error on idle PostgreSQL client — evicted from pool"); }); export { isTransientError } from "../utils/errors.js"; /** * Execute a query with automatic retry on transient errors. * * KEY FIX: On transient error, we destroy the bad connection (client.release(true)) * so the pool creates a fresh TCP connection on the next attempt, instead of * reusing a dead socket from the pool. */ export async function queryWithRetry(queryText, params, maxRetries = 3) { let lastError; for (let attempt = 0; attempt <= maxRetries; attempt++) { let client; try { client = await pool.connect(); const result = await client.query(queryText, params); client.release(); // Return healthy connection to pool return result; } catch (err) { // Destroy the bad connection so pool doesn't reuse it if (client) { try { client.release(true); } catch (_) { /* already destroyed */ } } lastError = err; if (!isTransientError(err) || attempt === maxRetries) { throw err; } const delayMs = Math.min(1000 * Math.pow(2, attempt), 5000); // 1s, 2s, 4s (capped at 5s) logger.warn({ err: err.message, code: err.code, attempt: attempt + 1, maxRetries, delayMs }, "Transient DB error, destroying bad connection and retrying..."); await new Promise(resolve => setTimeout(resolve, delayMs)); } } throw lastError; } /** * Connect with retry — for operations that need a client (transactions). * On transient connect errors, waits and retries so the pool can establish * fresh connections to the new PgBouncer pod. */ export async function connectWithRetry(maxRetries = 3) { let lastError; for (let attempt = 0; attempt <= maxRetries; attempt++) { try { const client = await pool.connect(); // Validate the connection is actually alive try { await client.query("SELECT 1"); } catch (validationErr) { // Connection is dead — destroy it and retry try { client.release(true); } catch (_) { } if (!isTransientError(validationErr) || attempt === maxRetries) { throw validationErr; } const delayMs = Math.min(1000 * Math.pow(2, attempt), 5000); logger.warn({ err: validationErr.message, code: validationErr.code, attempt: attempt + 1 }, "Connection validation failed, destroying and retrying..."); await new Promise(resolve => setTimeout(resolve, delayMs)); continue; } return client; } catch (err) { lastError = err; if (!isTransientError(err) || attempt === maxRetries) { throw err; } const delayMs = Math.min(1000 * Math.pow(2, attempt), 5000); logger.warn({ err: err.message, code: err.code, attempt: attempt + 1, maxRetries, delayMs }, "Transient DB connect error, retrying..."); await new Promise(resolve => setTimeout(resolve, delayMs)); } } throw lastError; } export async function initDatabase() { const client = await connectWithRetry(); try { await client.query(` CREATE TABLE IF NOT EXISTS api_keys ( key TEXT PRIMARY KEY, tier TEXT NOT NULL DEFAULT 'free', email TEXT NOT NULL DEFAULT '', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), stripe_customer_id TEXT ); CREATE INDEX IF NOT EXISTS idx_api_keys_email ON api_keys(email); CREATE INDEX IF NOT EXISTS idx_api_keys_stripe ON api_keys(stripe_customer_id); CREATE UNIQUE INDEX IF NOT EXISTS idx_api_keys_stripe_unique ON api_keys(stripe_customer_id) WHERE stripe_customer_id IS NOT NULL; CREATE TABLE IF NOT EXISTS verifications ( id SERIAL PRIMARY KEY, email TEXT NOT NULL, token TEXT NOT NULL UNIQUE, api_key TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), verified_at TIMESTAMPTZ ); CREATE INDEX IF NOT EXISTS idx_verifications_email ON verifications(email); CREATE INDEX IF NOT EXISTS idx_verifications_token ON verifications(token); CREATE TABLE IF NOT EXISTS pending_verifications ( email TEXT PRIMARY KEY, code TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), expires_at TIMESTAMPTZ NOT NULL, attempts INT NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS usage ( key TEXT PRIMARY KEY, count INT NOT NULL DEFAULT 0, month_key TEXT NOT NULL ); `); logger.info("PostgreSQL tables initialized"); } finally { client.release(); } } /** * Clean up stale database entries: * - Expired pending verifications * - Unverified free-tier API keys (never completed verification) * - Orphaned usage rows (key no longer exists) */ export async function cleanupStaleData() { const results = { expiredVerifications: 0, staleKeys: 0, orphanedUsage: 0 }; // 1. Delete expired pending verifications const pv = await queryWithRetry("DELETE FROM pending_verifications WHERE expires_at < NOW() RETURNING email"); results.expiredVerifications = pv.rowCount || 0; // 2. Delete unverified free-tier keys (email not in verified verifications) const sk = await queryWithRetry(` DELETE FROM api_keys WHERE tier = 'free' AND email NOT IN ( SELECT DISTINCT email FROM verifications WHERE verified_at IS NOT NULL ) RETURNING key `); results.staleKeys = sk.rowCount || 0; // 3. Delete orphaned usage rows const ou = await queryWithRetry(` DELETE FROM usage WHERE key NOT IN (SELECT key FROM api_keys) RETURNING key `); results.orphanedUsage = ou.rowCount || 0; logger.info({ ...results }, `Database cleanup complete: ${results.expiredVerifications} expired verifications, ${results.staleKeys} stale keys, ${results.orphanedUsage} orphaned usage rows removed`); return results; } export { pool }; export default pool;