diff --git a/packages/jobs/lib/crons/autoIdleDemo.ts b/packages/jobs/lib/crons/autoIdleDemo.ts index 2954d5652a1..1560bebb5d9 100644 --- a/packages/jobs/lib/crons/autoIdleDemo.ts +++ b/packages/jobs/lib/crons/autoIdleDemo.ts @@ -57,6 +57,7 @@ export async function exec({ orchestrator }: { orchestrator: Orchestrator }): Pr logger.info(`[autoidle] pausing ${sync.id}`); const res = await orchestrator.runSyncCommand({ + connectionId: sync.connection_unique_id, syncId: sync.id, command: SyncCommand.PAUSE, environmentId: sync.environment_id, diff --git a/packages/jobs/lib/crons/deleteSyncsData.ts b/packages/jobs/lib/crons/deleteSyncsData.ts index 5920beb868f..b412e3ed070 100644 --- a/packages/jobs/lib/crons/deleteSyncsData.ts +++ b/packages/jobs/lib/crons/deleteSyncsData.ts @@ -66,8 +66,12 @@ export async function exec(): Promise { // ---- // hard delete records - const res = await records.deleteRecordsBySyncId({ syncId: sync.id, limit: limitRecords }); - metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_RECORDS, res.totalDeletedRecords); + let deletedRecords = 0; + for (const model of sync.models) { + const res = await records.deleteRecordsBySyncId({ connectionId: sync.connectionId, model, syncId: sync.id, limit: limitRecords }); + deletedRecords += res.totalDeletedRecords; + } + metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_RECORDS, deletedRecords); } }); diff --git a/packages/records/lib/models/records.ts b/packages/records/lib/models/records.ts index 4d765b379a9..e9d8c1192ac 100644 --- a/packages/records/lib/models/records.ts +++ b/packages/records/lib/models/records.ts @@ -325,14 +325,27 @@ export async function update({ } } -export async function deleteRecordsBySyncId({ syncId, limit = 5000 }: { syncId: string; limit?: number }): Promise<{ totalDeletedRecords: number }> { +export async function deleteRecordsBySyncId({ + connectionId, + model, + syncId, + limit = 5000 +}: { + connectionId: number; + model: string; + syncId: string; + limit?: number; +}): Promise<{ totalDeletedRecords: number }> { let totalDeletedRecords = 0; let deletedRecords = 0; do { + // records table is partitioned by connection_id and model + // to avoid table scan, we must filter by connection_id and model deletedRecords = await db .from(RECORDS_TABLE) + .where({ connection_id: connectionId, model }) .whereIn('id', function (sub) { - void sub.select('id').from(RECORDS_TABLE).where({ sync_id: syncId }).limit(limit); + void sub.select('id').from(RECORDS_TABLE).where({ connection_id: connectionId, model, sync_id: syncId }).limit(limit); }) .del(); totalDeletedRecords += deletedRecords; diff --git a/packages/server/lib/controllers/sync.controller.ts b/packages/server/lib/controllers/sync.controller.ts index d099dbfc077..8dbc7b5e67a 100644 --- a/packages/server/lib/controllers/sync.controller.ts +++ b/packages/server/lib/controllers/sync.controller.ts @@ -571,6 +571,7 @@ class SyncController { } const result = await orchestrator.runSyncCommand({ + connectionId: connection.id!, syncId: sync_id, command, environmentId: environment.id, diff --git a/packages/shared/lib/clients/orchestrator.ts b/packages/shared/lib/clients/orchestrator.ts index 361802215cf..3146ff20ffe 100644 --- a/packages/shared/lib/clients/orchestrator.ts +++ b/packages/shared/lib/clients/orchestrator.ts @@ -27,12 +27,12 @@ import { SyncCommand } from '../models/index.js'; import tracer from 'dd-trace'; import { clearLastSyncDate } from '../services/sync/sync.service.js'; import { isSyncJobRunning } from '../services/sync/job.service.js'; -import { getSyncConfigRaw } from '../services/sync/config/config.service.js'; +import { getSyncConfigRaw, getSyncConfigBySyncId } from '../services/sync/config/config.service.js'; import environmentService from '../services/environment.service.js'; import type { DBEnvironment, DBTeam } from '@nangohq/types'; export interface RecordsServiceInterface { - deleteRecordsBySyncId({ syncId }: { syncId: string }): Promise<{ totalDeletedRecords: number }>; + deleteRecordsBySyncId({ connectionId, model, syncId }: { connectionId: number; model: string; syncId: string }): Promise<{ totalDeletedRecords: number }>; } export interface OrchestratorClientInterface { @@ -540,6 +540,7 @@ export class Orchestrator { } async runSyncCommand({ + connectionId, syncId, command, environmentId, @@ -547,6 +548,7 @@ export class Orchestrator { recordsService, initiator }: { + connectionId: number; syncId: string; command: SyncCommand; environmentId: number; @@ -584,8 +586,11 @@ export class Orchestrator { await cancelling(syncId); await clearLastSyncDate(syncId); - const del = await recordsService.deleteRecordsBySyncId({ syncId }); - await logCtx.info(`Records for the sync were deleted successfully`, del); + const syncConfig = await getSyncConfigBySyncId(syncId); + for (const model of syncConfig?.models || []) { + const del = await recordsService.deleteRecordsBySyncId({ syncId, connectionId, model }); + await logCtx.info(`Records for model ${model} were deleted successfully`, del); + } res = await this.client.executeSync({ scheduleName }); break; diff --git a/packages/shared/lib/services/sync/config/config.service.ts b/packages/shared/lib/services/sync/config/config.service.ts index 2fe1d071d75..e2deef798f9 100644 --- a/packages/shared/lib/services/sync/config/config.service.ts +++ b/packages/shared/lib/services/sync/config/config.service.ts @@ -558,6 +558,23 @@ export async function getSyncConfigByJobId(job_id: number): Promise { + const result = await schema() + .from(TABLE) + .select(`${TABLE}.*`) + .join('_nango_syncs', `${TABLE}.id`, '_nango_syncs.sync_config_id') + .where({ + '_nango_syncs.id': syncId + }) + .first(); + + if (!result) { + return null; + } + + return result; +} + export async function getAttributes(provider_config_key: string, sync_name: string): Promise { const result = await schema() .from(TABLE) diff --git a/packages/shared/lib/services/sync/manager.service.ts b/packages/shared/lib/services/sync/manager.service.ts index 5856cad55a3..f5ef0b10416 100644 --- a/packages/shared/lib/services/sync/manager.service.ts +++ b/packages/shared/lib/services/sync/manager.service.ts @@ -261,6 +261,7 @@ export class SyncManagerService { } await orchestrator.runSyncCommand({ + connectionId: connection.id!, syncId: sync.id, command, environmentId: environment.id, @@ -288,6 +289,7 @@ export class SyncManagerService { } await orchestrator.runSyncCommand({ + connectionId: connection.id!, syncId: sync.id, command, environmentId: environment.id, diff --git a/packages/shared/lib/services/sync/sync.service.ts b/packages/shared/lib/services/sync/sync.service.ts index 2e77b9a9406..706b98359a2 100644 --- a/packages/shared/lib/services/sync/sync.service.ts +++ b/packages/shared/lib/services/sync/sync.service.ts @@ -639,11 +639,14 @@ export async function findDemoSyncs(): Promise { return syncs; } -export async function findRecentlyDeletedSync(): Promise<{ id: string; environmentId: number }[]> { +export async function findRecentlyDeletedSync(): Promise<{ id: string; environmentId: number; connectionId: number; models: string[] }[]> { const q = db.knex .from('_nango_syncs') - .select<{ id: string; environmentId: number }[]>('_nango_syncs.id', '_nango_connections.environment_id') + .select< + { id: string; environmentId: number; connectionId: number; models: string[] }[] + >('_nango_syncs.id', '_nango_connections.environment_id', '_nango_connections.id', '_nango_sync_configs.models') .join('_nango_connections', '_nango_connections.id', '_nango_syncs.nango_connection_id') + .join('_nango_sync_configs', '_nango_sync_configs.id', '_nango_syncs.sync_config_id') .where(db.knex.raw("_nango_syncs.deleted_at > NOW() - INTERVAL '6h'")); return await q; }