From 6edc351e619c53aee44e11d0ecab8960b7793b56 Mon Sep 17 00:00:00 2001 From: Samuel Bodin <1637651+bodinsamuel@users.noreply.github.com> Date: Tue, 17 Sep 2024 14:11:57 +0200 Subject: [PATCH] fix(db): identify pg lock (#2739) ## Describe your changes - Add hint to identify slow lock --- packages/jobs/lib/crons/deleteSyncsData.ts | 6 ++++-- .../lib/workers/scheduling/scheduling.worker.ts | 4 ++-- packages/server/lib/refreshTokens.ts | 4 ++-- .../lib/services/notification/slack.service.ts | 14 ++++++++++---- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/packages/jobs/lib/crons/deleteSyncsData.ts b/packages/jobs/lib/crons/deleteSyncsData.ts index f84078d8517..5920beb868f 100644 --- a/packages/jobs/lib/crons/deleteSyncsData.ts +++ b/packages/jobs/lib/crons/deleteSyncsData.ts @@ -34,12 +34,14 @@ export async function exec(): Promise { await db.knex.transaction(async (trx) => { // Because it's slow and create deadlocks // we need to acquire a Lock that prevents any other duplicate cron to execute the same thing - const { rows } = await trx.raw<{ rows: { pg_try_advisory_xact_lock: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?);`, [123456789]); - if (!rows || rows.length <= 0 || !rows[0]!.pg_try_advisory_xact_lock) { + const { rows } = await trx.raw<{ rows: { delete_syncs: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?) as delete_syncs`, [123456789]); + if (!rows || rows.length <= 0 || !rows[0]!.delete_syncs) { logger.info(`[deleteSyncs] could not acquire lock, skipping`); return; } + // NB: we are not using trx again, we only care about the lock + const syncs = await findRecentlyDeletedSync(); const orchestrator = new Orchestrator(orchestratorClient); diff --git a/packages/scheduler/lib/workers/scheduling/scheduling.worker.ts b/packages/scheduler/lib/workers/scheduling/scheduling.worker.ts index 8af9e8dbbce..ac5f0da9d2e 100644 --- a/packages/scheduler/lib/workers/scheduling/scheduling.worker.ts +++ b/packages/scheduler/lib/workers/scheduling/scheduling.worker.ts @@ -103,9 +103,9 @@ export class SchedulingChild { // Try to acquire a lock to prevent multiple instances from scheduling at the same time const lockSpan = tracer.startSpan('scheduler.scheduling.acquire_lock', { childOf: span }); const res = await tracer.scope().activate(lockSpan, async () => { - return trx.raw('SELECT pg_try_advisory_xact_lock(?) AS lock_granted', [5003001106]); + return trx.raw<{ rows: { lock_schedule: boolean }[] }>('SELECT pg_try_advisory_xact_lock(?) AS lock_schedule', [5003001106]); }); - const lockGranted = res?.rows.length > 0 ? res.rows[0].lock_granted : false; + const lockGranted = res?.rows.length > 0 ? res.rows[0]!.lock_schedule : false; lockSpan.finish(); if (lockGranted) { diff --git a/packages/server/lib/refreshTokens.ts b/packages/server/lib/refreshTokens.ts index 6fd51c3e3ac..16436984d2e 100644 --- a/packages/server/lib/refreshTokens.ts +++ b/packages/server/lib/refreshTokens.ts @@ -37,8 +37,8 @@ export async function exec(): Promise { // Lock to prevent multiple instances of this cron job from running at the same time await db.knex.transaction(async (trx) => { - const { rows } = await trx.raw<{ rows: { pg_try_advisory_xact_lock: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?);`, [lockKey]); - if (!rows?.[0]?.pg_try_advisory_xact_lock) { + const { rows } = await trx.raw<{ rows: { lock_refresh_tokens: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?) as lock_refresh_tokens`, [lockKey]); + if (!rows?.[0]?.lock_refresh_tokens) { logger.info(`${cronName} could not acquire lock, skipping`); return; } diff --git a/packages/shared/lib/services/notification/slack.service.ts b/packages/shared/lib/services/notification/slack.service.ts index 64adf53517e..c2bbb86e464 100644 --- a/packages/shared/lib/services/notification/slack.service.ts +++ b/packages/shared/lib/services/notification/slack.service.ts @@ -501,9 +501,12 @@ export class SlackService { return await db.knex.transaction(async (trx) => { const lockKey = stringToHash(`${nangoConnection.environment_id}-${name}-${type}-add`); - const { rows } = await trx.raw<{ rows: { pg_try_advisory_xact_lock: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?);`, [lockKey]); + const { rows } = await trx.raw<{ rows: { lock_slack_add_connection: boolean }[] }>( + `SELECT pg_try_advisory_xact_lock(?) as lock_slack_add_connection`, + [lockKey] + ); - if (!rows?.[0]?.pg_try_advisory_xact_lock) { + if (!rows?.[0]?.lock_slack_add_connection) { logger.info(`addFailingConnection operation: ${lockKey} could not acquire lock, skipping`); return { success: true, error: null, response: null }; } @@ -601,9 +604,12 @@ export class SlackService { const lockKey = stringToHash(`${nangoConnection.environment_id}-${name}-${type}-remove`); - const { rows } = await trx.raw<{ rows: { pg_try_advisory_xact_lock: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?);`, [lockKey]); + const { rows } = await trx.raw<{ rows: { lock_slack_remove_connection: boolean }[] }>( + `SELECT pg_try_advisory_xact_lock(?) as lock_slack_remove_connection`, + [lockKey] + ); - if (!rows?.[0]?.pg_try_advisory_xact_lock) { + if (!rows?.[0]?.lock_slack_remove_connection) { logger.info(`removeFailingConnection operation: ${lockKey} could not acquire lock, skipping`); return; }