Skip to content

Commit

Permalink
fix(db): identify pg lock (#2739)
Browse files Browse the repository at this point in the history
## Describe your changes

- Add hint to identify slow lock
  • Loading branch information
bodinsamuel authored Sep 17, 2024
1 parent 7bd22eb commit 6edc351
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 10 deletions.
6 changes: 4 additions & 2 deletions packages/jobs/lib/crons/deleteSyncsData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ export async function exec(): Promise<void> {
await db.knex.transaction(async (trx) => {
// Because it's slow and create deadlocks
// we need to acquire a Lock that prevents any other duplicate cron to execute the same thing
const { rows } = await trx.raw<{ rows: { pg_try_advisory_xact_lock: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?);`, [123456789]);
if (!rows || rows.length <= 0 || !rows[0]!.pg_try_advisory_xact_lock) {
const { rows } = await trx.raw<{ rows: { delete_syncs: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?) as delete_syncs`, [123456789]);
if (!rows || rows.length <= 0 || !rows[0]!.delete_syncs) {
logger.info(`[deleteSyncs] could not acquire lock, skipping`);
return;
}

// NB: we are not using trx again, we only care about the lock

const syncs = await findRecentlyDeletedSync();

const orchestrator = new Orchestrator(orchestratorClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ export class SchedulingChild {
// Try to acquire a lock to prevent multiple instances from scheduling at the same time
const lockSpan = tracer.startSpan('scheduler.scheduling.acquire_lock', { childOf: span });
const res = await tracer.scope().activate(lockSpan, async () => {
return trx.raw('SELECT pg_try_advisory_xact_lock(?) AS lock_granted', [5003001106]);
return trx.raw<{ rows: { lock_schedule: boolean }[] }>('SELECT pg_try_advisory_xact_lock(?) AS lock_schedule', [5003001106]);
});
const lockGranted = res?.rows.length > 0 ? res.rows[0].lock_granted : false;
const lockGranted = res?.rows.length > 0 ? res.rows[0]!.lock_schedule : false;
lockSpan.finish();

if (lockGranted) {
Expand Down
4 changes: 2 additions & 2 deletions packages/server/lib/refreshTokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ export async function exec(): Promise<void> {

// Lock to prevent multiple instances of this cron job from running at the same time
await db.knex.transaction(async (trx) => {
const { rows } = await trx.raw<{ rows: { pg_try_advisory_xact_lock: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?);`, [lockKey]);
if (!rows?.[0]?.pg_try_advisory_xact_lock) {
const { rows } = await trx.raw<{ rows: { lock_refresh_tokens: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?) as lock_refresh_tokens`, [lockKey]);
if (!rows?.[0]?.lock_refresh_tokens) {
logger.info(`${cronName} could not acquire lock, skipping`);
return;
}
Expand Down
14 changes: 10 additions & 4 deletions packages/shared/lib/services/notification/slack.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -501,9 +501,12 @@ export class SlackService {
return await db.knex.transaction(async (trx) => {
const lockKey = stringToHash(`${nangoConnection.environment_id}-${name}-${type}-add`);

const { rows } = await trx.raw<{ rows: { pg_try_advisory_xact_lock: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?);`, [lockKey]);
const { rows } = await trx.raw<{ rows: { lock_slack_add_connection: boolean }[] }>(
`SELECT pg_try_advisory_xact_lock(?) as lock_slack_add_connection`,
[lockKey]
);

if (!rows?.[0]?.pg_try_advisory_xact_lock) {
if (!rows?.[0]?.lock_slack_add_connection) {
logger.info(`addFailingConnection operation: ${lockKey} could not acquire lock, skipping`);
return { success: true, error: null, response: null };
}
Expand Down Expand Up @@ -601,9 +604,12 @@ export class SlackService {

const lockKey = stringToHash(`${nangoConnection.environment_id}-${name}-${type}-remove`);

const { rows } = await trx.raw<{ rows: { pg_try_advisory_xact_lock: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?);`, [lockKey]);
const { rows } = await trx.raw<{ rows: { lock_slack_remove_connection: boolean }[] }>(
`SELECT pg_try_advisory_xact_lock(?) as lock_slack_remove_connection`,
[lockKey]
);

if (!rows?.[0]?.pg_try_advisory_xact_lock) {
if (!rows?.[0]?.lock_slack_remove_connection) {
logger.info(`removeFailingConnection operation: ${lockKey} could not acquire lock, skipping`);
return;
}
Expand Down

0 comments on commit 6edc351

Please sign in to comment.