fix: destroy dead pool connections on transient errors (proper failover)
- queryWithRetry now uses explicit client checkout; on transient error, calls client.release(true) to DESTROY the dead connection instead of returning it to pool. Fresh connections are created on retry. - connectWithRetry validates connections with SELECT 1 before returning - Health check destroys bad connections on failure - Reduced idleTimeoutMillis from 30s to 10s for faster stale connection eviction - Fixes BUG-075: pool kept reusing dead TCP sockets after PgBouncer pod restart
This commit is contained in:
parent
8d88a9c235
commit
95ca10175f
2 changed files with 52 additions and 13 deletions
|
|
@ -16,18 +16,23 @@ healthRouter.get("/", async (_req, res) => {
|
||||||
let overallStatus = "ok";
|
let overallStatus = "ok";
|
||||||
let httpStatus = 200;
|
let httpStatus = 200;
|
||||||
|
|
||||||
// Check database connectivity with a timeout
|
// Check database connectivity with a real query and timeout
|
||||||
try {
|
try {
|
||||||
const dbCheck = async () => {
|
const dbCheck = async () => {
|
||||||
const client = await pool.connect();
|
const client = await pool.connect();
|
||||||
try {
|
try {
|
||||||
|
// Use SELECT 1 as a lightweight liveness probe
|
||||||
|
await client.query('SELECT 1');
|
||||||
const result = await client.query('SELECT version()');
|
const result = await client.query('SELECT version()');
|
||||||
const version = result.rows[0]?.version || 'Unknown';
|
const version = result.rows[0]?.version || 'Unknown';
|
||||||
const versionMatch = version.match(/PostgreSQL ([\d.]+)/);
|
const versionMatch = version.match(/PostgreSQL ([\d.]+)/);
|
||||||
const shortVersion = versionMatch ? `PostgreSQL ${versionMatch[1]}` : 'PostgreSQL';
|
const shortVersion = versionMatch ? `PostgreSQL ${versionMatch[1]}` : 'PostgreSQL';
|
||||||
return { status: "ok", version: shortVersion };
|
|
||||||
} finally {
|
|
||||||
client.release();
|
client.release();
|
||||||
|
return { status: "ok", version: shortVersion };
|
||||||
|
} catch (queryErr) {
|
||||||
|
// Destroy the bad connection so it doesn't go back to the pool
|
||||||
|
try { client.release(true); } catch (_) {}
|
||||||
|
throw queryErr;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -25,15 +25,17 @@ const pool = new Pool({
|
||||||
user: process.env.DATABASE_USER || "docfast",
|
user: process.env.DATABASE_USER || "docfast",
|
||||||
password: process.env.DATABASE_PASSWORD || "docfast",
|
password: process.env.DATABASE_PASSWORD || "docfast",
|
||||||
max: 10,
|
max: 10,
|
||||||
idleTimeoutMillis: 30000,
|
idleTimeoutMillis: 10000, // Evict idle connections after 10s (was 30s) — faster cleanup of stale sockets
|
||||||
connectionTimeoutMillis: 5000, // Don't wait forever for a connection
|
connectionTimeoutMillis: 5000, // Don't wait forever for a connection
|
||||||
allowExitOnIdle: false,
|
allowExitOnIdle: false,
|
||||||
keepAlive: true, // TCP keepalive to detect dead connections
|
keepAlive: true, // TCP keepalive to detect dead connections
|
||||||
keepAliveInitialDelayMillis: 10000, // Start keepalive probes after 10s idle
|
keepAliveInitialDelayMillis: 10000, // Start keepalive probes after 10s idle
|
||||||
});
|
});
|
||||||
|
|
||||||
pool.on("error", (err) => {
|
// Handle errors on idle clients — pg.Pool automatically removes the client
|
||||||
logger.error({ err }, "Unexpected PostgreSQL pool error — connection will be removed from pool");
|
// after emitting this event, so we just log it.
|
||||||
|
pool.on("error", (err, client) => {
|
||||||
|
logger.error({ err }, "Unexpected error on idle PostgreSQL client — evicted from pool");
|
||||||
});
|
});
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -56,7 +58,10 @@ export function isTransientError(err: any): boolean {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute a query with automatic retry on transient errors.
|
* Execute a query with automatic retry on transient errors.
|
||||||
* Retries up to `maxRetries` times with exponential backoff.
|
*
|
||||||
|
* KEY FIX: On transient error, we destroy the bad connection (client.release(true))
|
||||||
|
* so the pool creates a fresh TCP connection on the next attempt, instead of
|
||||||
|
* reusing a dead socket from the pool.
|
||||||
*/
|
*/
|
||||||
export async function queryWithRetry(
|
export async function queryWithRetry(
|
||||||
queryText: string,
|
queryText: string,
|
||||||
|
|
@ -66,9 +71,18 @@ export async function queryWithRetry(
|
||||||
let lastError: any;
|
let lastError: any;
|
||||||
|
|
||||||
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||||||
|
let client: pg.PoolClient | undefined;
|
||||||
try {
|
try {
|
||||||
return await pool.query(queryText, params);
|
client = await pool.connect();
|
||||||
|
const result = await client.query(queryText, params);
|
||||||
|
client.release(); // Return healthy connection to pool
|
||||||
|
return result;
|
||||||
} catch (err: any) {
|
} catch (err: any) {
|
||||||
|
// Destroy the bad connection so pool doesn't reuse it
|
||||||
|
if (client) {
|
||||||
|
try { client.release(true); } catch (_) { /* already destroyed */ }
|
||||||
|
}
|
||||||
|
|
||||||
lastError = err;
|
lastError = err;
|
||||||
|
|
||||||
if (!isTransientError(err) || attempt === maxRetries) {
|
if (!isTransientError(err) || attempt === maxRetries) {
|
||||||
|
|
@ -78,7 +92,7 @@ export async function queryWithRetry(
|
||||||
const delayMs = Math.min(1000 * Math.pow(2, attempt), 5000); // 1s, 2s, 4s (capped at 5s)
|
const delayMs = Math.min(1000 * Math.pow(2, attempt), 5000); // 1s, 2s, 4s (capped at 5s)
|
||||||
logger.warn(
|
logger.warn(
|
||||||
{ err: err.message, code: err.code, attempt: attempt + 1, maxRetries, delayMs },
|
{ err: err.message, code: err.code, attempt: attempt + 1, maxRetries, delayMs },
|
||||||
"Transient DB error, retrying..."
|
"Transient DB error, destroying bad connection and retrying..."
|
||||||
);
|
);
|
||||||
await new Promise(resolve => setTimeout(resolve, delayMs));
|
await new Promise(resolve => setTimeout(resolve, delayMs));
|
||||||
}
|
}
|
||||||
|
|
@ -89,13 +103,33 @@ export async function queryWithRetry(
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connect with retry — for operations that need a client (transactions).
|
* Connect with retry — for operations that need a client (transactions).
|
||||||
|
* On transient connect errors, waits and retries so the pool can establish
|
||||||
|
* fresh connections to the new PgBouncer pod.
|
||||||
*/
|
*/
|
||||||
export async function connectWithRetry(maxRetries = 3): Promise<pg.PoolClient> {
|
export async function connectWithRetry(maxRetries = 3): Promise<pg.PoolClient> {
|
||||||
let lastError: any;
|
let lastError: any;
|
||||||
|
|
||||||
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||||||
try {
|
try {
|
||||||
return await pool.connect();
|
const client = await pool.connect();
|
||||||
|
// Validate the connection is actually alive
|
||||||
|
try {
|
||||||
|
await client.query("SELECT 1");
|
||||||
|
} catch (validationErr: any) {
|
||||||
|
// Connection is dead — destroy it and retry
|
||||||
|
try { client.release(true); } catch (_) {}
|
||||||
|
if (!isTransientError(validationErr) || attempt === maxRetries) {
|
||||||
|
throw validationErr;
|
||||||
|
}
|
||||||
|
const delayMs = Math.min(1000 * Math.pow(2, attempt), 5000);
|
||||||
|
logger.warn(
|
||||||
|
{ err: validationErr.message, code: validationErr.code, attempt: attempt + 1 },
|
||||||
|
"Connection validation failed, destroying and retrying..."
|
||||||
|
);
|
||||||
|
await new Promise(resolve => setTimeout(resolve, delayMs));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return client;
|
||||||
} catch (err: any) {
|
} catch (err: any) {
|
||||||
lastError = err;
|
lastError = err;
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue