Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mark actions as PENDING while executing #1074

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 109 additions & 33 deletions packages/indexer-common/src/indexer-management/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,41 @@ export class ActionManager {
})
}

private async updateActionStatuses(
/**
* Mark actions with the given status.
* @param actions
* @param transaction
* @param status
* @returns updated actions
* @throws error if the update fails
*/
private async markActions(
actions: Action[],
transaction: Transaction,
status: ActionStatus,
): Promise<Action[]> {
const ids = actions.map((action) => action.id)
const [, updatedActions] = await this.models.Action.update(
{
status,
},
{
where: { id: ids },
returning: true,
transaction,
},
)
return updatedActions
}

/**
* Update the action statuses from the results provided by execution.
*
* @param results
* @param transaction
* @returns updated actions
*/
private async updateActionStatusesWithResults(
results: AllocationResult[],
transaction: Transaction,
): Promise<Action[]> {
Expand Down Expand Up @@ -255,12 +289,14 @@ export class ActionManager {
protocolNetwork,
})

logger.debug('Begin database transaction for executing approved actions')
logger.debug('Begin executing approved actions')
let batchStartTime

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await this.models.Action.sequelize!.transaction(
const prioritizedActions: Action[] = await this.models.Action.sequelize!.transaction(
{ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE },
async (transaction) => {
const transactionOpenTime = Date.now()
batchStartTime = Date.now()
let approvedActions
try {
// Execute already approved actions in the order of type and priority.
Expand All @@ -271,10 +307,7 @@ export class ActionManager {
const actionTypePriority = ['unallocate', 'reallocate', 'allocate']
approvedActions = (
await this.models.Action.findAll({
where: {
status: ActionStatus.APPROVED,
protocolNetwork,
},
where: { status: ActionStatus.APPROVED, protocolNetwork },
order: [['priority', 'ASC']],
transaction,
lock: transaction.LOCK.UPDATE,
Expand All @@ -283,6 +316,16 @@ export class ActionManager {
return actionTypePriority.indexOf(a.type) - actionTypePriority.indexOf(b.type)
})

const pendingActions = await this.models.Action.findAll({
where: { status: ActionStatus.PENDING, protocolNetwork },
order: [['priority', 'ASC']],
transaction,
})
if (pendingActions.length > 0) {
logger.warn(`${pendingActions} Actions found in PENDING state when execution began. Was there a crash? \
These indicate that execution was interrupted and will need to be cleared manually.`)
}

if (approvedActions.length === 0) {
logger.debug('No approved actions were found for this network')
return []
Expand All @@ -295,34 +338,67 @@ export class ActionManager {
logger.error('Failed to query approved actions for network', { error })
return []
}
try {
logger.debug('Executing batch action', {
approvedActions,
startTimeMs: Date.now() - transactionOpenTime,
})
// mark all approved actions as PENDING, this serves as a lock on other processing of them
await this.markActions(approvedActions, transaction, ActionStatus.PENDING)
return prioritizedActions
},
)

// This will return all results if successful, if failed it will return the failed actions
const allocationManager =
this.allocationManagers[network.specification.networkIdentifier]
const results = await allocationManager.executeBatch(approvedActions)
try {
logger.debug('Executing batch action', {
prioritizedActions,
startTimeMs: Date.now() - batchStartTime,
})

const allocationManager =
this.allocationManagers[network.specification.networkIdentifier]

let results
try {
// This will return all results if successful, if failed it will return the failed actions
results = await allocationManager.executeBatch(prioritizedActions)
logger.debug('Completed batch action execution', {
results,
endTimeMs: Date.now() - batchStartTime,
})
} catch (error) {
// Release the actions from the PENDING state. This means they will be retried again on the next batch execution.
logger.error(
`Error raised during executeBatch, releasing ${prioritizedActions.length} actions from PENDING state. \
These will be attempted again on the next batch.`,
error,
)
await this.models.Action.sequelize!.transaction(
{ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE },
async (transaction) => {
return await this.markActions(
prioritizedActions,
transaction,
ActionStatus.APPROVED,
)
},
)
return []
}

logger.debug('Completed batch action execution', {
results,
endTimeMs: Date.now() - transactionOpenTime,
})
updatedActions = await this.updateActionStatuses(results, transaction)
// Happy path: execution went well (success or failure but no exceptions). Update the actions with the results.
updatedActions = await this.models.Action.sequelize!.transaction(
{ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE },
async (transaction) => {
return await this.updateActionStatusesWithResults(results, transaction)
},
)

logger.debug('Updated action statuses', {
updatedActions,
updatedTimeMs: Date.now() - transactionOpenTime,
})
} catch (error) {
logger.error(`Failed to execute batch tx on staking contract: ${error}`)
throw indexerError(IndexerErrorCode.IE072, error)
}
},
)
logger.debug('End database transaction for executing approved actions')
logger.debug('Updated action statuses', {
updatedActions,
updatedTimeMs: Date.now() - batchStartTime,
})
} catch (error) {
logger.error(`Failed to execute batch tx on staking contract: ${error}`)
throw indexerError(IndexerErrorCode.IE072, error)
}

logger.debug('End executing approved actions')
return updatedActions
}

Expand Down
Loading