Skip to content

Commit

Permalink
WIP: Getting sendSync to have its own log context
Browse files Browse the repository at this point in the history
  • Loading branch information
nalanj committed Jan 17, 2025
1 parent 6754c7c commit 79d4a8e
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 104 deletions.
134 changes: 78 additions & 56 deletions packages/jobs/lib/execution/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ export async function startSync(task: TaskSync, startScriptFn = startScript): Pr
await onFailure({
connection: task.connection,
provider: providerConfig?.provider || 'unknown',
providerConfig: providerConfig,
syncId: task.syncId,
syncName: syncConfig?.sync_name || 'unknown',
syncType: syncType,
Expand All @@ -189,8 +190,11 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps
const logCtx = await logContextGetter.get({ id: String(nangoProps.activityLogId) });
const runTime = (new Date().getTime() - nangoProps.startedAt.getTime()) / 1000;
const syncType: SyncTypeLiteral = nangoProps.syncConfig.sync_type?.toLocaleLowerCase() === 'full' ? 'full' : 'incremental';

let team: DBTeam | undefined;
let environment: DBEnvironment | undefined;
let providerConfig: Config | null = null;

try {
const accountAndEnv = await environmentService.getAccountAndEnvironment({ environmentId: nangoProps.environmentId });
if (!accountAndEnv) {
Expand All @@ -216,8 +220,8 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps
provider_config_key: nangoProps.providerConfigKey
};

const providerConfig = await configService.getProviderConfig(connection.provider_config_key, connection.environment_id);
if (providerConfig === null) {
providerConfig = await configService.getProviderConfig(connection.provider_config_key, connection.environment_id);
if (!providerConfig) {
throw new Error(`Provider config not found for connection: ${connection.connection_id}`);
}

Expand Down Expand Up @@ -252,6 +256,7 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps
environment,
connection,
provider: nangoProps.provider,
providerConfig,
syncId: nangoProps.syncId,
syncName: nangoProps.syncConfig.sync_name,
syncType,
Expand Down Expand Up @@ -298,34 +303,36 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps
model
}
});

void tracer.scope().activate(span, async () => {
try {
const res = await sendSyncWebhook({
account: team!,
connection: connection,
environment: environment!,
syncConfig: nangoProps.syncConfig,
providerConfig,
webhookSettings,
syncName: nangoProps.syncConfig.sync_name,
model,
now: nangoProps.startedAt,
success: true,
responseResults: {
added,
updated,
deleted
},
operation: lastSyncDate ? SyncType.INCREMENTAL : SyncType.FULL
});

if (res.isErr()) {
throw new Error(`Failed to send webhook for sync: ${nangoProps.syncConfig.sync_name}`);
if (team && environment && providerConfig) {
try {
const res = await sendSyncWebhook({
account: team,
connection: connection,
environment: environment,
syncConfig: nangoProps.syncConfig,
providerConfig,
webhookSettings,
model,
now: nangoProps.startedAt,
success: true,
responseResults: {
added,
updated,
deleted
},
operation: lastSyncDate ? SyncType.INCREMENTAL : SyncType.FULL
});

if (res.isErr()) {
throw new Error(`Failed to send webhook for sync: ${nangoProps.syncConfig.sync_name}`);
}
} catch (err) {
span?.setTag('error', err);
} finally {
span.finish();
}
} catch (err) {
span?.setTag('error', err);
} finally {
span.finish();
}
});
}
Expand Down Expand Up @@ -419,6 +426,7 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps
provider_config_key: nangoProps.providerConfigKey
},
provider: nangoProps.provider,
providerConfig,
syncId: nangoProps.syncId!,
syncName: nangoProps.syncConfig.sync_name,
syncType,
Expand All @@ -440,11 +448,19 @@ export async function handleSyncSuccess({ nangoProps }: { nangoProps: NangoProps
export async function handleSyncError({ nangoProps, error }: { nangoProps: NangoProps; error: NangoError }): Promise<void> {
let team: DBTeam | undefined;
let environment: DBEnvironment | undefined;
let providerConfig: Config | null = null;

const accountAndEnv = await environmentService.getAccountAndEnvironment({ environmentId: nangoProps.environmentId });
if (accountAndEnv) {
team = accountAndEnv.account;
environment = accountAndEnv.environment;
}

providerConfig = await configService.getProviderConfig(nangoProps.providerConfigKey, nangoProps.environmentId);
if (!providerConfig) {
throw new Error(`Provider config not found for connection: ${nangoProps.nangoConnectionId}`);
}

await onFailure({
team,
environment,
Expand All @@ -455,6 +471,7 @@ export async function handleSyncError({ nangoProps, error }: { nangoProps: Nango
provider_config_key: nangoProps.providerConfigKey
},
provider: nangoProps.provider,
providerConfig,
syncId: nangoProps.syncId!,
syncName: nangoProps.syncConfig.sync_name,
syncType: nangoProps.syncConfig.sync_type!,
Expand Down Expand Up @@ -516,9 +533,10 @@ export async function abortSync(task: TaskSyncAbort): Promise<Result<void>> {
provider_config_key: task.connection.provider_config_key
},
provider: providerConfig.provider,
providerConfig,
syncId: task.syncId,
syncName: syncConfig.sync_name,
syncType: syncConfig.sync_type!,
syncName: syncConfig.sync_name,
syncJobId: syncJob.id,
activityLogId: syncJob.log_id!,
debug: task.debug,
Expand Down Expand Up @@ -554,6 +572,7 @@ async function onFailure({
environment,
connection,
provider,
providerConfig,
syncId,
syncName,
syncType,
Expand All @@ -573,6 +592,7 @@ async function onFailure({
environment?: DBEnvironment | undefined;
connection: NangoConnection;
provider: string;
providerConfig: Config | null;
syncId: string;
syncName: string;
syncType: SyncTypeLiteral;
Expand Down Expand Up @@ -641,35 +661,37 @@ async function onFailure({
syncSuccess: false
}
});
void tracer.scope().activate(span, async () => {
try {
const res = await sendSyncWebhook({
account: team!,
providerConfig,
syncConfig,
connection: connection,
environment: environment,
webhookSettings,
syncName: syncName,
model: models.join(','),
success: false,
error: {
type: 'script_error',
description: error.message
},
now: lastSyncDate,
operation: lastSyncDate ? SyncType.INCREMENTAL : SyncType.FULL
});

if (res.isErr()) {
throw new Error(`Failed to send webhook for sync: ${syncName}`);
if (team && environment && syncConfig && providerConfig) {
void tracer.scope().activate(span, async () => {
try {
const res = await sendSyncWebhook({
account: team,
providerConfig,
syncConfig,
connection: connection,
environment: environment,
webhookSettings,
model: models.join(','),
success: false,
error: {
type: 'script_error',
description: error.message
},
now: lastSyncDate,
operation: lastSyncDate ? SyncType.INCREMENTAL : SyncType.FULL
});

if (res.isErr()) {
throw new Error(`Failed to send webhook for sync: ${syncName}`);
}
} catch (err) {
span?.setTag('error', err);
} finally {
span.finish();
}
} catch (err) {
span?.setTag('error', err);
} finally {
span.finish();
}
});
});
}
}

await updateSyncJobStatus(syncJobId, SyncStatus.STOPPED);
Expand Down
Loading

0 comments on commit 79d4a8e

Please sign in to comment.