diff --git a/lib/dispatcher.ts b/lib/dispatcher.ts index 036398f..421cc99 100644 --- a/lib/dispatcher.ts +++ b/lib/dispatcher.ts @@ -32,6 +32,7 @@ interface PDConnection { primary: boolean; healthy: boolean; latency: number; + consecutiveFailures: number; } interface PDCheckupResult { @@ -69,6 +70,7 @@ export class PGDispatcher { primary: false, healthy: false, latency: Infinity, + consecutiveFailures: 0, }; }); this.options = options; @@ -186,7 +188,7 @@ export class PGDispatcher { const checkups = this.connections.map((connection) => this.checkDatabase(connection).catch((error) => { this.logger.error({message: 'Unhandled error during database checkup', error}); - connection.healthy = false; + this.setConnectionHealth(connection, false); connection.primary = false; }), ); @@ -241,7 +243,7 @@ export class PGDispatcher { connection.latency = new Date().getTime() - startTime; if (checkupResult) { - connection.healthy = checkupResult.pingOk; + this.setConnectionHealth(connection, checkupResult.pingOk); connection.primary = checkupResult.primary; } else { if (connection.latency > this.options.healthcheckTimeout) { @@ -256,11 +258,11 @@ export class PGDispatcher { }); } - connection.healthy = false; + this.setConnectionHealth(connection, false); connection.primary = false; } } catch (error) { - connection.healthy = false; + this.setConnectionHealth(connection, false); connection.primary = false; this.logger.error({ message: 'Database checkup failed', @@ -274,6 +276,22 @@ export class PGDispatcher { await Promise.all(this.connections.map((c) => c.knex)); } + private setConnectionHealth(connection: PDConnection, healthy: boolean) { + if (healthy) { + connection.consecutiveFailures = 0; + connection.healthy = true; + + return; + } + + connection.consecutiveFailures++; + + const threshold = this.options.healthcheckConsecutiveFailures; + if (!threshold || connection.consecutiveFailures >= threshold) { + connection.healthy = false; + } + } + private get healthyConnections() { return this.connections.filter((c) => c.healthy); } diff --git a/lib/types.ts b/lib/types.ts index 7352d1d..23a3179 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -4,6 +4,7 @@ import type {PGDispatcher} from './dispatcher'; export interface PDOptions { healthcheckInterval: number; healthcheckTimeout: number; + healthcheckConsecutiveFailures?: number; suppressStatusLogs: boolean; beforeTerminate: () => Promise; }