Skip to content

Commit

Permalink
fix: use subgraph timestamp for tap
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <gustavo@semiotic.ai>
  • Loading branch information
gusinacio committed Aug 28, 2024
1 parent fabf017 commit fa75ade
Showing 1 changed file with 162 additions and 64 deletions.
226 changes: 162 additions & 64 deletions packages/indexer-common/src/allocations/query-fees.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,21 @@ interface RavWithAllocation {
sender: Address
}

interface TapSubgraphResponse {
transactions: {
allocationID: string
timestamp: number
sender: {
id: string
}
}[]
_meta: {
block: {
timestamp: number
}
}
}

export class AllocationReceiptCollector implements ReceiptCollector {
declare logger: Logger
declare metrics: ReceiptMetrics
Expand Down Expand Up @@ -178,9 +193,13 @@ export class AllocationReceiptCollector implements ReceiptCollector {
// flag during startup.
collector.startReceiptCollecting()
collector.startVoucherProcessing()
if (collector.tapContracts) {
if (collector.tapContracts && collector.tapSubgraph) {
collector.logger.info(`RAV processing is initiated`)
collector.startRAVProcessing()
} else {
collector.logger.info(`RAV process not initiated.
Tap Contracts: ${!!collector.tapSubgraph}.
Tap Subgraph: ${!!collector.tapSubgraph}.`)
}
await collector.queuePendingReceiptsFromDatabase()
return collector
Expand Down Expand Up @@ -462,13 +481,18 @@ export class AllocationReceiptCollector implements ReceiptCollector {
timer: timer(30_000),
}).tryMap(
async () => {
const ravs = await this.pendingRAVs()
let ravs = await this.pendingRAVs()
if (ravs.length === 0) {
this.logger.info(`No pending RAVs to process`)
return []
}
if (ravs.length > 0) {
ravs = await this.filterAndUpdateRavs(ravs)
}
const allocations: Allocation[] = await this.getAllocationsfromAllocationIds(ravs)
this.logger.info(`Retrieved allocations for pending RAVs \n: ${allocations}`)
this.logger.info(
`Retrieved allocations for pending RAVs \n: ${JSON.stringify(allocations)}`,
)
return ravs
.map((rav) => {
const signedRav = rav.getSignedRAV()
Expand Down Expand Up @@ -564,73 +588,137 @@ export class AllocationReceiptCollector implements ReceiptCollector {
// redeem only if last is true
// Later can add order and limit
private async pendingRAVs(): Promise<ReceiptAggregateVoucher[]> {
const unfinalizedRAVs = await this.models.receiptAggregateVouchers.findAll({
return await this.models.receiptAggregateVouchers.findAll({
where: { last: true, final: false },
})
// Obtain allocationIds to use as filter in subgraph
const unfinalizedRavsAllocationIds = unfinalizedRAVs.map((rav) =>
rav.getSignedRAV().rav.allocationId.toLowerCase(),
)
}

if (unfinalizedRavsAllocationIds.length > 0) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let tapSubgraphResponse: any
if (!this.tapSubgraph) {
tapSubgraphResponse = { data: { transactions: [] } }
} else {
tapSubgraphResponse = await this.tapSubgraph!.query(
gql`
query transactions($unfinalizedRavsAllocationIds: [String!]!) {
transactions(
where: { type: "redeem", allocationID_in: $unfinalizedRavsAllocationIds }
) {
allocationID
}
}
`,
{ unfinalizedRavsAllocationIds },
)
}
const alreadyRedeemedAllocations = tapSubgraphResponse.data.transactions.map(
(transaction) => transaction.allocationID,
)
private async filterAndUpdateRavs(
ravLastNotFinal: ReceiptAggregateVoucher[],
): Promise<ReceiptAggregateVoucher[]> {
const tapSubgraphResponse = await this.findTransactionsForRavs(ravLastNotFinal)

// Filter unfinalized RAVS fetched from DB, keeping RAVs that have not yet been redeemed on-chain
const nonRedeemedAllocationIDAddresses = unfinalizedRavsAllocationIds.filter(
(allocationID) => !alreadyRedeemedAllocations.includes(allocationID),
)
// Lowercase and remove '0x' prefix of addresses to match format in TAP DB Tables
const nonRedeemedAllocationIDsTrunc = nonRedeemedAllocationIDAddresses.map(
(allocationID) => allocationID.toLowerCase().replace('0x', ''),
const redeemedRavsNotOnOurDatabase = tapSubgraphResponse.transactions.filter((tx) => {
!ravLastNotFinal.find(
(rav) =>
rav.senderAddress.toLowerCase().endsWith(tx.sender.id) &&
rav.allocationId.toLowerCase().endsWith(tx.allocationID),
)
})

// for each transaction that is not redeemed on our database
// but was redeemed on the blockchain, update it to redeemed
if (redeemedRavsNotOnOurDatabase.length > 0) {
for (const rav of redeemedRavsNotOnOurDatabase) {
await this.markRavAsRedeemed(rav.allocationID, rav.sender.id, rav.timestamp)
}
}

// Filter unfinalized RAVS fetched from DB, keeping RAVs that have not yet been redeemed on-chain
const nonRedeemedRavs = ravLastNotFinal.filter(
(rav) =>
!tapSubgraphResponse.transactions.find((tx) => {
rav.senderAddress == toAddress(tx.sender.id) &&
rav.allocationId == toAddress(tx.allocationID)
}),
)

// we use the subgraph timestamp to make decisions
// block timestamp minus 1 minute (because of blockchain timestamp uncertainty)
const ONE_MINUTE = 60
const blockTimestampSecs = tapSubgraphResponse._meta.block.timestamp - ONE_MINUTE

// Mark RAVs as unredeemed in DB if the TAP subgraph couldn't find the redeem Tx.
// To handle a chain reorg that "unredeemed" the RAVs.
// WE use sql directly due to a bug in sequelize update:
// https://github.com/sequelize/sequelize/issues/7664 (bug been open for 7 years no fix yet or ever)
if (nonRedeemedRavs.length > 0) {
await this.revertRavsRedeemed(nonRedeemedRavs, blockTimestampSecs)
}

// Mark RAVs as unredeemed in DB if the TAP subgraph couldn't find the redeem Tx.
// To handle a chain reorg that "unredeemed" the RAVs.
// WE use sql directly due to a bug in sequelize update:
// https://github.com/sequelize/sequelize/issues/7664 (bug been open for 7 years no fix yet or ever)
// For all RAVs that passed finality time, we mark it as final
await this.markRavsAsFinal(blockTimestampSecs)

let query = `
return await this.models.receiptAggregateVouchers.findAll({
where: { redeemedAt: null, final: false, last: true },
})
}

private async findTransactionsForRavs(
ravs: ReceiptAggregateVoucher[],
): Promise<TapSubgraphResponse> {
// Obtain allocationIds to use as filter in subgraph
const ravLastNotFinalAllocationIds = ravs.map((rav) => [
rav.getSignedRAV().rav.allocationId.toLowerCase(),
toAddress(rav.senderAddress).toLowerCase(),
])

const response = await this.tapSubgraph!.query<TapSubgraphResponse>(
gql`
query transactions(
$unfinalizedRavsAllocationIds: [String!]!
$senderAddresses: [String!]!
) {
transactions(
where: {
type: "redeem"
allocationID_in: $unfinalizedRavsAllocationIds
_sender: { id_in: $senderAddresses }
}
) {
allocationID
timestamp
sender {
id
}
}
_meta {
block {
timestamp
}
}
}
`,
{
unfinalizedRavsAllocationIds: ravLastNotFinalAllocationIds.map(
(value) => value[0],
),
senderAddresses: ravLastNotFinalAllocationIds.map((value) => value[1]),
},
)
return response.data!
}

// for every allocation_id of this list that contains the timestamp_ns less than the current
// subgraph timestamp
private async revertRavsRedeemed(
ravsNotRedeemed: { allocationId: string; senderAddress: string }[],
blockTimestampSecs: number,
) {
const SECONDS_TO_NANOSECONDS = 1000000000
const blockTimestampNs = blockTimestampSecs * SECONDS_TO_NANOSECONDS
const query = `
UPDATE scalar_tap_ravs
SET redeemed_at = NULL
WHERE allocation_id IN ('${nonRedeemedAllocationIDsTrunc.join("', '")}')
WHERE (allocation_id, sender_address) IN (VALUES '${ravsNotRedeemed
.map((rav) => `('${rav.allocationId}', '${rav.senderAddress}')`)
.join("', '")}')
AND timetstamp_ns < ${blockTimestampNs}
`
await this.models.receiptAggregateVouchers.sequelize?.query(query)
await this.models.receiptAggregateVouchers.sequelize?.query(query)
}

// // Update those that redeemed_at is older than 60 minutes and mark as final
query = `
// we use blockTimestamp instead of NOW() because we must be older than
// the subgraph timestamp
private async markRavsAsFinal(blockTimestampSecs: number) {
const query = `
UPDATE scalar_tap_ravs
SET final = TRUE
WHERE last = TRUE AND final = FALSE
AND redeemed_at < NOW() - INTERVAL '${this.finalityTime} second'
AND redeemed_at IS NOT NULL
AND redeemed_at < ${blockTimestampSecs - this.finalityTime}
`
await this.models.receiptAggregateVouchers.sequelize?.query(query)

return await this.models.receiptAggregateVouchers.findAll({
where: { redeemedAt: null, final: false, last: true },
})
}
return []
await this.models.receiptAggregateVouchers.sequelize?.query(query)
}

private encodeReceiptBatch(receipts: AllocationReceipt[]): BytesWriter {
Expand Down Expand Up @@ -942,18 +1030,12 @@ export class AllocationReceiptCollector implements ReceiptCollector {
)

try {
const addressWithoutPrefix = rav.allocationId.toLowerCase().replace('0x', '')
// WE use sql directly due to a bug in sequelize update:
// https://github.com/sequelize/sequelize/issues/7664 (bug been open for 7 years no fix yet or ever)
const query = `
UPDATE scalar_tap_ravs
SET redeemed_at = NOW()
WHERE allocation_id = '${addressWithoutPrefix}'
`
await this.models.receiptAggregateVouchers.sequelize?.query(query)
const allocationId = rav.allocationId.toLowerCase().replace('0x', '')
const senderAddress = sender.toLowerCase().replace('0x', '')

await this.markRavAsRedeemed(allocationId, senderAddress)
logger.info(
`Updated receipt aggregate vouchers table with redeemed_at for allocation ${addressWithoutPrefix}`,
`Updated receipt aggregate vouchers table with redeemed_at for allocation ${allocationId} and sender ${senderAddress}`,
)
} catch (err) {
logger.warn(
Expand Down Expand Up @@ -1005,6 +1087,22 @@ export class AllocationReceiptCollector implements ReceiptCollector {
)
}

private async markRavAsRedeemed(
allocationId: string,
senderAddress: string,
timestamp?: number,
) {
// WE use sql directly due to a bug in sequelize update:
// https://github.com/sequelize/sequelize/issues/7664 (bug been open for 7 years no fix yet or ever)
const query = `
UPDATE scalar_tap_ravs
SET redeemed_at = ${timestamp ? timestamp : 'NOW()'}
WHERE (allocation_id, sender_address) IN '${allocationId}'
AND sender_address = '${senderAddress}'
`
await this.models.receiptAggregateVouchers.sequelize?.query(query)
}

public async queuePendingReceiptsFromDatabase(): Promise<void> {
// Obtain all closed allocations
const closedAllocations = await this.models.allocationSummaries.findAll({
Expand Down

0 comments on commit fa75ade

Please sign in to comment.