diff --git a/src/main/kotlin/org/wfanet/measurement/duchy/deploy/gcloud/spanner/computation/GcpSpannerComputationsDatabaseTransactor.kt b/src/main/kotlin/org/wfanet/measurement/duchy/deploy/gcloud/spanner/computation/GcpSpannerComputationsDatabaseTransactor.kt index a79c9bba078..9b433bd2f59 100644 --- a/src/main/kotlin/org/wfanet/measurement/duchy/deploy/gcloud/spanner/computation/GcpSpannerComputationsDatabaseTransactor.kt +++ b/src/main/kotlin/org/wfanet/measurement/duchy/deploy/gcloud/spanner/computation/GcpSpannerComputationsDatabaseTransactor.kt @@ -144,37 +144,30 @@ class GcpSpannerComputationsDatabaseTransactor< ownerId: String, lockDuration: Duration, prioritizedStages: List, - ): String? { - /** Claim a specific task represented by the results of running the above sql. */ - suspend fun claimSpecificTask(result: UnclaimedTaskQueryResult): Boolean = - databaseClient.readWriteTransaction().execute { txn -> - claim( - txn, - result.computationId, - result.computationStage, - result.nextAttempt, - result.updateTime, - ownerId, - lockDuration, + ): String? = + databaseClient.readWriteTransaction().execute { txn -> + UnclaimedTasksQuery( + computationMutations.protocolEnumToLong(protocol), + prioritizedStages, + computationMutations::longValuesToComputationStageEnum, + computationMutations::computationStageEnumToLongValues, + clock.gcloudTimestamp(), ) - } - return UnclaimedTasksQuery( - computationMutations.protocolEnumToLong(protocol), - prioritizedStages, - computationMutations::longValuesToComputationStageEnum, - computationMutations::computationStageEnumToLongValues, - clock.gcloudTimestamp(), - ) - .execute(databaseClient) - // First the possible tasks to claim are selected from the computations table, then for each - // item in the list we try to claim the lock in a transaction which will only succeed if the - // lock is still available. This pattern means only the item which is being updated - // would need to be locked and not every possible computation that can be worked on. - .filter { claimSpecificTask(it) } - // If the value is null, no tasks were claimed. - .firstOrNull() - ?.globalId - } + .execute(txn) + .filter { result -> + claim( + txn, + result.computationId, + result.computationStage, + result.nextAttempt, + result.updateTime, + ownerId, + lockDuration, + ) + } + .firstOrNull() + ?.globalId + } /** * Tries to claim a specific computation for an owner, returning the result of the attempt. If a