Skip to content

Commit

Permalink
chore(fleet): add tracing around supervisor plan/execute
Browse files Browse the repository at this point in the history
also adding a try/catch around getRunner logic
  • Loading branch information
TBonnin committed Dec 12, 2024
1 parent 575fc05 commit fe09089
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 31 deletions.
79 changes: 60 additions & 19 deletions packages/fleet/lib/supervisor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import tracer from 'dd-trace';
import type { DatabaseClient } from './db/client.js';
import type { Knex } from 'knex';
import { logger } from './utils/logger.js';
Expand All @@ -24,6 +25,39 @@ type Operation =
| { type: 'TERMINATE'; node: Node }
| { type: 'REMOVE'; node: Node };

const Operation = {
asSpanTags: (o: Operation): Record<string, string | number> => {
switch (o.type) {
case 'CREATE':
return {
operation: o.type,
routingId: o.routingId,
deploymentId: o.deployment.id
};
case 'FAIL':
return {
operation: o.type,
nodeId: o.node.id,
reason: o.reason
};
case 'START':
return {
operation: o.type,
nodeId: o.node.id
};
case 'OUTDATE':
case 'FINISHING':
case 'FINISHING_TIMEOUT':
case 'TERMINATE':
case 'REMOVE':
return {
operation: o.type,
nodeId: o.node.id
};
}
}
};

type SupervisorState = 'stopped' | 'running' | 'stopping';

export const STATE_TIMEOUT_MS = {
Expand Down Expand Up @@ -82,7 +116,7 @@ export class Supervisor {
}

public async tick(): Promise<Result<void>> {
// TODO: trace
const span = tracer.startSpan('fleet.supervisor.tick');
try {
this.tickCancelled = false;
const plan = await this.plan();
Expand All @@ -93,7 +127,10 @@ export class Supervisor {
return Err(plan.error);
}
} catch (err) {
span.setTag('error', err);
return Err(new FleetError('supervisor_tick_failed', { cause: err }));
} finally {
span.finish();
}
}

Expand Down Expand Up @@ -270,39 +307,43 @@ export class Supervisor {
}

private async executePlan(plan: Operation[]): Promise<void> {
if (plan.length > 0) {
logger.info('Executing plan:', plan);
}
for (const action of plan) {
for (const operation of plan) {
if (this.tickCancelled) {
return;
}
const result = await this.execute(action);
if (result.isErr()) {
// TODO: trace
logger.error('Failed to execute action:', result.error, result.error.cause);

const span = tracer.startSpan(`fleet.supervisor.operation.${operation.type.toLowerCase()}`);
span.addTags(Operation.asSpanTags(operation));
try {
const result = await this.execute(operation);
if (result.isErr()) {
span.setTag('error', result.error);
logger.error('Failed to execute operation:', result.error, result.error.cause);
}
} finally {
span.finish();
}
}
}

private async execute(action: Operation): Promise<Result<Node>> {
switch (action.type) {
private async execute(operation: Operation): Promise<Result<Node>> {
switch (operation.type) {
case 'CREATE':
return this.createNode(this.dbClient.db, action);
return this.createNode(this.dbClient.db, operation);
case 'START':
return this.startNode(action);
return this.startNode(operation);
case 'OUTDATE':
return this.outdateNode(action);
return this.outdateNode(operation);
case 'FINISHING':
return this.finishingNode(action);
return this.finishingNode(operation);
case 'TERMINATE':
return this.terminateNode(action);
return this.terminateNode(operation);
case 'REMOVE':
return this.removeNode(action);
return this.removeNode(operation);
case 'FINISHING_TIMEOUT':
return this.finishingTimeout(action);
return this.finishingTimeout(operation);
case 'FAIL':
return this.failNode(action);
return this.failNode(operation);
}
}

Expand Down
28 changes: 16 additions & 12 deletions packages/jobs/lib/runner/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,24 @@ function getRunnerId(suffix: string): string {
}

export async function getRunner(teamId: number): Promise<Result<Runner>> {
// a runner per account in prod only
const runnerId = isProd ? getRunnerId(`${teamId}`) : getRunnerId('default');

const isFleetGloballyEnabled = await featureFlags.isEnabled('fleet', 'global', false);
const isFleetEnabledForTeam = await featureFlags.isEnabled('fleet', `${teamId}`, false);
const isFleetEnabled = isFleetGloballyEnabled || isFleetEnabledForTeam;
if (isFleetEnabled) {
const runner = await getOrStartRunner(runnerId).catch(() => getOrStartRunner(getRunnerId('default')));
try {
// a runner per account in prod only
const runnerId = isProd ? getRunnerId(`${teamId}`) : getRunnerId('default');

const isFleetGloballyEnabled = await featureFlags.isEnabled('fleet', 'global', false);
const isFleetEnabledForTeam = await featureFlags.isEnabled('fleet', `${teamId}`, false);
const isFleetEnabled = isFleetGloballyEnabled || isFleetEnabledForTeam;
if (isFleetEnabled) {
const runner = await getOrStartRunner(runnerId).catch(() => getOrStartRunner(getRunnerId('default')));
return Ok(runner);
}

// fallback to default runner if account runner isn't ready yet
const runner = await getOrStartRunnerLegacy(runnerId).catch(() => getOrStartRunnerLegacy(getRunnerId('default')));
return Ok(runner);
} catch (err) {
return Err(new Error(`Failed to get runner for team ${teamId}`, { cause: err }));
}

// fallback to default runner if account runner isn't ready yet
const runner = await getOrStartRunnerLegacy(runnerId).catch(() => getOrStartRunnerLegacy(getRunnerId('default')));
return Ok(runner);
}

export async function idle(nodeId: number): Promise<Result<void>> {
Expand Down

0 comments on commit fe09089

Please sign in to comment.