diff --git a/packages/jobs/lib/execution/sync.ts b/packages/jobs/lib/execution/sync.ts index 8926c58e38..1b5513aebc 100644 --- a/packages/jobs/lib/execution/sync.ts +++ b/packages/jobs/lib/execution/sync.ts @@ -167,6 +167,7 @@ export async function startSync(task: TaskSync, startScriptFn = startScript): Pr await onFailure({ connection: task.connection, provider: providerConfig?.provider || 'unknown', + providerConfig: providerConfig, syncId: task.syncId, syncName: syncConfig?.sync_name || 'unknown', syncType: syncType, @@ -189,8 +190,11 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps const logCtx = await logContextGetter.get({ id: String(nangoProps.activityLogId) }); const runTime = (new Date().getTime() - nangoProps.startedAt.getTime()) / 1000; const syncType: SyncTypeLiteral = nangoProps.syncConfig.sync_type?.toLocaleLowerCase() === 'full' ? 'full' : 'incremental'; + let team: DBTeam | undefined; let environment: DBEnvironment | undefined; + let providerConfig: Config | null = null; + try { const accountAndEnv = await environmentService.getAccountAndEnvironment({ environmentId: nangoProps.environmentId }); if (!accountAndEnv) { @@ -216,8 +220,8 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps provider_config_key: nangoProps.providerConfigKey }; - const providerConfig = await configService.getProviderConfig(connection.provider_config_key, connection.environment_id); - if (providerConfig === null) { + providerConfig = await configService.getProviderConfig(connection.provider_config_key, connection.environment_id); + if (!providerConfig) { throw new Error(`Provider config not found for connection: ${connection.connection_id}`); } @@ -252,6 +256,7 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps environment, connection, provider: nangoProps.provider, + providerConfig, syncId: nangoProps.syncId, syncName: nangoProps.syncConfig.sync_name, syncType, @@ -298,34 +303,36 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps model } }); + void tracer.scope().activate(span, async () => { - try { - const res = await sendSyncWebhook({ - account: team!, - connection: connection, - environment: environment!, - syncConfig: nangoProps.syncConfig, - providerConfig, - webhookSettings, - syncName: nangoProps.syncConfig.sync_name, - model, - now: nangoProps.startedAt, - success: true, - responseResults: { - added, - updated, - deleted - }, - operation: lastSyncDate ? SyncType.INCREMENTAL : SyncType.FULL - }); - - if (res.isErr()) { - throw new Error(`Failed to send webhook for sync: ${nangoProps.syncConfig.sync_name}`); + if (team && environment && providerConfig) { + try { + const res = await sendSyncWebhook({ + account: team, + connection: connection, + environment: environment, + syncConfig: nangoProps.syncConfig, + providerConfig, + webhookSettings, + model, + now: nangoProps.startedAt, + success: true, + responseResults: { + added, + updated, + deleted + }, + operation: lastSyncDate ? SyncType.INCREMENTAL : SyncType.FULL + }); + + if (res.isErr()) { + throw new Error(`Failed to send webhook for sync: ${nangoProps.syncConfig.sync_name}`); + } + } catch (err) { + span?.setTag('error', err); + } finally { + span.finish(); } - } catch (err) { - span?.setTag('error', err); - } finally { - span.finish(); } }); } @@ -419,6 +426,7 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps provider_config_key: nangoProps.providerConfigKey }, provider: nangoProps.provider, + providerConfig, syncId: nangoProps.syncId!, syncName: nangoProps.syncConfig.sync_name, syncType, @@ -440,11 +448,19 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps export async function handleSyncError({ nangoProps, error }: { nangoProps: NangoProps; error: NangoError }): Promise { let team: DBTeam | undefined; let environment: DBEnvironment | undefined; + let providerConfig: Config | null = null; + const accountAndEnv = await environmentService.getAccountAndEnvironment({ environmentId: nangoProps.environmentId }); if (accountAndEnv) { team = accountAndEnv.account; environment = accountAndEnv.environment; } + + providerConfig = await configService.getProviderConfig(nangoProps.providerConfigKey, nangoProps.environmentId); + if (!providerConfig) { + throw new Error(`Provider config not found for connection: ${nangoProps.nangoConnectionId}`); + } + await onFailure({ team, environment, @@ -455,6 +471,7 @@ export async function handleSyncError({ nangoProps, error }: { nangoProps: Nango provider_config_key: nangoProps.providerConfigKey }, provider: nangoProps.provider, + providerConfig, syncId: nangoProps.syncId!, syncName: nangoProps.syncConfig.sync_name, syncType: nangoProps.syncConfig.sync_type!, @@ -516,9 +533,10 @@ export async function abortSync(task: TaskSyncAbort): Promise> { provider_config_key: task.connection.provider_config_key }, provider: providerConfig.provider, + providerConfig, syncId: task.syncId, - syncName: syncConfig.sync_name, syncType: syncConfig.sync_type!, + syncName: syncConfig.sync_name, syncJobId: syncJob.id, activityLogId: syncJob.log_id!, debug: task.debug, @@ -554,6 +572,7 @@ async function onFailure({ environment, connection, provider, + providerConfig, syncId, syncName, syncType, @@ -573,6 +592,7 @@ async function onFailure({ environment?: DBEnvironment | undefined; connection: NangoConnection; provider: string; + providerConfig: Config | null; syncId: string; syncName: string; syncType: SyncTypeLiteral; @@ -641,35 +661,37 @@ async function onFailure({ syncSuccess: false } }); - void tracer.scope().activate(span, async () => { - try { - const res = await sendSyncWebhook({ - account: team!, - providerConfig, - syncConfig, - connection: connection, - environment: environment, - webhookSettings, - syncName: syncName, - model: models.join(','), - success: false, - error: { - type: 'script_error', - description: error.message - }, - now: lastSyncDate, - operation: lastSyncDate ? SyncType.INCREMENTAL : SyncType.FULL - }); - if (res.isErr()) { - throw new Error(`Failed to send webhook for sync: ${syncName}`); + if (team && environment && syncConfig && providerConfig) { + void tracer.scope().activate(span, async () => { + try { + const res = await sendSyncWebhook({ + account: team, + providerConfig, + syncConfig, + connection: connection, + environment: environment, + webhookSettings, + model: models.join(','), + success: false, + error: { + type: 'script_error', + description: error.message + }, + now: lastSyncDate, + operation: lastSyncDate ? SyncType.INCREMENTAL : SyncType.FULL + }); + + if (res.isErr()) { + throw new Error(`Failed to send webhook for sync: ${syncName}`); + } + } catch (err) { + span?.setTag('error', err); + } finally { + span.finish(); } - } catch (err) { - span?.setTag('error', err); - } finally { - span.finish(); - } - }); + }); + } } await updateSyncJobStatus(syncJobId, SyncStatus.STOPPED); diff --git a/packages/jobs/lib/execution/webhook.ts b/packages/jobs/lib/execution/webhook.ts index 0354b4cfb4..07fd4f7a4b 100644 --- a/packages/jobs/lib/execution/webhook.ts +++ b/packages/jobs/lib/execution/webhook.ts @@ -29,7 +29,7 @@ import { getRunnerFlags } from '../utils/flags.js'; export async function startWebhook(task: TaskWebhook): Promise> { let team: DBTeam | undefined; let environment: DBEnvironment | undefined; - let providerConfig: Config | undefined | null; + let providerConfig: Config | null = null; let sync: Sync | undefined | null; let syncJob: Pick | null = null; let syncConfig: DBSyncConfig | null = null; @@ -145,6 +145,7 @@ export async function startWebhook(task: TaskWebhook): Promise> { syncName: task.parentSyncName, syncJobId: syncJob?.id, providerConfigKey: task.connection.provider_config_key, + providerConfig, activityLogId: task.activityLogId, models: syncConfig?.models || [], runTime: 0, @@ -184,11 +185,22 @@ export async function handleWebhookSuccess({ nangoProps }: { nangoProps: NangoPr throw new Error(`Failed to update sync job status to SUCCESS for sync job: ${nangoProps.syncJobId}`); } + const providerConfig = await configService.getProviderConfig(nangoProps.providerConfigKey, nangoProps.environmentId); + if (providerConfig === null) { + throw new Error(`Provider config not found for connection: ${nangoProps.connectionId}`); + } + const webhookSettings = await externalWebhookService.get(nangoProps.environmentId); - const logCtx = await logContextGetter.get({ id: String(nangoProps.activityLogId) }); - const environment = await environmentService.getById(nangoProps.environmentId); + + const accountAndEnv = await environmentService.getAccountAndEnvironment({ environmentId: nangoProps.environmentId }); + if (!accountAndEnv) { + throw new Error(`Account and environment not found`); + } + const team = accountAndEnv.account; + const environment = accountAndEnv.environment; + if (environment) { - for (const model of nangoProps.syncConfig.models!) { + for (const model of nangoProps.syncConfig.models || []) { const span = tracer.startSpan('jobs.webhook.webhook', { tags: { environmentId: nangoProps.environmentId, @@ -199,9 +211,11 @@ export async function handleWebhookSuccess({ nangoProps }: { nangoProps: NangoPr model } }); + void tracer.scope().activate(span, async () => { try { const res = await sendSyncWebhook({ + account: team, connection: { id: nangoProps.nangoConnectionId!, connection_id: nangoProps.connectionId, @@ -210,13 +224,13 @@ export async function handleWebhookSuccess({ nangoProps }: { nangoProps: NangoPr }, environment: environment, webhookSettings, - syncName: nangoProps.syncConfig.sync_name, + syncConfig: nangoProps.syncConfig, + providerConfig, model, now: nangoProps.startedAt, success: true, responseResults: syncJob.result?.[model] || { added: 0, updated: 0, deleted: 0 }, - operation: 'WEBHOOK', - logCtx + operation: 'WEBHOOK' }); if (res.isErr()) { @@ -240,6 +254,12 @@ export async function handleWebhookError({ nangoProps, error }: { nangoProps: Na team = accountAndEnv.account; environment = accountAndEnv.environment; } + + const providerConfig = await configService.getProviderConfig(nangoProps.providerConfigKey, nangoProps.environmentId); + if (providerConfig === null) { + throw new Error(`Provider config not found for connection: ${nangoProps.connectionId}`); + } + await onFailure({ team, environment, @@ -253,8 +273,9 @@ export async function handleWebhookError({ nangoProps, error }: { nangoProps: Na syncName: nangoProps.syncConfig.sync_name, syncJobId: nangoProps.syncJobId!, providerConfigKey: nangoProps.providerConfigKey, + providerConfig, activityLogId: nangoProps.activityLogId || 'unknown', - models: nangoProps.syncConfig.models!, + models: nangoProps.syncConfig.models || [], runTime: (new Date().getTime() - nangoProps.startedAt.getTime()) / 1000, error, syncConfig: nangoProps.syncConfig, @@ -270,9 +291,9 @@ async function onFailure({ syncName, syncJobId, syncConfig, + providerConfig, providerConfigKey, models, - activityLogId, runTime, error, endUser @@ -284,6 +305,7 @@ async function onFailure({ syncJobId?: number | undefined; syncName: string; syncConfig: DBSyncConfig | null; + providerConfig: Config | null; providerConfigKey: string; models: string[]; activityLogId: string; @@ -319,7 +341,6 @@ async function onFailure({ if (environment) { const webhookSettings = await externalWebhookService.get(environment.id); if (webhookSettings) { - const logCtx = await logContextGetter.get({ id: activityLogId }); const span = tracer.startSpan('jobs.webhook.webhook', { tags: { environmentId: environment.id, @@ -329,33 +350,37 @@ async function onFailure({ syncSuccess: false } }); - void tracer.scope().activate(span, async () => { - try { - const res = await sendSyncWebhook({ - connection: connection, - environment, - webhookSettings, - syncName: syncName, - model: models.join(','), - success: false, - error: { - type: 'script_error', - description: error.message - }, - now: new Date(), - operation: 'WEBHOOK', - logCtx: logCtx - }); - if (res.isErr()) { - throw new Error(`Failed to send webhook for webhook: ${syncName}`); + if (team && environment && syncConfig && providerConfig) { + void tracer.scope().activate(span, async () => { + try { + const res = await sendSyncWebhook({ + account: team, + environment, + connection: connection, + webhookSettings, + syncConfig, + providerConfig, + model: models.join(','), + success: false, + error: { + type: 'script_error', + description: error.message + }, + now: new Date(), + operation: 'WEBHOOK' + }); + + if (res.isErr()) { + throw new Error(`Failed to send webhook for webhook: ${syncName}`); + } + } catch (err) { + span?.setTag('error', err); + } finally { + span.finish(); } - } catch (err) { - span?.setTag('error', err); - } finally { - span.finish(); - } - }); + }); + } } } } diff --git a/packages/webhooks/lib/sync.ts b/packages/webhooks/lib/sync.ts index 1ff2c62a9c..d72d9c512a 100644 --- a/packages/webhooks/lib/sync.ts +++ b/packages/webhooks/lib/sync.ts @@ -7,8 +7,8 @@ import type { NangoSyncWebhookBodyBase, DBEnvironment, DBTeam, - DBSyncConfig, - NangoConnection + Connection, + DBSyncConfig } from '@nangohq/types'; import dayjs from 'dayjs'; import utc from 'dayjs/plugin/utc.js'; @@ -25,9 +25,8 @@ export const sendSync = async ({ environment, account, providerConfig, - syncConfig, webhookSettings, - syncName, + syncConfig, model, now, responseResults, @@ -35,13 +34,12 @@ export const sendSync = async ({ operation, error }: { - connection: NangoConnection; + connection: Connection | Pick; environment: DBEnvironment; account: DBTeam; providerConfig: Config; - syncConfig: DBSyncConfig; webhookSettings: ExternalWebhook | null; - syncName: string; + syncConfig: DBSyncConfig; model: string; now: Date | undefined; operation: SyncType; @@ -74,7 +72,7 @@ export const sendSync = async ({ type: 'sync', connectionId: connection.connection_id, providerConfigKey: connection.provider_config_key, - syncName, + syncName: syncConfig.sync_name, model, // For backward compatibility reason we are sending the syncType as INITIAL instead of FULL syncType: operation === 'FULL' ? 'INITIAL' : operation diff --git a/packages/webhooks/lib/sync.unit.test.ts b/packages/webhooks/lib/sync.unit.test.ts index b68fb762a5..a07b82deb5 100644 --- a/packages/webhooks/lib/sync.unit.test.ts +++ b/packages/webhooks/lib/sync.unit.test.ts @@ -7,7 +7,8 @@ import * as logPackage from '@nangohq/logs'; const spy = vi.spyOn(axiosInstance, 'post'); -const connection: Pick = { +const connection: Pick = { + id: 1, connection_id: '1', provider_config_key: 'providerkey' }; @@ -33,7 +34,6 @@ describe('Webhooks: sync notification tests', () => { }); it('Should not send a sync webhook if the webhook url is not present', async () => { - const logCtx = getLogCtx(); const responseResults = { added: 10, updated: 0, deleted: 0 }; await sendSync({ @@ -45,13 +45,11 @@ describe('Webhooks: sync notification tests', () => { secondary_url: '', on_sync_completion_always: false }, - syncName: 'syncName', model: 'model', responseResults, success: true, operation: 'INCREMENTAL', - now: new Date(), - logCtx + now: new Date() }); expect(spy).not.toHaveBeenCalled(); });