Skip to content

Commit

Permalink
fix: slow records deletion query
Browse files Browse the repository at this point in the history
Deleting records by syncId is unnecessarly slow because it doesn't use
the partition key (connectionId/model)
  • Loading branch information
TBonnin committed Sep 17, 2024
1 parent e40a1d8 commit 5ceea70
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 10 deletions.
1 change: 1 addition & 0 deletions packages/jobs/lib/crons/autoIdleDemo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export async function exec({ orchestrator }: { orchestrator: Orchestrator }): Pr
logger.info(`[autoidle] pausing ${sync.id}`);

const res = await orchestrator.runSyncCommand({
connectionId: sync.connection_unique_id,
syncId: sync.id,
command: SyncCommand.PAUSE,
environmentId: sync.environment_id,
Expand Down
8 changes: 6 additions & 2 deletions packages/jobs/lib/crons/deleteSyncsData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,12 @@ export async function exec(): Promise<void> {

// ----
// hard delete records
const res = await records.deleteRecordsBySyncId({ syncId: sync.id, limit: limitRecords });
metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_RECORDS, res.totalDeletedRecords);
let deletedRecords = 0;
for (const model of sync.models) {
const res = await records.deleteRecordsBySyncId({ connectionId: sync.connectionId, model, syncId: sync.id, limit: limitRecords });
deletedRecords += res.totalDeletedRecords;
}
metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_RECORDS, deletedRecords);
}
});

Expand Down
17 changes: 15 additions & 2 deletions packages/records/lib/models/records.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,14 +325,27 @@ export async function update({
}
}

export async function deleteRecordsBySyncId({ syncId, limit = 5000 }: { syncId: string; limit?: number }): Promise<{ totalDeletedRecords: number }> {
export async function deleteRecordsBySyncId({
connectionId,
model,
syncId,
limit = 5000
}: {
connectionId: number;
model: string;
syncId: string;
limit?: number;
}): Promise<{ totalDeletedRecords: number }> {
let totalDeletedRecords = 0;
let deletedRecords = 0;
do {
// records table is partitioned by connection_id and model
// to avoid table scan, we must filter by connection_id and model
deletedRecords = await db
.from(RECORDS_TABLE)
.where({ connection_id: connectionId, model })
.whereIn('id', function (sub) {
void sub.select('id').from(RECORDS_TABLE).where({ sync_id: syncId }).limit(limit);
void sub.select('id').from(RECORDS_TABLE).where({ connection_id: connectionId, model, sync_id: syncId }).limit(limit);
})
.del();
totalDeletedRecords += deletedRecords;
Expand Down
1 change: 1 addition & 0 deletions packages/server/lib/controllers/sync.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ class SyncController {
}

const result = await orchestrator.runSyncCommand({
connectionId: connection.id!,
syncId: sync_id,
command,
environmentId: environment.id,
Expand Down
13 changes: 9 additions & 4 deletions packages/shared/lib/clients/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import { SyncCommand } from '../models/index.js';
import tracer from 'dd-trace';
import { clearLastSyncDate } from '../services/sync/sync.service.js';
import { isSyncJobRunning } from '../services/sync/job.service.js';
import { getSyncConfigRaw } from '../services/sync/config/config.service.js';
import { getSyncConfigRaw, getSyncConfigBySyncId } from '../services/sync/config/config.service.js';
import environmentService from '../services/environment.service.js';
import type { DBEnvironment, DBTeam } from '@nangohq/types';

export interface RecordsServiceInterface {
deleteRecordsBySyncId({ syncId }: { syncId: string }): Promise<{ totalDeletedRecords: number }>;
deleteRecordsBySyncId({ connectionId, model, syncId }: { connectionId: number; model: string; syncId: string }): Promise<{ totalDeletedRecords: number }>;
}

export interface OrchestratorClientInterface {
Expand Down Expand Up @@ -540,13 +540,15 @@ export class Orchestrator {
}

async runSyncCommand({
connectionId,
syncId,
command,
environmentId,
logCtx,
recordsService,
initiator
}: {
connectionId: number;
syncId: string;
command: SyncCommand;
environmentId: number;
Expand Down Expand Up @@ -584,8 +586,11 @@ export class Orchestrator {
await cancelling(syncId);

await clearLastSyncDate(syncId);
const del = await recordsService.deleteRecordsBySyncId({ syncId });
await logCtx.info(`Records for the sync were deleted successfully`, del);
const syncConfig = await getSyncConfigBySyncId(syncId);
for (const model of syncConfig?.models || []) {
const del = await recordsService.deleteRecordsBySyncId({ syncId, connectionId, model });
await logCtx.info(`Records for model ${model} were deleted successfully`, del);
}

res = await this.client.executeSync({ scheduleName });
break;
Expand Down
17 changes: 17 additions & 0 deletions packages/shared/lib/services/sync/config/config.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,23 @@ export async function getSyncConfigByJobId(job_id: number): Promise<SyncConfig |
return result;
}

export async function getSyncConfigBySyncId(syncId: string): Promise<SyncConfig | null> {
const result = await schema()
.from<SyncConfig>(TABLE)
.select(`${TABLE}.*`)
.join('_nango_syncs', `${TABLE}.id`, '_nango_syncs.sync_config_id')
.where({
'_nango_syncs.id': syncId
})
.first();

if (!result) {
return null;
}

return result;
}

export async function getAttributes(provider_config_key: string, sync_name: string): Promise<object | null> {
const result = await schema()
.from<SyncConfig>(TABLE)
Expand Down
2 changes: 2 additions & 0 deletions packages/shared/lib/services/sync/manager.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ export class SyncManagerService {
}

await orchestrator.runSyncCommand({
connectionId: connection.id!,
syncId: sync.id,
command,
environmentId: environment.id,
Expand Down Expand Up @@ -288,6 +289,7 @@ export class SyncManagerService {
}

await orchestrator.runSyncCommand({
connectionId: connection.id!,
syncId: sync.id,
command,
environmentId: environment.id,
Expand Down
7 changes: 5 additions & 2 deletions packages/shared/lib/services/sync/sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -639,11 +639,14 @@ export async function findDemoSyncs(): Promise<PausableSyncs[]> {
return syncs;
}

export async function findRecentlyDeletedSync(): Promise<{ id: string; environmentId: number }[]> {
export async function findRecentlyDeletedSync(): Promise<{ id: string; environmentId: number; connectionId: number; models: string[] }[]> {
const q = db.knex
.from('_nango_syncs')
.select<{ id: string; environmentId: number }[]>('_nango_syncs.id', '_nango_connections.environment_id')
.select<
{ id: string; environmentId: number; connectionId: number; models: string[] }[]
>('_nango_syncs.id', '_nango_connections.environment_id', '_nango_connections.id', '_nango_sync_configs.models')
.join('_nango_connections', '_nango_connections.id', '_nango_syncs.nango_connection_id')
.join('_nango_sync_configs', '_nango_sync_configs.id', '_nango_syncs.sync_config_id')
.where(db.knex.raw("_nango_syncs.deleted_at > NOW() - INTERVAL '6h'"));
return await q;
}
Expand Down

0 comments on commit 5ceea70

Please sign in to comment.