diff --git a/packages/fleet/lib/supervisor.ts b/packages/fleet/lib/supervisor.ts index ead2044ffb9..6a76ae7fc34 100644 --- a/packages/fleet/lib/supervisor.ts +++ b/packages/fleet/lib/supervisor.ts @@ -1,3 +1,4 @@ +import tracer from 'dd-trace'; import type { DatabaseClient } from './db/client.js'; import type { Knex } from 'knex'; import { logger } from './utils/logger.js'; @@ -24,6 +25,39 @@ type Operation = | { type: 'TERMINATE'; node: Node } | { type: 'REMOVE'; node: Node }; +const Operation = { + asSpanTags: (o: Operation): Record => { + switch (o.type) { + case 'CREATE': + return { + operation: o.type, + routingId: o.routingId, + deploymentId: o.deployment.id + }; + case 'FAIL': + return { + operation: o.type, + nodeId: o.node.id, + reason: o.reason + }; + case 'START': + return { + operation: o.type, + nodeId: o.node.id + }; + case 'OUTDATE': + case 'FINISHING': + case 'FINISHING_TIMEOUT': + case 'TERMINATE': + case 'REMOVE': + return { + operation: o.type, + nodeId: o.node.id + }; + } + } +}; + type SupervisorState = 'stopped' | 'running' | 'stopping'; export const STATE_TIMEOUT_MS = { @@ -82,19 +116,25 @@ export class Supervisor { } public async tick(): Promise> { - // TODO: trace - try { - this.tickCancelled = false; - const plan = await this.plan(); - if (plan.isOk()) { - await this.executePlan(plan.value); - return Ok(undefined); - } else { - return Err(plan.error); + const span = tracer.startSpan('fleet.supervisor.tick'); + return tracer.scope().activate(span, async () => { + try { + this.tickCancelled = false; + + const plan = await this.plan(); + if (plan.isOk()) { + await this.executePlan(plan.value); + return Ok(undefined); + } else { + return Err(plan.error); + } + } catch (err) { + span.setTag('error', err); + return Err(new FleetError('supervisor_tick_failed', { cause: err })); + } finally { + span.finish(); } - } catch (err) { - return Err(new FleetError('supervisor_tick_failed', { cause: err })); - } + }); } private async loop(): Promise { @@ -125,184 +165,208 @@ export class Supervisor { } private async plan(cursor?: number): Promise> { - const getDeployment = await deployments.getActive(this.dbClient.db); - if (getDeployment.isErr()) { - return Err(getDeployment.error); - } - if (!getDeployment.value) { - return Err(new FleetError('no_active_deployment')); - } - const deployment = getDeployment.value; - const plan: Operation[] = []; - - const search = await nodes.search(this.dbClient.db, { - states: ['PENDING', 'STARTING', 'RUNNING', 'OUTDATED', 'FINISHING', 'IDLE', 'TERMINATED', 'ERROR'], - ...(cursor ? { cursor } : {}) - }); - if (search.isErr()) { - return Err(search.error); - } - - const routingIds = Array.from(search.value.nodes.keys()); - const configOverrides = await nodeConfigOverrides.search(this.dbClient.db, { routingIds }); - if (configOverrides.isErr()) { - return Err(configOverrides.error); - } + const activeSpan = tracer.scope().active(); + const span = tracer.startSpan('fleet.supervisor.plan', { ...(activeSpan ? { childOf: activeSpan } : {}) }); + try { + const getDeployment = await deployments.getActive(this.dbClient.db); + if (getDeployment.isErr()) { + span.setTag('error', getDeployment.error); + return Err(getDeployment.error); + } + if (!getDeployment.value) { + const err = new FleetError('no_active_deployment'); + span.setTag('error', err); + return Err(err); + } + const deployment = getDeployment.value; + const plan: Operation[] = []; - for (const [routingId, nodes] of search.value.nodes) { - // Start pending nodes - plan.push(...(nodes.PENDING || []).map((node) => ({ type: 'START', node }))); + const search = await nodes.search(this.dbClient.db, { + states: ['PENDING', 'STARTING', 'RUNNING', 'OUTDATED', 'FINISHING', 'IDLE', 'TERMINATED', 'ERROR'], + ...(cursor ? { cursor } : {}) + }); + if (search.isErr()) { + span.setTag('error', search.error); + return Err(search.error); + } - // Timeout PENDING nodes if they are taking too long (nodeProvider probably failed to create the node) - plan.push( - ...(nodes.PENDING || []).flatMap((node) => { - if (Date.now() - node.lastStateTransitionAt.getTime() > STATE_TIMEOUT_MS.PENDING) { - return [{ type: 'FAIL', node, reason: 'pending_timeout_reached' as const }]; - } - return []; - }) - ); + const routingIds = Array.from(search.value.nodes.keys()); + const configOverrides = await nodeConfigOverrides.search(this.dbClient.db, { routingIds }); + if (configOverrides.isErr()) { + span.setTag('error', configOverrides.error); + return Err(configOverrides.error); + } - // Timeout STARTING nodes if they are taking too long - plan.push( - ...(nodes.STARTING || []).flatMap((node) => { - if (Date.now() - node.lastStateTransitionAt.getTime() > STATE_TIMEOUT_MS.STARTING) { - return [{ type: 'FAIL', node, reason: 'starting_timeout_reached' as const }]; - } - return []; - }) - ); + for (const [routingId, nodes] of search.value.nodes) { + // Start pending nodes + plan.push(...(nodes.PENDING || []).map((node) => ({ type: 'START', node }))); + + // Timeout PENDING nodes if they are taking too long (nodeProvider probably failed to create the node) + plan.push( + ...(nodes.PENDING || []).flatMap((node) => { + if (Date.now() - node.lastStateTransitionAt.getTime() > STATE_TIMEOUT_MS.PENDING) { + return [{ type: 'FAIL', node, reason: 'pending_timeout_reached' as const }]; + } + return []; + }) + ); - // Mark OUTDATED nodes - // if new deployment is available - // or if config overrides have changed - const hasConfigOverride = (node: Node, configOverride: NodeConfigOverride | undefined): boolean => { - if (!configOverride) { - return false; - } - return !( - node.image === `${configOverride.image}:${deployment.commitId}` && - node.cpuMilli === configOverride.cpuMilli && - node.memoryMb === configOverride.memoryMb && - node.storageMb === configOverride.storageMb + // Timeout STARTING nodes if they are taking too long + plan.push( + ...(nodes.STARTING || []).flatMap((node) => { + if (Date.now() - node.lastStateTransitionAt.getTime() > STATE_TIMEOUT_MS.STARTING) { + return [{ type: 'FAIL', node, reason: 'starting_timeout_reached' as const }]; + } + return []; + }) ); - }; - plan.push( - ...(nodes.RUNNING || []).flatMap((node) => { - const configOverride = configOverrides.value.get(node.routingId); - if (node.deploymentId !== deployment.id || hasConfigOverride(node, configOverride)) { - return [{ type: 'OUTDATE', node }]; - } - return []; - }) - ); - // Mark OUTDATED nodes to FINISHING once there is RUNNING nodes to replace them - plan.push( - ...(nodes.OUTDATED || []).flatMap((node) => { - if ((nodes.RUNNING?.length || 0) > 0) { - return [{ type: 'FINISHING', node }]; + // Mark OUTDATED nodes + // if new deployment is available + // or if config overrides have changed + const hasConfigOverride = (node: Node, configOverride: NodeConfigOverride | undefined): boolean => { + if (!configOverride) { + return false; } - return []; - }) - ); + return !( + node.image === `${configOverride.image}:${deployment.commitId}` && + node.cpuMilli === configOverride.cpuMilli && + node.memoryMb === configOverride.memoryMb && + node.storageMb === configOverride.storageMb + ); + }; + plan.push( + ...(nodes.RUNNING || []).flatMap((node) => { + const configOverride = configOverrides.value.get(node.routingId); + if (node.deploymentId !== deployment.id || hasConfigOverride(node, configOverride)) { + return [{ type: 'OUTDATE', node }]; + } + return []; + }) + ); - // if OUTDATED node but no RUNNING or upcoming nodes then create a new one - if ((nodes.OUTDATED?.length || 0) > 0 && (nodes.RUNNING?.length || 0) + (nodes.STARTING?.length || 0) + (nodes.PENDING?.length || 0) === 0) { - plan.push({ type: 'CREATE', routingId, deployment }); - } + // Mark OUTDATED nodes to FINISHING once there is RUNNING nodes to replace them + plan.push( + ...(nodes.OUTDATED || []).flatMap((node) => { + if ((nodes.RUNNING?.length || 0) > 0) { + return [{ type: 'FINISHING', node }]; + } + return []; + }) + ); - // Warn about old finishing nodes - plan.push( - ...(nodes.FINISHING || []).flatMap((node) => { - if (Date.now() - node.lastStateTransitionAt.getTime() > STATE_TIMEOUT_MS.FINISHING) { - return [{ type: 'FINISHING_TIMEOUT', node }]; - } - return []; - }) - ); + // if OUTDATED node but no RUNNING or upcoming nodes then create a new one + if ((nodes.OUTDATED?.length || 0) > 0 && (nodes.RUNNING?.length || 0) + (nodes.STARTING?.length || 0) + (nodes.PENDING?.length || 0) === 0) { + plan.push({ type: 'CREATE', routingId, deployment }); + } - // Terminate IDLE nodes - plan.push(...(nodes.IDLE || []).map((node) => ({ type: 'TERMINATE' as const, node }))); + // Warn about old finishing nodes + plan.push( + ...(nodes.FINISHING || []).flatMap((node) => { + if (Date.now() - node.lastStateTransitionAt.getTime() > STATE_TIMEOUT_MS.FINISHING) { + return [{ type: 'FINISHING_TIMEOUT', node }]; + } + return []; + }) + ); - // Timeout IDLE nodes if they are taking too long (nodeProvider probably failed to terminate the node) - plan.push( - ...(nodes.IDLE || []).flatMap((node) => { - if (Date.now() - node.lastStateTransitionAt.getTime() > STATE_TIMEOUT_MS.IDLE) { - return [{ type: 'FAIL', node, reason: 'idle_timeout_reached' as const }]; - } - return []; - }) - ); + // Terminate IDLE nodes + plan.push(...(nodes.IDLE || []).map((node) => ({ type: 'TERMINATE' as const, node }))); + + // Timeout IDLE nodes if they are taking too long (nodeProvider probably failed to terminate the node) + plan.push( + ...(nodes.IDLE || []).flatMap((node) => { + if (Date.now() - node.lastStateTransitionAt.getTime() > STATE_TIMEOUT_MS.IDLE) { + return [{ type: 'FAIL', node, reason: 'idle_timeout_reached' as const }]; + } + return []; + }) + ); - // Remove old terminated nodes - plan.push( - ...(nodes.TERMINATED || []).flatMap((node) => { - if (Date.now() - node.lastStateTransitionAt.getTime() > STATE_TIMEOUT_MS.TERMINATED) { - return [{ type: 'REMOVE', node }]; - } - return []; - }) - ); + // Remove old terminated nodes + plan.push( + ...(nodes.TERMINATED || []).flatMap((node) => { + if (Date.now() - node.lastStateTransitionAt.getTime() > STATE_TIMEOUT_MS.TERMINATED) { + return [{ type: 'REMOVE', node }]; + } + return []; + }) + ); - // Remove old error nodes - plan.push( - ...(nodes.ERROR || []).flatMap((node) => { - if (Date.now() - node.lastStateTransitionAt.getTime() > STATE_TIMEOUT_MS.ERROR) { - return [{ type: 'REMOVE', node }]; - } - return []; - }) - ); - } + // Remove old error nodes + plan.push( + ...(nodes.ERROR || []).flatMap((node) => { + if (Date.now() - node.lastStateTransitionAt.getTime() > STATE_TIMEOUT_MS.ERROR) { + return [{ type: 'REMOVE', node }]; + } + return []; + }) + ); + } - // Recursively fetch next page of nodes - if (search.value.nextCursor) { - const nextPagePlan = await this.plan(search.value.nextCursor); - if (nextPagePlan.isErr()) { - logger.error('Failed to get next plan:', nextPagePlan.error); - } else { - plan.push(...nextPagePlan.value); + // Recursively fetch next page of nodes + if (search.value.nextCursor) { + const nextPagePlan = await this.plan(search.value.nextCursor); + if (nextPagePlan.isErr()) { + logger.error('Failed to get next plan:', nextPagePlan.error); + } else { + plan.push(...nextPagePlan.value); + } } - } - return Ok(plan); + return Ok(plan); + } finally { + span.finish(); + } } private async executePlan(plan: Operation[]): Promise { - if (plan.length > 0) { - logger.info('Executing plan:', plan); - } - for (const action of plan) { - if (this.tickCancelled) { - return; - } - const result = await this.execute(action); - if (result.isErr()) { - // TODO: trace - logger.error('Failed to execute action:', result.error, result.error.cause); + const activeSpan = tracer.scope().active(); + const span = tracer.startSpan('fleet.supervisor.executePlan', { ...(activeSpan ? { childOf: activeSpan } : {}) }); + try { + for (const operation of plan) { + if (this.tickCancelled) { + span.setTag('error', 'tick_cancelled'); + return; + } + + const operationSpan = tracer.startSpan(`fleet.supervisor.operation.${operation.type.toLowerCase()}`, { + childOf: span, + tags: { fleet: Operation.asSpanTags(operation) } + }); + try { + const result = await this.execute(operation); + if (result.isErr()) { + operationSpan.setTag('error', result.error); + logger.error('Failed to execute operation:', result.error, result.error.cause); + } + } finally { + operationSpan.finish(); + } } + } finally { + span.finish(); } } - private async execute(action: Operation): Promise> { - switch (action.type) { + private async execute(operation: Operation): Promise> { + switch (operation.type) { case 'CREATE': - return this.createNode(this.dbClient.db, action); + return this.createNode(this.dbClient.db, operation); case 'START': - return this.startNode(action); + return this.startNode(operation); case 'OUTDATE': - return this.outdateNode(action); + return this.outdateNode(operation); case 'FINISHING': - return this.finishingNode(action); + return this.finishingNode(operation); case 'TERMINATE': - return this.terminateNode(action); + return this.terminateNode(operation); case 'REMOVE': - return this.removeNode(action); + return this.removeNode(operation); case 'FINISHING_TIMEOUT': - return this.finishingTimeout(action); + return this.finishingTimeout(operation); case 'FAIL': - return this.failNode(action); + return this.failNode(operation); } } diff --git a/packages/jobs/lib/runner/runner.ts b/packages/jobs/lib/runner/runner.ts index e43b9cd8488..1b242413604 100644 --- a/packages/jobs/lib/runner/runner.ts +++ b/packages/jobs/lib/runner/runner.ts @@ -35,20 +35,24 @@ function getRunnerId(suffix: string): string { } export async function getRunner(teamId: number): Promise> { - // a runner per account in prod only - const runnerId = isProd ? getRunnerId(`${teamId}`) : getRunnerId('default'); - - const isFleetGloballyEnabled = await featureFlags.isEnabled('fleet', 'global', false); - const isFleetEnabledForTeam = await featureFlags.isEnabled('fleet', `${teamId}`, false); - const isFleetEnabled = isFleetGloballyEnabled || isFleetEnabledForTeam; - if (isFleetEnabled) { - const runner = await getOrStartRunner(runnerId).catch(() => getOrStartRunner(getRunnerId('default'))); + try { + // a runner per account in prod only + const runnerId = isProd ? getRunnerId(`${teamId}`) : getRunnerId('default'); + + const isFleetGloballyEnabled = await featureFlags.isEnabled('fleet', 'global', false); + const isFleetEnabledForTeam = await featureFlags.isEnabled('fleet', `${teamId}`, false); + const isFleetEnabled = isFleetGloballyEnabled || isFleetEnabledForTeam; + if (isFleetEnabled) { + const runner = await getOrStartRunner(runnerId).catch(() => getOrStartRunner(getRunnerId('default'))); + return Ok(runner); + } + + // fallback to default runner if account runner isn't ready yet + const runner = await getOrStartRunnerLegacy(runnerId).catch(() => getOrStartRunnerLegacy(getRunnerId('default'))); return Ok(runner); + } catch (err) { + return Err(new Error(`Failed to get runner for team ${teamId}`, { cause: err })); } - - // fallback to default runner if account runner isn't ready yet - const runner = await getOrStartRunnerLegacy(runnerId).catch(() => getOrStartRunnerLegacy(getRunnerId('default'))); - return Ok(runner); } export async function idle(nodeId: number): Promise> {