From 8985292ee576bd34b6947477cee3daaeef5a0ebd Mon Sep 17 00:00:00 2001 From: Ford Date: Thu, 11 Jul 2024 11:49:04 -0700 Subject: [PATCH] agent,common: Improve process of ensuring subgraphs are deployed - Ahead of executing a batch, ensure all deployments to be allocated to are ensured - Check status of subgraph deployment assignments: if already syncing then no-op, if paused then resume, if doesn't exist then create and deploy --- packages/indexer-agent/src/agent.ts | 5 +- .../indexer-common/src/allocations/monitor.ts | 4 +- packages/indexer-common/src/errors.ts | 4 ++ packages/indexer-common/src/graph-node.ts | 67 +++++++++++++++++-- .../src/indexer-management/allocations.ts | 33 ++++++++- 5 files changed, 106 insertions(+), 7 deletions(-) diff --git a/packages/indexer-agent/src/agent.ts b/packages/indexer-agent/src/agent.ts index 841503bdc..7086a0e9f 100644 --- a/packages/indexer-agent/src/agent.ts +++ b/packages/indexer-agent/src/agent.ts @@ -36,6 +36,7 @@ import { networkIsL2, networkIsL1, DeploymentManagementMode, + SubgraphStatus, } from '@graphprotocol/indexer-common' import PQueue from 'p-queue' @@ -970,6 +971,8 @@ export class Agent { // Deploy/remove up to 10 subgraphs in parallel const queue = new PQueue({ concurrency: 10 }) + const currentAssignments = + await this.graphNode.subgraphDeploymentsAssignments(SubgraphStatus.ALL) // Index all new deployments worth indexing await queue.addAll( deploy.map(deployment => async () => { @@ -981,7 +984,7 @@ export class Agent { }) // Ensure the deployment is deployed to the indexer - await this.graphNode.ensure(name, deployment) + await this.graphNode.ensure(name, deployment, currentAssignments) }), ) diff --git a/packages/indexer-common/src/allocations/monitor.ts b/packages/indexer-common/src/allocations/monitor.ts index d1a119b7b..5ef5c9555 100644 --- a/packages/indexer-common/src/allocations/monitor.ts +++ b/packages/indexer-common/src/allocations/monitor.ts @@ -152,7 +152,9 @@ export const monitorEligibleAllocations = ({ const allocations = [...activeAllocations, ...recentlyClosedAllocations] if (allocations.length == 0) { - throw new Error(`No data / indexer not found on chain`) + logger.warn(`No data / indexer not found on chain`, { + allocations: [], + }) } // eslint-disable-next-line @typescript-eslint/no-explicit-any diff --git a/packages/indexer-common/src/errors.ts b/packages/indexer-common/src/errors.ts index f555d7523..34dc610d9 100644 --- a/packages/indexer-common/src/errors.ts +++ b/packages/indexer-common/src/errors.ts @@ -86,6 +86,8 @@ export enum IndexerErrorCode { IE073 = 'IE073', IE074 = 'IE074', IE075 = 'IE075', + IE076 = 'IE076', + IE077 = 'IE077', } export const INDEXER_ERROR_MESSAGES: Record = { @@ -165,6 +167,8 @@ export const INDEXER_ERROR_MESSAGES: Record = { IE073: 'Failed to query subgraph features from indexing statuses endpoint', IE074: 'Failed to deploy subgraph: network not supported', IE075: 'Failed to connect to network contracts', + IE076: 'Failed to resume subgraph deployment', + IE077: 'Failed to allocate: subgraph not healthily syncing', } export type IndexerErrorCause = unknown diff --git a/packages/indexer-common/src/graph-node.ts b/packages/indexer-common/src/graph-node.ts index a4133a700..59ee2c27a 100644 --- a/packages/indexer-common/src/graph-node.ts +++ b/packages/indexer-common/src/graph-node.ts @@ -181,7 +181,7 @@ export class GraphNode { return result.data.indexingStatuses .filter((status: QueryResult) => { if (subgraphStatus === SubgraphStatus.ACTIVE) { - return status.node !== 'removed' + return !status.paused } else if (subgraphStatus === SubgraphStatus.PAUSED) { return status.node === 'removed' || status.paused === true } else { @@ -192,6 +192,7 @@ export class GraphNode { return { id: new SubgraphDeploymentID(status.subgraphDeployment), node: status.node, + paused: status.paused, } }) } catch (error) { @@ -343,6 +344,29 @@ export class GraphNode { } } + async resume(deployment: SubgraphDeploymentID): Promise { + try { + this.logger.info(`Resume subgraph deployment`, { + deployment: deployment.display, + }) + const response = await this.admin.request('subgraph_resume', { + deployment: deployment.ipfsHash, + }) + if (response.error) { + throw response.error + } + this.logger.info(`Successfully resumed subgraph deployment`, { + deployment: deployment.display, + }) + } catch (error) { + const errorCode = IndexerErrorCode.IE076 + this.logger.error(INDEXER_ERROR_MESSAGES[errorCode], { + deployment: deployment.display, + error: indexerError(errorCode, error), + }) + } + } + async reassign(deployment: SubgraphDeploymentID, node: string): Promise { try { this.logger.info(`Reassign subgraph deployment`, { @@ -374,10 +398,45 @@ export class GraphNode { } } - async ensure(name: string, deployment: SubgraphDeploymentID): Promise { + async ensure( + name: string, + deployment: SubgraphDeploymentID, + currentAssignments?: SubgraphDeploymentAssignment[], + ): Promise { + this.logger.debug('Ensure subgraph deployment is syncing', { + name, + deployment: deployment.ipfsHash, + }) try { - await this.create(name) - await this.deploy(name, deployment) + const deploymentAssignments = + currentAssignments ?? + (await this.subgraphDeploymentsAssignments(SubgraphStatus.ALL)) + const matchingAssignment = deploymentAssignments.find( + (deploymentAssignment) => deploymentAssignment.id.ipfsHash == deployment.ipfsHash, + ) + + if (matchingAssignment?.paused == false) { + this.logger.debug('Subgraph deployment already syncing, ensure() is a no-op', { + name, + deployment: deployment.ipfsHash, + }) + } else if (matchingAssignment?.paused == true) { + this.logger.debug('Subgraph deployment paused, resuming', { + name, + deployment: deployment.ipfsHash, + }) + await this.resume(deployment) + } else { + this.logger.debug( + 'Subgraph deployment not found, creating subgraph name and deploying...', + { + name, + deployment: deployment.ipfsHash, + }, + ) + await this.create(name) + await this.deploy(name, deployment) + } } catch (error) { if (!(error instanceof IndexerError)) { const errorCode = IndexerErrorCode.IE020 diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index 046f4566b..191e121b0 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -29,6 +29,7 @@ import { Network, ReallocateAllocationResult, SubgraphIdentifierType, + SubgraphStatus, uniqueAllocationID, upsertIndexingRule, } from '@graphprotocol/indexer-common' @@ -124,6 +125,8 @@ export class AllocationManager { const validatedActions = await this.validateActionBatchFeasibilty(actions) logger.trace('Validated actions', { validatedActions }) + await this.deployBeforeAllocating(logger, validatedActions) + const populateTransactionsResults = await this.prepareTransactions(validatedActions) const failedTransactionPreparations = populateTransactionsResults @@ -307,6 +310,28 @@ export class AllocationManager { } } + async deployBeforeAllocating(logger: Logger, actions: Action[]): Promise { + const allocateActions = actions.filter((action) => action.type == ActionType.ALLOCATE) + logger.info('Ensure subgraph deployments are deployed before we allocate to them', { + allocateActions, + }) + const currentAssignments = await this.graphNode.subgraphDeploymentsAssignments( + SubgraphStatus.ALL, + ) + await pMap( + allocateActions, + async (action: Action) => + await this.graphNode.ensure( + `indexer-agent/${action.deploymentID!.slice(-10)}`, + new SubgraphDeploymentID(action.deploymentID!), + currentAssignments, + ), + { + stopOnError: false, + }, + ) + } + async prepareAllocateParams( logger: Logger, context: TransactionPreparationContext, @@ -352,10 +377,16 @@ export class AllocationManager { ) if (!status) { throw indexerError( - IndexerErrorCode.IE020, + IndexerErrorCode.IE077, `Subgraph deployment, '${deployment.ipfsHash}', is not syncing`, ) } + if (status?.health == 'failed') { + throw indexerError( + IndexerErrorCode.IE077, + `Subgraph deployment, '${deployment.ipfsHash}', has failed`, + ) + } logger.debug('Obtain a unique Allocation ID') const { allocationSigner, allocationId } = uniqueAllocationID(