fix: database connection resilience — retry on transient errors, TCP keepalive, health check timeout
- Enable TCP keepalive on pg.Pool to detect dead connections - Add connectionTimeoutMillis (5s) to prevent hanging on stale connections - Add queryWithRetry() with exponential backoff for transient DB errors - Add connectWithRetry() for transaction-based operations - Detect PgBouncer "no available server" and other transient errors - Health check has 3s timeout and returns 503 on DB failure - All DB operations in keys, verification, usage use retry logic Fixes BUG-075: PgBouncer failover causes permanent pod failures
This commit is contained in:
parent
97744897f0
commit
8d88a9c235
5 changed files with 149 additions and 43 deletions
|
|
@ -1,6 +1,7 @@
|
|||
import { isProKey } from "../services/keys.js";
|
||||
import logger from "../services/logger.js";
|
||||
import pool from "../services/db.js";
|
||||
import { queryWithRetry, connectWithRetry } from "../services/db.js";
|
||||
|
||||
const FREE_TIER_LIMIT = 100;
|
||||
const PRO_TIER_LIMIT = 5000;
|
||||
|
|
@ -22,7 +23,7 @@ function getMonthKey(): string {
|
|||
|
||||
export async function loadUsageData(): Promise<void> {
|
||||
try {
|
||||
const result = await pool.query("SELECT key, count, month_key FROM usage");
|
||||
const result = await queryWithRetry("SELECT key, count, month_key FROM usage");
|
||||
usage = new Map();
|
||||
for (const row of result.rows) {
|
||||
usage.set(row.key, { count: row.count, monthKey: row.month_key });
|
||||
|
|
@ -40,7 +41,7 @@ async function flushDirtyEntries(): Promise<void> {
|
|||
|
||||
const keysToFlush = [...dirtyKeys];
|
||||
|
||||
const client = await pool.connect();
|
||||
const client = await connectWithRetry();
|
||||
try {
|
||||
await client.query("BEGIN");
|
||||
for (const key of keysToFlush) {
|
||||
|
|
|
|||
|
|
@ -8,29 +8,34 @@ const { version: APP_VERSION } = require("../../package.json");
|
|||
|
||||
export const healthRouter = Router();
|
||||
|
||||
const HEALTH_CHECK_TIMEOUT_MS = 3000;
|
||||
|
||||
healthRouter.get("/", async (_req, res) => {
|
||||
const poolStats = getPoolStats();
|
||||
let databaseStatus: any;
|
||||
let overallStatus = "ok";
|
||||
let httpStatus = 200;
|
||||
|
||||
// Check database connectivity
|
||||
// Check database connectivity with a timeout
|
||||
try {
|
||||
const client = await pool.connect();
|
||||
try {
|
||||
const result = await client.query('SELECT version()');
|
||||
const version = result.rows[0]?.version || 'Unknown';
|
||||
// Extract just the PostgreSQL version number (e.g., "PostgreSQL 15.4")
|
||||
const versionMatch = version.match(/PostgreSQL ([\d.]+)/);
|
||||
const shortVersion = versionMatch ? `PostgreSQL ${versionMatch[1]}` : 'PostgreSQL';
|
||||
|
||||
databaseStatus = {
|
||||
status: "ok",
|
||||
version: shortVersion
|
||||
};
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
const dbCheck = async () => {
|
||||
const client = await pool.connect();
|
||||
try {
|
||||
const result = await client.query('SELECT version()');
|
||||
const version = result.rows[0]?.version || 'Unknown';
|
||||
const versionMatch = version.match(/PostgreSQL ([\d.]+)/);
|
||||
const shortVersion = versionMatch ? `PostgreSQL ${versionMatch[1]}` : 'PostgreSQL';
|
||||
return { status: "ok", version: shortVersion };
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
};
|
||||
|
||||
const timeout = new Promise<never>((_resolve, reject) =>
|
||||
setTimeout(() => reject(new Error("Database health check timed out")), HEALTH_CHECK_TIMEOUT_MS)
|
||||
);
|
||||
|
||||
databaseStatus = await Promise.race([dbCheck(), timeout]);
|
||||
} catch (error: any) {
|
||||
databaseStatus = {
|
||||
status: "error",
|
||||
|
|
@ -56,4 +61,4 @@ healthRouter.get("/", async (_req, res) => {
|
|||
};
|
||||
|
||||
res.status(httpStatus).json(response);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -3,6 +3,21 @@ import pg from "pg";
|
|||
import logger from "./logger.js";
|
||||
const { Pool } = pg;
|
||||
|
||||
// Transient error codes from PgBouncer / PostgreSQL that warrant retry
|
||||
const TRANSIENT_ERRORS = new Set([
|
||||
"ECONNRESET",
|
||||
"ECONNREFUSED",
|
||||
"EPIPE",
|
||||
"ETIMEDOUT",
|
||||
"CONNECTION_LOST",
|
||||
"57P01", // admin_shutdown
|
||||
"57P02", // crash_shutdown
|
||||
"57P03", // cannot_connect_now
|
||||
"08006", // connection_failure
|
||||
"08003", // connection_does_not_exist
|
||||
"08001", // sqlclient_unable_to_establish_sqlconnection
|
||||
]);
|
||||
|
||||
const pool = new Pool({
|
||||
host: process.env.DATABASE_HOST || "172.17.0.1",
|
||||
port: parseInt(process.env.DATABASE_PORT || "5432", 10),
|
||||
|
|
@ -11,14 +26,97 @@ const pool = new Pool({
|
|||
password: process.env.DATABASE_PASSWORD || "docfast",
|
||||
max: 10,
|
||||
idleTimeoutMillis: 30000,
|
||||
connectionTimeoutMillis: 5000, // Don't wait forever for a connection
|
||||
allowExitOnIdle: false,
|
||||
keepAlive: true, // TCP keepalive to detect dead connections
|
||||
keepAliveInitialDelayMillis: 10000, // Start keepalive probes after 10s idle
|
||||
});
|
||||
|
||||
pool.on("error", (err) => {
|
||||
logger.error({ err }, "Unexpected PostgreSQL pool error");
|
||||
logger.error({ err }, "Unexpected PostgreSQL pool error — connection will be removed from pool");
|
||||
});
|
||||
|
||||
/**
|
||||
* Determine if an error is transient (PgBouncer failover, network blip)
|
||||
*/
|
||||
export function isTransientError(err: any): boolean {
|
||||
if (!err) return false;
|
||||
const code = err.code || "";
|
||||
const msg = (err.message || "").toLowerCase();
|
||||
|
||||
if (TRANSIENT_ERRORS.has(code)) return true;
|
||||
if (msg.includes("no available server")) return true; // PgBouncer specific
|
||||
if (msg.includes("connection terminated")) return true;
|
||||
if (msg.includes("connection refused")) return true;
|
||||
if (msg.includes("server closed the connection")) return true;
|
||||
if (msg.includes("timeout expired")) return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a query with automatic retry on transient errors.
|
||||
* Retries up to `maxRetries` times with exponential backoff.
|
||||
*/
|
||||
export async function queryWithRetry(
|
||||
queryText: string,
|
||||
params?: any[],
|
||||
maxRetries = 3
|
||||
): Promise<pg.QueryResult> {
|
||||
let lastError: any;
|
||||
|
||||
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||||
try {
|
||||
return await pool.query(queryText, params);
|
||||
} catch (err: any) {
|
||||
lastError = err;
|
||||
|
||||
if (!isTransientError(err) || attempt === maxRetries) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
const delayMs = Math.min(1000 * Math.pow(2, attempt), 5000); // 1s, 2s, 4s (capped at 5s)
|
||||
logger.warn(
|
||||
{ err: err.message, code: err.code, attempt: attempt + 1, maxRetries, delayMs },
|
||||
"Transient DB error, retrying..."
|
||||
);
|
||||
await new Promise(resolve => setTimeout(resolve, delayMs));
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect with retry — for operations that need a client (transactions).
|
||||
*/
|
||||
export async function connectWithRetry(maxRetries = 3): Promise<pg.PoolClient> {
|
||||
let lastError: any;
|
||||
|
||||
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||||
try {
|
||||
return await pool.connect();
|
||||
} catch (err: any) {
|
||||
lastError = err;
|
||||
|
||||
if (!isTransientError(err) || attempt === maxRetries) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
const delayMs = Math.min(1000 * Math.pow(2, attempt), 5000);
|
||||
logger.warn(
|
||||
{ err: err.message, code: err.code, attempt: attempt + 1, maxRetries, delayMs },
|
||||
"Transient DB connect error, retrying..."
|
||||
);
|
||||
await new Promise(resolve => setTimeout(resolve, delayMs));
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError;
|
||||
}
|
||||
|
||||
export async function initDatabase(): Promise<void> {
|
||||
const client = await pool.connect();
|
||||
const client = await connectWithRetry();
|
||||
try {
|
||||
await client.query(`
|
||||
CREATE TABLE IF NOT EXISTS api_keys (
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import { randomBytes } from "crypto";
|
||||
import logger from "./logger.js";
|
||||
import pool from "./db.js";
|
||||
import { queryWithRetry, connectWithRetry } from "./db.js";
|
||||
|
||||
export interface ApiKey {
|
||||
key: string;
|
||||
|
|
@ -15,7 +16,7 @@ let keysCache: ApiKey[] = [];
|
|||
|
||||
export async function loadKeys(): Promise<void> {
|
||||
try {
|
||||
const result = await pool.query(
|
||||
const result = await queryWithRetry(
|
||||
"SELECT key, tier, email, created_at, stripe_customer_id FROM api_keys"
|
||||
);
|
||||
keysCache = result.rows.map((r) => ({
|
||||
|
|
@ -37,7 +38,7 @@ export async function loadKeys(): Promise<void> {
|
|||
const entry: ApiKey = { key: k, tier: "pro", email: "seed@docfast.dev", createdAt: new Date().toISOString() };
|
||||
keysCache.push(entry);
|
||||
// Upsert into DB
|
||||
await pool.query(
|
||||
await queryWithRetry(
|
||||
`INSERT INTO api_keys (key, tier, email, created_at) VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT (key) DO NOTHING`,
|
||||
[k, "pro", "seed@docfast.dev", new Date().toISOString()]
|
||||
|
|
@ -76,7 +77,7 @@ export async function createFreeKey(email?: string): Promise<ApiKey> {
|
|||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await pool.query(
|
||||
await queryWithRetry(
|
||||
"INSERT INTO api_keys (key, tier, email, created_at) VALUES ($1, $2, $3, $4)",
|
||||
[entry.key, entry.tier, entry.email, entry.createdAt]
|
||||
);
|
||||
|
|
@ -88,7 +89,7 @@ export async function createProKey(email: string, stripeCustomerId: string): Pro
|
|||
const existing = keysCache.find((k) => k.stripeCustomerId === stripeCustomerId);
|
||||
if (existing) {
|
||||
existing.tier = "pro";
|
||||
await pool.query("UPDATE api_keys SET tier = 'pro' WHERE key = $1", [existing.key]);
|
||||
await queryWithRetry("UPDATE api_keys SET tier = 'pro' WHERE key = $1", [existing.key]);
|
||||
return existing;
|
||||
}
|
||||
|
||||
|
|
@ -100,7 +101,7 @@ export async function createProKey(email: string, stripeCustomerId: string): Pro
|
|||
stripeCustomerId,
|
||||
};
|
||||
|
||||
await pool.query(
|
||||
await queryWithRetry(
|
||||
"INSERT INTO api_keys (key, tier, email, created_at, stripe_customer_id) VALUES ($1, $2, $3, $4, $5)",
|
||||
[entry.key, entry.tier, entry.email, entry.createdAt, entry.stripeCustomerId]
|
||||
);
|
||||
|
|
@ -112,7 +113,7 @@ export async function downgradeByCustomer(stripeCustomerId: string): Promise<boo
|
|||
const entry = keysCache.find((k) => k.stripeCustomerId === stripeCustomerId);
|
||||
if (entry) {
|
||||
entry.tier = "free";
|
||||
await pool.query("UPDATE api_keys SET tier = 'free' WHERE stripe_customer_id = $1", [stripeCustomerId]);
|
||||
await queryWithRetry("UPDATE api_keys SET tier = 'free' WHERE stripe_customer_id = $1", [stripeCustomerId]);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
|
@ -126,7 +127,7 @@ export async function updateKeyEmail(apiKey: string, newEmail: string): Promise<
|
|||
const entry = keysCache.find((k) => k.key === apiKey);
|
||||
if (!entry) return false;
|
||||
entry.email = newEmail;
|
||||
await pool.query("UPDATE api_keys SET email = $1 WHERE key = $2", [newEmail, apiKey]);
|
||||
await queryWithRetry("UPDATE api_keys SET email = $1 WHERE key = $2", [newEmail, apiKey]);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -134,6 +135,6 @@ export async function updateEmailByCustomer(stripeCustomerId: string, newEmail:
|
|||
const entry = keysCache.find(k => k.stripeCustomerId === stripeCustomerId);
|
||||
if (!entry) return false;
|
||||
entry.email = newEmail;
|
||||
await pool.query("UPDATE api_keys SET email = $1 WHERE stripe_customer_id = $2", [newEmail, stripeCustomerId]);
|
||||
await queryWithRetry("UPDATE api_keys SET email = $1 WHERE stripe_customer_id = $2", [newEmail, stripeCustomerId]);
|
||||
return true;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import { randomBytes, randomInt, timingSafeEqual } from "crypto";
|
||||
import logger from "./logger.js";
|
||||
import pool from "./db.js";
|
||||
import { queryWithRetry, connectWithRetry } from "./db.js";
|
||||
|
||||
export interface Verification {
|
||||
email: string;
|
||||
|
|
@ -24,7 +25,7 @@ const MAX_ATTEMPTS = 3;
|
|||
|
||||
export async function createVerification(email: string, apiKey: string): Promise<Verification> {
|
||||
// Check for existing unexpired, unverified
|
||||
const existing = await pool.query(
|
||||
const existing = await queryWithRetry(
|
||||
"SELECT * FROM verifications WHERE email = $1 AND verified_at IS NULL AND created_at > NOW() - INTERVAL '24 hours' LIMIT 1",
|
||||
[email]
|
||||
);
|
||||
|
|
@ -34,11 +35,11 @@ export async function createVerification(email: string, apiKey: string): Promise
|
|||
}
|
||||
|
||||
// Remove old unverified
|
||||
await pool.query("DELETE FROM verifications WHERE email = $1 AND verified_at IS NULL", [email]);
|
||||
await queryWithRetry("DELETE FROM verifications WHERE email = $1 AND verified_at IS NULL", [email]);
|
||||
|
||||
const token = randomBytes(32).toString("hex");
|
||||
const now = new Date().toISOString();
|
||||
await pool.query(
|
||||
await queryWithRetry(
|
||||
"INSERT INTO verifications (email, token, api_key, created_at) VALUES ($1, $2, $3, $4)",
|
||||
[email, token, apiKey, now]
|
||||
);
|
||||
|
|
@ -56,7 +57,7 @@ export function verifyToken(token: string): { status: "ok"; verification: Verifi
|
|||
let verificationsCache: Verification[] = [];
|
||||
|
||||
export async function loadVerifications(): Promise<void> {
|
||||
const result = await pool.query("SELECT * FROM verifications");
|
||||
const result = await queryWithRetry("SELECT * FROM verifications");
|
||||
verificationsCache = result.rows.map((r) => ({
|
||||
email: r.email,
|
||||
token: r.token,
|
||||
|
|
@ -85,12 +86,12 @@ function verifyTokenSync(token: string): { status: "ok"; verification: Verificat
|
|||
if (age > TOKEN_EXPIRY_MS) return { status: "expired" };
|
||||
v.verifiedAt = new Date().toISOString();
|
||||
// Update DB async
|
||||
pool.query("UPDATE verifications SET verified_at = $1 WHERE token = $2", [v.verifiedAt, token]).catch((err) => logger.error({ err }, "Failed to update verification"));
|
||||
queryWithRetry("UPDATE verifications SET verified_at = $1 WHERE token = $2", [v.verifiedAt, token]).catch((err) => logger.error({ err }, "Failed to update verification"));
|
||||
return { status: "ok", verification: v };
|
||||
}
|
||||
|
||||
export async function createPendingVerification(email: string): Promise<PendingVerification> {
|
||||
await pool.query("DELETE FROM pending_verifications WHERE email = $1", [email]);
|
||||
await queryWithRetry("DELETE FROM pending_verifications WHERE email = $1", [email]);
|
||||
|
||||
const now = new Date();
|
||||
const pending: PendingVerification = {
|
||||
|
|
@ -101,7 +102,7 @@ export async function createPendingVerification(email: string): Promise<PendingV
|
|||
attempts: 0,
|
||||
};
|
||||
|
||||
await pool.query(
|
||||
await queryWithRetry(
|
||||
"INSERT INTO pending_verifications (email, code, created_at, expires_at, attempts) VALUES ($1, $2, $3, $4, $5)",
|
||||
[pending.email, pending.code, pending.createdAt, pending.expiresAt, pending.attempts]
|
||||
);
|
||||
|
|
@ -110,34 +111,34 @@ export async function createPendingVerification(email: string): Promise<PendingV
|
|||
|
||||
export async function verifyCode(email: string, code: string): Promise<{ status: "ok" | "invalid" | "expired" | "max_attempts" }> {
|
||||
const cleanEmail = email.trim().toLowerCase();
|
||||
const result = await pool.query("SELECT * FROM pending_verifications WHERE email = $1", [cleanEmail]);
|
||||
const result = await queryWithRetry("SELECT * FROM pending_verifications WHERE email = $1", [cleanEmail]);
|
||||
const pending = result.rows[0];
|
||||
|
||||
if (!pending) return { status: "invalid" };
|
||||
|
||||
if (new Date() > new Date(pending.expires_at)) {
|
||||
await pool.query("DELETE FROM pending_verifications WHERE email = $1", [cleanEmail]);
|
||||
await queryWithRetry("DELETE FROM pending_verifications WHERE email = $1", [cleanEmail]);
|
||||
return { status: "expired" };
|
||||
}
|
||||
|
||||
if (pending.attempts >= MAX_ATTEMPTS) {
|
||||
await pool.query("DELETE FROM pending_verifications WHERE email = $1", [cleanEmail]);
|
||||
await queryWithRetry("DELETE FROM pending_verifications WHERE email = $1", [cleanEmail]);
|
||||
return { status: "max_attempts" };
|
||||
}
|
||||
|
||||
await pool.query("UPDATE pending_verifications SET attempts = attempts + 1 WHERE email = $1", [cleanEmail]);
|
||||
await queryWithRetry("UPDATE pending_verifications SET attempts = attempts + 1 WHERE email = $1", [cleanEmail]);
|
||||
|
||||
const a = Buffer.from(pending.code, "utf8"); const b = Buffer.from(code, "utf8"); const codeMatch = a.length === b.length && timingSafeEqual(a, b);
|
||||
if (!codeMatch) {
|
||||
return { status: "invalid" };
|
||||
}
|
||||
|
||||
await pool.query("DELETE FROM pending_verifications WHERE email = $1", [cleanEmail]);
|
||||
await queryWithRetry("DELETE FROM pending_verifications WHERE email = $1", [cleanEmail]);
|
||||
return { status: "ok" };
|
||||
}
|
||||
|
||||
export async function isEmailVerified(email: string): Promise<boolean> {
|
||||
const result = await pool.query(
|
||||
const result = await queryWithRetry(
|
||||
"SELECT 1 FROM verifications WHERE email = $1 AND verified_at IS NOT NULL LIMIT 1",
|
||||
[email]
|
||||
);
|
||||
|
|
@ -145,7 +146,7 @@ export async function isEmailVerified(email: string): Promise<boolean> {
|
|||
}
|
||||
|
||||
export async function getVerifiedApiKey(email: string): Promise<string | null> {
|
||||
const result = await pool.query(
|
||||
const result = await queryWithRetry(
|
||||
"SELECT api_key FROM verifications WHERE email = $1 AND verified_at IS NOT NULL LIMIT 1",
|
||||
[email]
|
||||
);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue