Skip to content

Commit

Permalink
Merge pull request #323 from metaDAOproject/feat/performance
Browse files Browse the repository at this point in the history
Feat/performance - index autocrat from logsSubscribe
  • Loading branch information
R-K-H authored Nov 20, 2024
2 parents 67cd47f + 34b195a commit 2d0be15
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 39 deletions.
2 changes: 1 addition & 1 deletion packages/indexer/src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export async function subscribeAll() {
V4_AUTOCRAT_PROGRAM_ID,
V4_CONDITIONAL_VAULT_PROGRAM_ID,
// V3_AMM_PROGRAM_ID,
// V3_AUTOCRAT_PROGRAM_ID,
V3_AUTOCRAT_PROGRAM_ID,
// V3_CONDITIONAL_VAULT_PROGRAM_ID
];
console.log("Subscribing to logs");
Expand Down
19 changes: 3 additions & 16 deletions packages/indexer/src/v3_indexer/builders/swaps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,36 +200,24 @@ export class SwapBuilder {
return Err({ type: SwapPersistableError.ArbTransactionError });
}

// now we are upserting price/twap in the buildOrderFromSwapIx function
const result = await this.buildOrderFromSwapIx(swapIx, tx, mintIx);
if (!result.success) {
return Err(result.error);
}
const { swapOrder, swapTake } = result.ok;

// TODO: consider co-locating this logic so it can be shared
// TODO doing this twice... also doing this above

const transactionRecord: TransactionRecord = {
txSig: signature,
slot: ctx.slot.toString(),
blockTime: new Date(tx.blockTime * 1000), // TODO need to verify if this is correct
blockTime: new Date(tx.blockTime * 1000),
failed: tx.err !== undefined,
payload: serialize(tx),
serializerLogicVersion: SERIALIZED_TRANSACTION_LOGIC_VERSION,
mainIxType: getMainIxTypeFromTransaction(tx),
};

// TODO: This needs smore work before it's ready
// const priceRecord: PricesRecord = {
// marketAcct: swapOrder.marketAcct,
// updatedSlot: ctx.slot.toString(),
// createdAt: transactionRecord.blockTime,
// // TODO: This doesn't have base and quote... So could be an issue..
// price: swapTake.quotePrice,
// pricesType: PricesType.Conditional,
// }

return Ok(new SwapPersistable(swapOrder, swapTake, transactionRecord)); // priceRecord
return Ok(new SwapPersistable(swapOrder, swapTake, transactionRecord));
} else {
// handle non-swap transactions (add/remove liquidity, crank, etc)
// find market account from instructions
Expand Down Expand Up @@ -307,7 +295,6 @@ export class SwapBuilder {

const userAcct = swapIx.accountsWithData.find((a) => a.name === "user");
if (!userAcct) return Err({ type: "missing data" });
const userAcctPubKey = new PublicKey(userAcct.pubkey);
// TODO fix
const userBaseAcct = swapIx.accountsWithData.find(
(a) => a.name === "userBaseAccount"
Expand Down
7 changes: 5 additions & 2 deletions packages/indexer/src/v3_indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { AutocratProposalIndexer } from "./indexers/autocrat/autocrat-proposal-i


export async function indexFromLogs(logs: Logs, ctx: Context, programId: PublicKey) {
console.log("indexFromLogs:: indexing logs", logs);
if (programId.equals(V3_AMM_PROGRAM_ID)) {
await AmmMarketLogsSubscribeIndexer.index(logs, programId, ctx);
} else if (programId.equals(V3_CONDITIONAL_VAULT_PROGRAM_ID)) {
Expand All @@ -15,6 +16,7 @@ export async function indexFromLogs(logs: Logs, ctx: Context, programId: PublicK
return;
} else if (programId.equals(V3_AUTOCRAT_PROGRAM_ID)) {
// return;
console.log("indexFromLogs:: autocrat logs received", logs);
// Parse logs to find instruction type
const instructionLog = logs.logs.find(log =>
log.includes("Instruction:") &&
Expand All @@ -26,12 +28,13 @@ export async function indexFromLogs(logs: Logs, ctx: Context, programId: PublicK
);

if (instructionLog) {
//TODO: the autocrat proposal and dao indexer(s) doesnt take any args, but the interval indexer interface requires it. maybe fix that lol.
if (instructionLog.includes("InitializeDao") || instructionLog.includes("UpdateDao")) {
await AutocratDaoIndexer.indexFromLogs(logs.logs);
await AutocratDaoIndexer.index(programId.toString());
} else if (instructionLog.includes("InitializeProposal") ||
instructionLog.includes("FinalizeProposal") ||
instructionLog.includes("ExecuteProposal")) {
await AutocratProposalIndexer.indexFromLogs(logs.logs);
await AutocratProposalIndexer.index(programId.toString());
}
}
} else {
Expand Down
7 changes: 1 addition & 6 deletions packages/indexer/src/v3_indexer/indexers/amm-market/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,6 @@ export async function indexAmmMarketAccountWithContext(

const proposalAcct = market[0].proposalAcct;

// if (proposalAcct === null) {
// logger.error("failed to index amm twap for v4 amm", account.toBase58());
// return Err({ type: AmmMarketAccountIndexingErrors.AmmV4TwapIndexError });
// }
const twapNumber: string = twapCalculation.toString();
const newTwap: TwapRecord = {
curTwap: twapNumber,
Expand Down Expand Up @@ -152,10 +148,9 @@ export async function indexAmmMarketAccountWithContext(

if (twapUpsertResult === undefined || twapUpsertResult.length === 0) {
logger.error("failed to upsert twap", newTwap);
// return Err({ type: AmmMarketAccountIndexingErrors.AmmTwapNoInsertError });
}
} catch (e) {
logger.error("failed to upsert twap", e);
logger.error("error upserting twap", e);
return Err({ type: AmmMarketAccountIndexingErrors.AmmTwapNoInsertError });
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ export const AutocratDaoIndexer: IntervalFetchIndexer = {
logger.error("Unable to determine public key for dao tokens");
return Err({ type: AutocratDaoIndexerError.MissingParamError });
}
// const baseTokenData = await enrichTokenMetadata(
// new PublicKey(dao.baseToken.publicKey),
// provider
// );

const baseTokenMint = await getMint(
connection,
new PublicKey(dao.baseToken.publicKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export const AutocratProposalIndexer: IntervalFetchIndexer = {
cronExpression: "5 * * * * *",
index: async () => {
try {
console.log("AutocratProposalIndexer::index::starting");
const { currentSlot, currentTime } =
(
await usingDb((db) =>
Expand Down Expand Up @@ -443,26 +444,32 @@ export const AutocratProposalIndexer: IntervalFetchIndexer = {

indexFromLogs: async (logs: string[]) => {
try {
console.log("indexFromLogs::logs", logs);

//TODO: leaving this here for now, maybe one day we will revisit and do it more efficiently.
console.log("AutocratProposalIndexer::indexFromLogs::logs", logs);
// Find the relevant log that contains the proposal data
const proposalLog = logs.find(log =>
log.includes("Instruction:") &&
(log.includes("InitializeProposal") ||
log.includes("FinalizeProposal") ||
log.includes("ExecuteProposal"))
);
console.log("AutocratProposalIndexer::indexFromLogs::proposalLog", proposalLog);

if (!proposalLog) {
console.log("AutocratProposalIndexer::indexFromLogs::proposalLog not found");
return Err({ type: AutocratDaoIndexerError.MissingParamError });
}

// Extract proposal account from logs
const proposalAcctMatch = logs.find(log => log.includes("Proposal:"));
if (!proposalAcctMatch) {
console.log("AutocratProposalIndexer::indexFromLogs::proposalAcctMatch not found");
return Err({ type: AutocratDaoIndexerError.MissingParamError });
}

const proposalAcct = new PublicKey(proposalAcctMatch.split(": ")[1]);
console.log("AutocratProposalIndexer::indexFromLogs::proposalAcct", proposalAcct);

// Fetch the proposal data since we need the full account data
const protocolV0_3 = rpcReadClient.futarchyProtocols.find(
Expand All @@ -477,6 +484,7 @@ export const AutocratProposalIndexer: IntervalFetchIndexer = {
if (!proposal) {
return Err({ type: AutocratDaoIndexerError.NotFoundError });
}
console.log("AutocratProposalIndexer::indexFromLogs::proposal", proposal);

// Get current slot and time for calculations
const { currentSlot, currentTime } = (
Expand All @@ -497,10 +505,22 @@ export const AutocratProposalIndexer: IntervalFetchIndexer = {
return Err({ type: AutocratDaoIndexerError.MissingParamError });
}

// If this is a new proposal, insert associated accounts data
if (proposalLog.includes("InitializeProposal")) {
console.log("indexFromLogs::inserting associated accounts data for proposal", proposalAcct);
await upsertProposal({ publicKey: proposalAcct, account: proposal }, currentTime);
await insertAssociatedAccountsDataForProposal(
{ publicKey: proposalAcct, account: proposal },
currentTime
);
}

// Handle different proposal states
if (proposal.state.pending) {
// Update proposal as pending
await updateProposalStatus(proposalAcct, ProposalStatus.Pending, currentTime);
if (!proposalLog.includes("InitializeProposal")) { // If this is a new proposal, we dont need to update the status
await updateProposalStatus(proposalAcct, ProposalStatus.Pending, currentTime);
}
} else if (proposal.state.passed) {
// Update proposal as passed
await updateProposalStatus(proposalAcct, ProposalStatus.Passed, currentTime);
Expand All @@ -513,14 +533,7 @@ export const AutocratProposalIndexer: IntervalFetchIndexer = {
await calculateUserPerformance({ publicKey: proposalAcct, account: proposal });
}

// If this is a new proposal, insert associated accounts data
if (proposalLog.includes("InitializeProposal")) {
await insertAssociatedAccountsDataForProposal(
{ publicKey: proposalAcct, account: proposal },
currentTime
);
}

console.log("AutocratProposalIndexer::indexFromLogs::done");
return Ok({ acct: "Updated proposal from logs" });
} catch (err) {
logger.error("error with proposal indexer:", err);
Expand All @@ -529,6 +542,75 @@ export const AutocratProposalIndexer: IntervalFetchIndexer = {
}
};

// helper function to upsert proposal
async function upsertProposal(proposal: ProposalAccountWithKey, currentTime: Date) {
const daoAcct = proposal.account.dao;
if (!daoAcct) {
console.log("AutocratProposalIndexer::upsertProposal::daoAcct not found");
return Err({ type: AutocratDaoIndexerError.MissingParamError });
}

// Get DAO details
const dbDao: DaoRecord | undefined = (
await usingDb((db) =>
db
.select()
.from(schema.daos)
.where(eq(schema.daos.daoAcct, daoAcct.toBase58()))
.execute()
)
)?.[0];

if (!dbDao) return;

// Calculate end slot
const initialSlot = proposal.account.slotEnqueued;
const endSlot = initialSlot.add(new BN(dbDao.slotsPerProposal?.toString()));

// Prepare proposal record
const dbProposal: ProposalRecord = {
proposalAcct: proposal.publicKey.toString(),
proposalNum: BigInt(proposal.account.number.toString()),
autocratVersion: 0.3,
daoAcct: daoAcct.toString(),
proposerAcct: proposal.account.proposer.toString(),
status: ProposalStatus.Pending,
descriptionURL: proposal.account.descriptionUrl,
initialSlot: initialSlot.toString(),
passMarketAcct: proposal.account.passAmm?.toString() ?? null,
failMarketAcct: proposal.account.failAmm?.toString() ?? null,
baseVault: proposal.account.baseVault.toString(),
quoteVault: proposal.account.quoteVault.toString(),
endSlot: endSlot.toString(),
durationInSlots: dbDao.slotsPerProposal,
minBaseFutarchicLiquidity: dbDao.minBaseFutarchicLiquidity ?? null,
minQuoteFutarchicLiquidity: dbDao.minQuoteFutarchicLiquidity ?? null,
passThresholdBps: dbDao.passThresholdBps,
twapInitialObservation: dbDao.twapInitialObservation ?? null,
twapMaxObservationChangePerUpdate: dbDao.twapMaxObservationChangePerUpdate ?? null,
};

// Insert or update the proposal
await usingDb((db) =>
db
.insert(schema.proposals)
.values([dbProposal])
.onConflictDoUpdate({
target: [schema.proposals.proposalAcct],
set: {
status: dbProposal.status,
descriptionURL: dbProposal.descriptionURL,
initialSlot: dbProposal.initialSlot,
endSlot: dbProposal.endSlot,
updatedAt: sql`NOW()`,
},
})
.execute()
);

return Ok({ acct: "Proposal upserted successfully" });
}

// Helper function to update proposal status
async function updateProposalStatus(
proposalAcct: PublicKey,
Expand Down

0 comments on commit 2d0be15

Please sign in to comment.