// db.js — PostgreSQL connection pool with transient-error retry helpers
// (queryWithRetry / connectWithRetry), schema bootstrap, and stale-data cleanup.
import pg from "pg";

import logger from "./logger.js";
import { isTransientError } from "../utils/errors.js";

const { Pool } = pg;
// Shared PostgreSQL connection pool. Every setting can be overridden via the
// DATABASE_* environment variables; the literals below are dev defaults.
const poolConfig = {
  host: process.env.DATABASE_HOST || "172.17.0.1",
  port: parseInt(process.env.DATABASE_PORT || "5432", 10),
  database: process.env.DATABASE_NAME || "docfast",
  user: process.env.DATABASE_USER || "docfast",
  password: process.env.DATABASE_PASSWORD || "docfast",
  max: 10, // upper bound on simultaneous connections
  idleTimeoutMillis: 10000, // evict idle connections after 10s (was 30s) — faster cleanup of stale sockets
  connectionTimeoutMillis: 5000, // don't wait forever for a connection
  allowExitOnIdle: false,
  keepAlive: true, // TCP keepalive to detect dead connections
  keepAliveInitialDelayMillis: 10000, // start keepalive probes after 10s idle
};

const pool = new Pool(poolConfig);
// pg.Pool emits "error" for failures on idle clients and removes the
// offending client from the pool itself, so logging is all we do here.
pool.on("error", (err) => {
  logger.error({ err }, "Unexpected error on idle PostgreSQL client — evicted from pool");
});
// Re-exported for convenience so callers can import it from this module
// alongside the retry helpers that use it.
export { isTransientError } from "../utils/errors.js";
/**
 * Execute a single query against the pool, retrying on transient errors.
 *
 * A connection that throws is destroyed via `client.release(true)` so the
 * pool opens a fresh TCP connection on the next attempt instead of handing
 * back a dead socket from the idle set.
 *
 * @param {string} queryText - SQL text to execute.
 * @param {Array<*>} [params] - Positional query parameters.
 * @param {number} [maxRetries=3] - Additional attempts after the first.
 * @returns {Promise<import("pg").QueryResult>} The pg query result.
 * @throws The failing error when it is non-transient or retries are exhausted.
 */
export async function queryWithRetry(queryText, params, maxRetries = 3) {
  let lastError;
  let attempt = 0;
  while (attempt <= maxRetries) {
    let client = null;
    try {
      client = await pool.connect();
      const result = await client.query(queryText, params);
      client.release(); // healthy connection goes back into the pool
      return result;
    } catch (err) {
      // Destroy (don't return) the connection so the pool can't reuse it.
      if (client) {
        try {
          client.release(true);
        } catch (_) { /* already destroyed */ }
      }
      lastError = err;
      if (!isTransientError(err) || attempt === maxRetries) {
        throw err;
      }
      // Exponential backoff: 1s, 2s, 4s — capped at 5s.
      const delayMs = Math.min(1000 * 2 ** attempt, 5000);
      logger.warn({ err: err.message, code: err.code, attempt: attempt + 1, maxRetries, delayMs }, "Transient DB error, destroying bad connection and retrying...");
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
    attempt += 1;
  }
  throw lastError;
}
/**
 * Acquire a pooled client with retry — for callers that must hold one
 * client across several statements (e.g. transactions).
 *
 * Each acquired client is probed with `SELECT 1`; a client that fails the
 * probe is destroyed and acquisition is retried, so a fresh connection can
 * be established (e.g. to a new PgBouncer pod) instead of returning a dead
 * socket to the caller.
 *
 * @param {number} [maxRetries=3] - Additional attempts after the first.
 * @returns {Promise<import("pg").PoolClient>} A validated client; the caller must release() it.
 * @throws The failing error when it is non-transient or retries are exhausted.
 */
export async function connectWithRetry(maxRetries = 3) {
  let lastError;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const client = await pool.connect();
      try {
        // Cheap liveness probe: dead sockets fail here, not mid-transaction.
        await client.query("SELECT 1");
        return client;
      } catch (probeErr) {
        // Connection is dead — destroy it so the pool won't hand it out again.
        try {
          client.release(true);
        } catch (_) { }
        if (!isTransientError(probeErr) || attempt === maxRetries) {
          throw probeErr; // falls through to the outer catch, which rethrows
        }
        const delayMs = Math.min(1000 * 2 ** attempt, 5000);
        logger.warn({ err: probeErr.message, code: probeErr.code, attempt: attempt + 1 }, "Connection validation failed, destroying and retrying...");
        await new Promise((resolve) => setTimeout(resolve, delayMs));
        // fall out of the try — the for-loop retries with a fresh client
      }
    } catch (err) {
      lastError = err;
      if (!isTransientError(err) || attempt === maxRetries) {
        throw err;
      }
      const delayMs = Math.min(1000 * 2 ** attempt, 5000);
      logger.warn({ err: err.message, code: err.code, attempt: attempt + 1, maxRetries, delayMs }, "Transient DB connect error, retrying...");
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
  throw lastError;
}
/**
 * Create all application tables and indexes if they do not already exist.
 * Idempotent (all DDL uses IF NOT EXISTS), so it is safe to run on every boot.
 *
 * @returns {Promise<void>}
 */
export async function initDatabase() {
  const client = await connectWithRetry();
  try {
    const ddl = `
      CREATE TABLE IF NOT EXISTS api_keys (
        key TEXT PRIMARY KEY,
        tier TEXT NOT NULL DEFAULT 'free',
        email TEXT NOT NULL DEFAULT '',
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        stripe_customer_id TEXT
      );
      CREATE INDEX IF NOT EXISTS idx_api_keys_email ON api_keys(email);
      CREATE INDEX IF NOT EXISTS idx_api_keys_stripe ON api_keys(stripe_customer_id);
      CREATE UNIQUE INDEX IF NOT EXISTS idx_api_keys_stripe_unique
        ON api_keys(stripe_customer_id) WHERE stripe_customer_id IS NOT NULL;

      CREATE TABLE IF NOT EXISTS verifications (
        id SERIAL PRIMARY KEY,
        email TEXT NOT NULL,
        token TEXT NOT NULL UNIQUE,
        api_key TEXT NOT NULL,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        verified_at TIMESTAMPTZ
      );
      CREATE INDEX IF NOT EXISTS idx_verifications_email ON verifications(email);
      CREATE INDEX IF NOT EXISTS idx_verifications_token ON verifications(token);

      CREATE TABLE IF NOT EXISTS pending_verifications (
        email TEXT PRIMARY KEY,
        code TEXT NOT NULL,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        expires_at TIMESTAMPTZ NOT NULL,
        attempts INT NOT NULL DEFAULT 0
      );

      CREATE TABLE IF NOT EXISTS usage (
        key TEXT PRIMARY KEY,
        count INT NOT NULL DEFAULT 0,
        month_key TEXT NOT NULL
      );
    `;
    await client.query(ddl);
    logger.info("PostgreSQL tables initialized");
  } finally {
    client.release(); // always return the client, even when DDL fails
  }
}
/**
 * Remove stale database rows in three passes:
 *  1. pending_verifications past their expiry,
 *  2. free-tier api_keys whose email never completed verification,
 *  3. usage rows whose api key no longer exists.
 *
 * Passes run sequentially because pass 3 depends on the deletions of pass 2.
 *
 * @returns {Promise<{expiredVerifications: number, staleKeys: number, orphanedUsage: number}>} Per-pass delete counts.
 */
export async function cleanupStaleData() {
  const results = { expiredVerifications: 0, staleKeys: 0, orphanedUsage: 0 };

  // Pass 1: pending verifications past their expiry window.
  const expired = await queryWithRetry("DELETE FROM pending_verifications WHERE expires_at < NOW() RETURNING email");
  results.expiredVerifications = expired.rowCount ?? 0;

  // Pass 2: free-tier keys whose email was never verified.
  const stale = await queryWithRetry(`
    DELETE FROM api_keys
    WHERE tier = 'free'
      AND email NOT IN (
        SELECT DISTINCT email FROM verifications WHERE verified_at IS NOT NULL
      )
    RETURNING key
  `);
  results.staleKeys = stale.rowCount ?? 0;

  // Pass 3: usage rows pointing at keys that no longer exist.
  const orphaned = await queryWithRetry(`
    DELETE FROM usage
    WHERE key NOT IN (SELECT key FROM api_keys)
    RETURNING key
  `);
  results.orphanedUsage = orphaned.rowCount ?? 0;

  logger.info({ ...results }, `Database cleanup complete: ${results.expiredVerifications} expired verifications, ${results.staleKeys} stale keys, ${results.orphanedUsage} orphaned usage rows removed`);
  return results;
}
export { pool };
|
|
export default pool;
|