From 2aefdfcb1a2f4697e556a1df2390745c848343a3 Mon Sep 17 00:00:00 2001 From: advaith101 Date: Tue, 19 Nov 2024 17:35:27 -0500 Subject: [PATCH 1/2] feat: clean up comments/console.logs --- .../indexer/src/v3_indexer/builders/swaps.ts | 19 +++---------------- .../v3_indexer/indexers/amm-market/utils.ts | 5 ----- .../indexers/autocrat/autocrat-dao-indexer.ts | 5 +---- 3 files changed, 4 insertions(+), 25 deletions(-) diff --git a/packages/indexer/src/v3_indexer/builders/swaps.ts b/packages/indexer/src/v3_indexer/builders/swaps.ts index 4910ad9..905da4a 100644 --- a/packages/indexer/src/v3_indexer/builders/swaps.ts +++ b/packages/indexer/src/v3_indexer/builders/swaps.ts @@ -200,36 +200,24 @@ export class SwapBuilder { return Err({ type: SwapPersistableError.ArbTransactionError }); } + // now we are upserting price/twap in the buildOrderFromSwapIx function const result = await this.buildOrderFromSwapIx(swapIx, tx, mintIx); if (!result.success) { return Err(result.error); } const { swapOrder, swapTake } = result.ok; - // TODO: consider co-locating this logic so it can be shared - // TODO doing this twice... also doing this above - const transactionRecord: TransactionRecord = { txSig: signature, slot: ctx.slot.toString(), - blockTime: new Date(tx.blockTime * 1000), // TODO need to verify if this is correct + blockTime: new Date(tx.blockTime * 1000), failed: tx.err !== undefined, payload: serialize(tx), serializerLogicVersion: SERIALIZED_TRANSACTION_LOGIC_VERSION, mainIxType: getMainIxTypeFromTransaction(tx), }; - // TODO: This needs smore work before it's ready - // const priceRecord: PricesRecord = { - // marketAcct: swapOrder.marketAcct, - // updatedSlot: ctx.slot.toString(), - // createdAt: transactionRecord.blockTime, - // // TODO: This doesn't have base and quote... So could be an issue.. - // price: swapTake.quotePrice, - // pricesType: PricesType.Conditional, - // } - - return Ok(new SwapPersistable(swapOrder, swapTake, transactionRecord)); // priceRecord + return Ok(new SwapPersistable(swapOrder, swapTake, transactionRecord)); } else { // handle non-swap transactions (add/remove liquidity, crank, etc) // find market account from instructions @@ -307,7 +295,6 @@ export class SwapBuilder { const userAcct = swapIx.accountsWithData.find((a) => a.name === "user"); if (!userAcct) return Err({ type: "missing data" }); - const userAcctPubKey = new PublicKey(userAcct.pubkey); // TODO fix const userBaseAcct = swapIx.accountsWithData.find( (a) => a.name === "userBaseAccount" diff --git a/packages/indexer/src/v3_indexer/indexers/amm-market/utils.ts b/packages/indexer/src/v3_indexer/indexers/amm-market/utils.ts index 36d537c..38271ad 100644 --- a/packages/indexer/src/v3_indexer/indexers/amm-market/utils.ts +++ b/packages/indexer/src/v3_indexer/indexers/amm-market/utils.ts @@ -121,10 +121,6 @@ export async function indexAmmMarketAccountWithContext( const proposalAcct = market[0].proposalAcct; - // if (proposalAcct === null) { - // logger.error("failed to index amm twap for v4 amm", account.toBase58()); - // return Err({ type: AmmMarketAccountIndexingErrors.AmmV4TwapIndexError }); - // } const twapNumber: string = twapCalculation.toString(); const newTwap: TwapRecord = { curTwap: twapNumber, @@ -152,7 +148,6 @@ export async function indexAmmMarketAccountWithContext( if (twapUpsertResult === undefined || twapUpsertResult.length === 0) { logger.error("failed to upsert twap", newTwap); - // return Err({ type: AmmMarketAccountIndexingErrors.AmmTwapNoInsertError }); } } catch (e) { logger.error("failed to upsert twap", e); diff --git a/packages/indexer/src/v3_indexer/indexers/autocrat/autocrat-dao-indexer.ts b/packages/indexer/src/v3_indexer/indexers/autocrat/autocrat-dao-indexer.ts index d7f11d3..aa4d1bf 100644 --- a/packages/indexer/src/v3_indexer/indexers/autocrat/autocrat-dao-indexer.ts +++ b/packages/indexer/src/v3_indexer/indexers/autocrat/autocrat-dao-indexer.ts @@ -51,10 +51,7 @@ export const AutocratDaoIndexer: IntervalFetchIndexer = { logger.error("Unable to determine public key for dao tokens"); return Err({ type: AutocratDaoIndexerError.MissingParamError }); } - // const baseTokenData = await enrichTokenMetadata( - // new PublicKey(dao.baseToken.publicKey), - // provider - // ); + const baseTokenMint = await getMint( connection, new PublicKey(dao.baseToken.publicKey) From 34b195a9242743529d5b458b1a31a350eb5bd9ab Mon Sep 17 00:00:00 2001 From: advaith101 Date: Tue, 19 Nov 2024 20:26:34 -0500 Subject: [PATCH 2/2] feat: index autocrat daos/proposals from logsSubscribe --- packages/indexer/src/subscriber.ts | 2 +- packages/indexer/src/v3_indexer/indexer.ts | 7 +- .../v3_indexer/indexers/amm-market/utils.ts | 2 +- .../autocrat/autocrat-proposal-indexer.ts | 102 ++++++++++++++++-- 4 files changed, 99 insertions(+), 14 deletions(-) diff --git a/packages/indexer/src/subscriber.ts b/packages/indexer/src/subscriber.ts index 50d805d..c347340 100644 --- a/packages/indexer/src/subscriber.ts +++ b/packages/indexer/src/subscriber.ts @@ -44,7 +44,7 @@ export async function subscribeAll() { V4_AUTOCRAT_PROGRAM_ID, V4_CONDITIONAL_VAULT_PROGRAM_ID, // V3_AMM_PROGRAM_ID, - // V3_AUTOCRAT_PROGRAM_ID, + V3_AUTOCRAT_PROGRAM_ID, // V3_CONDITIONAL_VAULT_PROGRAM_ID ]; console.log("Subscribing to logs"); diff --git a/packages/indexer/src/v3_indexer/indexer.ts b/packages/indexer/src/v3_indexer/indexer.ts index 2f58c37..e232d80 100644 --- a/packages/indexer/src/v3_indexer/indexer.ts +++ b/packages/indexer/src/v3_indexer/indexer.ts @@ -6,6 +6,7 @@ import { AutocratProposalIndexer } from "./indexers/autocrat/autocrat-proposal-i export async function indexFromLogs(logs: Logs, ctx: Context, programId: PublicKey) { + console.log("indexFromLogs:: indexing logs", logs); if (programId.equals(V3_AMM_PROGRAM_ID)) { await AmmMarketLogsSubscribeIndexer.index(logs, programId, ctx); } else if (programId.equals(V3_CONDITIONAL_VAULT_PROGRAM_ID)) { @@ -15,6 +16,7 @@ export async function indexFromLogs(logs: Logs, ctx: Context, programId: PublicK return; } else if (programId.equals(V3_AUTOCRAT_PROGRAM_ID)) { // return; + console.log("indexFromLogs:: autocrat logs received", logs); // Parse logs to find instruction type const instructionLog = logs.logs.find(log => log.includes("Instruction:") && @@ -26,12 +28,13 @@ export async function indexFromLogs(logs: Logs, ctx: Context, programId: PublicK ); if (instructionLog) { + //TODO: the autocrat proposal and dao indexer(s) doesnt take any args, but the interval indexer interface requires it. maybe fix that lol. if (instructionLog.includes("InitializeDao") || instructionLog.includes("UpdateDao")) { - await AutocratDaoIndexer.indexFromLogs(logs.logs); + await AutocratDaoIndexer.index(programId.toString()); } else if (instructionLog.includes("InitializeProposal") || instructionLog.includes("FinalizeProposal") || instructionLog.includes("ExecuteProposal")) { - await AutocratProposalIndexer.indexFromLogs(logs.logs); + await AutocratProposalIndexer.index(programId.toString()); } } } else { diff --git a/packages/indexer/src/v3_indexer/indexers/amm-market/utils.ts b/packages/indexer/src/v3_indexer/indexers/amm-market/utils.ts index 38271ad..b297fb9 100644 --- a/packages/indexer/src/v3_indexer/indexers/amm-market/utils.ts +++ b/packages/indexer/src/v3_indexer/indexers/amm-market/utils.ts @@ -150,7 +150,7 @@ export async function indexAmmMarketAccountWithContext( logger.error("failed to upsert twap", newTwap); } } catch (e) { - logger.error("failed to upsert twap", e); + logger.error("error upserting twap", e); return Err({ type: AmmMarketAccountIndexingErrors.AmmTwapNoInsertError }); } } diff --git a/packages/indexer/src/v3_indexer/indexers/autocrat/autocrat-proposal-indexer.ts b/packages/indexer/src/v3_indexer/indexers/autocrat/autocrat-proposal-indexer.ts index 8117987..430877d 100644 --- a/packages/indexer/src/v3_indexer/indexers/autocrat/autocrat-proposal-indexer.ts +++ b/packages/indexer/src/v3_indexer/indexers/autocrat/autocrat-proposal-indexer.ts @@ -61,6 +61,7 @@ export const AutocratProposalIndexer: IntervalFetchIndexer = { cronExpression: "5 * * * * *", index: async () => { try { + console.log("AutocratProposalIndexer::index::starting"); const { currentSlot, currentTime } = ( await usingDb((db) => @@ -443,7 +444,9 @@ export const AutocratProposalIndexer: IntervalFetchIndexer = { indexFromLogs: async (logs: string[]) => { try { - console.log("indexFromLogs::logs", logs); + + //TODO: leaving this here for now, maybe one day we will revisit and do it more efficiently. + console.log("AutocratProposalIndexer::indexFromLogs::logs", logs); // Find the relevant log that contains the proposal data const proposalLog = logs.find(log => log.includes("Instruction:") && @@ -451,18 +454,22 @@ export const AutocratProposalIndexer: IntervalFetchIndexer = { log.includes("FinalizeProposal") || log.includes("ExecuteProposal")) ); + console.log("AutocratProposalIndexer::indexFromLogs::proposalLog", proposalLog); if (!proposalLog) { + console.log("AutocratProposalIndexer::indexFromLogs::proposalLog not found"); return Err({ type: AutocratDaoIndexerError.MissingParamError }); } // Extract proposal account from logs const proposalAcctMatch = logs.find(log => log.includes("Proposal:")); if (!proposalAcctMatch) { + console.log("AutocratProposalIndexer::indexFromLogs::proposalAcctMatch not found"); return Err({ type: AutocratDaoIndexerError.MissingParamError }); } const proposalAcct = new PublicKey(proposalAcctMatch.split(": ")[1]); + console.log("AutocratProposalIndexer::indexFromLogs::proposalAcct", proposalAcct); // Fetch the proposal data since we need the full account data const protocolV0_3 = rpcReadClient.futarchyProtocols.find( @@ -477,6 +484,7 @@ export const AutocratProposalIndexer: IntervalFetchIndexer = { if (!proposal) { return Err({ type: AutocratDaoIndexerError.NotFoundError }); } + console.log("AutocratProposalIndexer::indexFromLogs::proposal", proposal); // Get current slot and time for calculations const { currentSlot, currentTime } = ( @@ -497,10 +505,22 @@ export const AutocratProposalIndexer: IntervalFetchIndexer = { return Err({ type: AutocratDaoIndexerError.MissingParamError }); } + // If this is a new proposal, insert associated accounts data + if (proposalLog.includes("InitializeProposal")) { + console.log("indexFromLogs::inserting associated accounts data for proposal", proposalAcct); + await upsertProposal({ publicKey: proposalAcct, account: proposal }, currentTime); + await insertAssociatedAccountsDataForProposal( + { publicKey: proposalAcct, account: proposal }, + currentTime + ); + } + // Handle different proposal states if (proposal.state.pending) { // Update proposal as pending - await updateProposalStatus(proposalAcct, ProposalStatus.Pending, currentTime); + if (!proposalLog.includes("InitializeProposal")) { // If this is a new proposal, we dont need to update the status + await updateProposalStatus(proposalAcct, ProposalStatus.Pending, currentTime); + } } else if (proposal.state.passed) { // Update proposal as passed await updateProposalStatus(proposalAcct, ProposalStatus.Passed, currentTime); @@ -513,14 +533,7 @@ export const AutocratProposalIndexer: IntervalFetchIndexer = { await calculateUserPerformance({ publicKey: proposalAcct, account: proposal }); } - // If this is a new proposal, insert associated accounts data - if (proposalLog.includes("InitializeProposal")) { - await insertAssociatedAccountsDataForProposal( - { publicKey: proposalAcct, account: proposal }, - currentTime - ); - } - + console.log("AutocratProposalIndexer::indexFromLogs::done"); return Ok({ acct: "Updated proposal from logs" }); } catch (err) { logger.error("error with proposal indexer:", err); @@ -529,6 +542,75 @@ export const AutocratProposalIndexer: IntervalFetchIndexer = { } }; +// helper function to upsert proposal +async function upsertProposal(proposal: ProposalAccountWithKey, currentTime: Date) { + const daoAcct = proposal.account.dao; + if (!daoAcct) { + console.log("AutocratProposalIndexer::upsertProposal::daoAcct not found"); + return Err({ type: AutocratDaoIndexerError.MissingParamError }); + } + + // Get DAO details + const dbDao: DaoRecord | undefined = ( + await usingDb((db) => + db + .select() + .from(schema.daos) + .where(eq(schema.daos.daoAcct, daoAcct.toBase58())) + .execute() + ) + )?.[0]; + + if (!dbDao) return; + + // Calculate end slot + const initialSlot = proposal.account.slotEnqueued; + const endSlot = initialSlot.add(new BN(dbDao.slotsPerProposal?.toString())); + + // Prepare proposal record + const dbProposal: ProposalRecord = { + proposalAcct: proposal.publicKey.toString(), + proposalNum: BigInt(proposal.account.number.toString()), + autocratVersion: 0.3, + daoAcct: daoAcct.toString(), + proposerAcct: proposal.account.proposer.toString(), + status: ProposalStatus.Pending, + descriptionURL: proposal.account.descriptionUrl, + initialSlot: initialSlot.toString(), + passMarketAcct: proposal.account.passAmm?.toString() ?? null, + failMarketAcct: proposal.account.failAmm?.toString() ?? null, + baseVault: proposal.account.baseVault.toString(), + quoteVault: proposal.account.quoteVault.toString(), + endSlot: endSlot.toString(), + durationInSlots: dbDao.slotsPerProposal, + minBaseFutarchicLiquidity: dbDao.minBaseFutarchicLiquidity ?? null, + minQuoteFutarchicLiquidity: dbDao.minQuoteFutarchicLiquidity ?? null, + passThresholdBps: dbDao.passThresholdBps, + twapInitialObservation: dbDao.twapInitialObservation ?? null, + twapMaxObservationChangePerUpdate: dbDao.twapMaxObservationChangePerUpdate ?? null, + }; + + // Insert or update the proposal + await usingDb((db) => + db + .insert(schema.proposals) + .values([dbProposal]) + .onConflictDoUpdate({ + target: [schema.proposals.proposalAcct], + set: { + status: dbProposal.status, + descriptionURL: dbProposal.descriptionURL, + initialSlot: dbProposal.initialSlot, + endSlot: dbProposal.endSlot, + updatedAt: sql`NOW()`, + }, + }) + .execute() + ); + + return Ok({ acct: "Proposal upserted successfully" }); +} + // Helper function to update proposal status async function updateProposalStatus( proposalAcct: PublicKey,