From 064eb5da3f352eabb99153cae44094af53198c9b Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 9 Jan 2025 11:11:25 -0800 Subject: [PATCH] common: mark actions as PENDING while executing --- .../src/indexer-management/actions.ts | 142 ++++++++++++++---- 1 file changed, 109 insertions(+), 33 deletions(-) diff --git a/packages/indexer-common/src/indexer-management/actions.ts b/packages/indexer-common/src/indexer-management/actions.ts index 8f806cce5..95637e5ab 100644 --- a/packages/indexer-common/src/indexer-management/actions.ts +++ b/packages/indexer-common/src/indexer-management/actions.ts @@ -206,7 +206,41 @@ export class ActionManager { }) } - private async updateActionStatuses( + /** + * Mark actions with the given status. + * @param actions + * @param transaction + * @param status + * @returns updated actions + * @throws error if the update fails + */ + private async markActions( + actions: Action[], + transaction: Transaction, + status: ActionStatus, + ): Promise { + const ids = actions.map((action) => action.id) + const [, updatedActions] = await this.models.Action.update( + { + status, + }, + { + where: { id: ids }, + returning: true, + transaction, + }, + ) + return updatedActions + } + + /** + * Update the action statuses from the results provided by execution. + * + * @param results + * @param transaction + * @returns updated actions + */ + private async updateActionStatusesWithResults( results: AllocationResult[], transaction: Transaction, ): Promise { @@ -255,12 +289,14 @@ export class ActionManager { protocolNetwork, }) - logger.debug('Begin database transaction for executing approved actions') + logger.debug('Begin executing approved actions') + let batchStartTime + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - await this.models.Action.sequelize!.transaction( + const prioritizedActions: Action[] = await this.models.Action.sequelize!.transaction( { isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE }, async (transaction) => { - const transactionOpenTime = Date.now() + batchStartTime = Date.now() let approvedActions try { // Execute already approved actions in the order of type and priority. @@ -271,10 +307,7 @@ export class ActionManager { const actionTypePriority = ['unallocate', 'reallocate', 'allocate'] approvedActions = ( await this.models.Action.findAll({ - where: { - status: ActionStatus.APPROVED, - protocolNetwork, - }, + where: { status: ActionStatus.APPROVED, protocolNetwork }, order: [['priority', 'ASC']], transaction, lock: transaction.LOCK.UPDATE, @@ -283,6 +316,16 @@ export class ActionManager { return actionTypePriority.indexOf(a.type) - actionTypePriority.indexOf(b.type) }) + const pendingActions = await this.models.Action.findAll({ + where: { status: ActionStatus.PENDING, protocolNetwork }, + order: [['priority', 'ASC']], + transaction, + }) + if (pendingActions.length > 0) { + logger.warn(`${pendingActions} Actions found in PENDING state when execution began. Was there a crash? \ + These indicate that execution was interrupted and will need to be cleared manually.`) + } + if (approvedActions.length === 0) { logger.debug('No approved actions were found for this network') return [] @@ -295,34 +338,67 @@ export class ActionManager { logger.error('Failed to query approved actions for network', { error }) return [] } - try { - logger.debug('Executing batch action', { - approvedActions, - startTimeMs: Date.now() - transactionOpenTime, - }) + // mark all approved actions as PENDING, this serves as a lock on other processing of them + await this.markActions(approvedActions, transaction, ActionStatus.PENDING) + return prioritizedActions + }, + ) - // This will return all results if successful, if failed it will return the failed actions - const allocationManager = - this.allocationManagers[network.specification.networkIdentifier] - const results = await allocationManager.executeBatch(approvedActions) + try { + logger.debug('Executing batch action', { + prioritizedActions, + startTimeMs: Date.now() - batchStartTime, + }) + + const allocationManager = + this.allocationManagers[network.specification.networkIdentifier] + + let results + try { + // This will return all results if successful, if failed it will return the failed actions + results = await allocationManager.executeBatch(prioritizedActions) + logger.debug('Completed batch action execution', { + results, + endTimeMs: Date.now() - batchStartTime, + }) + } catch (error) { + // Release the actions from the PENDING state. This means they will be retried again on the next batch execution. + logger.error( + `Error raised during executeBatch, releasing ${prioritizedActions.length} actions from PENDING state. \ + These will be attempted again on the next batch.`, + error, + ) + await this.models.Action.sequelize!.transaction( + { isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE }, + async (transaction) => { + return await this.markActions( + prioritizedActions, + transaction, + ActionStatus.APPROVED, + ) + }, + ) + return [] + } - logger.debug('Completed batch action execution', { - results, - endTimeMs: Date.now() - transactionOpenTime, - }) - updatedActions = await this.updateActionStatuses(results, transaction) + // Happy path: execution went well (success or failure but no exceptions). Update the actions with the results. + updatedActions = await this.models.Action.sequelize!.transaction( + { isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE }, + async (transaction) => { + return await this.updateActionStatusesWithResults(results, transaction) + }, + ) - logger.debug('Updated action statuses', { - updatedActions, - updatedTimeMs: Date.now() - transactionOpenTime, - }) - } catch (error) { - logger.error(`Failed to execute batch tx on staking contract: ${error}`) - throw indexerError(IndexerErrorCode.IE072, error) - } - }, - ) - logger.debug('End database transaction for executing approved actions') + logger.debug('Updated action statuses', { + updatedActions, + updatedTimeMs: Date.now() - batchStartTime, + }) + } catch (error) { + logger.error(`Failed to execute batch tx on staking contract: ${error}`) + throw indexerError(IndexerErrorCode.IE072, error) + } + + logger.debug('End executing approved actions') return updatedActions }