Handle subgraph template create when filterLogsByAddresses set to true (#447)

* Handle subgraph template create when filterLogsByAddresses set to true

* Fix historical processing stopping after running for multiple batches

* Add new method in graph-node test dummy indexer
nikugogoi authored Nov 3, 2023
1 parent 7f37dac commit 0d7e3dd
Showing 9 changed files with 167 additions and 40 deletions.
4 changes: 2 additions & 2 deletions packages/codegen/src/templates/config-template.handlebars
@@ -30,8 +30,8 @@
filterLogsByTopics = false

# Boolean to switch between modes of processing events when starting the server.
- # Setting to true will fetch filtered events and required blocks in a range of blocks and then process them
- # Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head)
+ # Setting to true will fetch filtered events and required blocks in a range of blocks and then process them.
+ # Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head).
useBlockRanges = true

# Max block range for which to return events in eventsInRange GQL query.
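The two modes described by this comment are wired up in EventWatcher.startBlockProcessing (see the packages/util/src/events.ts diff below). A minimal sketch of the branch, assuming a hypothetical startRealtimeBlockProcessing method for the realtime path:

// Sketch of the useBlockRanges mode switch; startHistoricalBlockProcessing is
// real (see events.ts below), startRealtimeBlockProcessing is an assumed name.
if (this._serverConfig.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) {
  // Fetch filtered events and required blocks for a whole range, then process.
  await this.startHistoricalBlockProcessing(startBlockNumber, latestCanonicalBlockNumber);
  return;
}

// Fetch blocks consecutively with their events (realtime processing near head).
await this.startRealtimeBlockProcessing();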
8 changes: 8 additions & 0 deletions packages/graph-node/test/utils/indexer.ts
@@ -119,6 +119,14 @@ export class Indexer implements IndexerInterface {
return [];
}

+ async fetchEventsForContracts (blockHash: string, blockNumber: number, addresses: string[]): Promise<DeepPartial<EventInterface>[]> {
+   assert(blockHash);
+   assert(blockNumber);
+   assert(addresses);
+
+   return [];
+ }

async saveBlockAndFetchEvents (block: BlockProgressInterface): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]> {
return [block, []];
}
72 changes: 52 additions & 20 deletions packages/util/src/common.ts
@@ -281,12 +281,19 @@ export const _fetchBatchBlocks = async (
* @param block
* @param eventsInBatch
*/
- export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number, subgraphEventsOrder: boolean): Promise<void> => {
-   let dbBlock: BlockProgressInterface, dbEvents: EventInterface[];
+ export const processBatchEvents = async (
+   indexer: IndexerInterface,
+   block: BlockProgressInterface,
+   eventsInBatch: number,
+   subgraphEventsOrder: boolean
+ ): Promise<boolean> => {
+   let dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[];
+   let isNewContractWatched = false;

if (subgraphEventsOrder) {
-   ({ dbBlock, dbEvents } = await _processEventsInSubgraphOrder(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
+   ({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
} else {
-   ({ dbBlock, dbEvents } = await _processEvents(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
+   ({ dbBlock, updatedDbEvents } = await _processEvents(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
}

if (indexer.processBlockAfterEvents) {
@@ -300,13 +307,15 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
console.time('time:common#processBatchEvents-updateBlockProgress-saveEvents');
await Promise.all([
indexer.updateBlockProgress(dbBlock, dbBlock.lastProcessedEventIndex),
-     indexer.saveEvents(dbEvents)
+     indexer.saveEvents(updatedDbEvents)
]);
console.timeEnd('time:common#processBatchEvents-updateBlockProgress-saveEvents');

+   return isNewContractWatched;
};
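With the new boolean return value, processBatchEvents reports whether processing this block caused a new contract to be watched. A hedged sketch of how a caller (e.g. a job runner, which is not part of this diff) might consume it:

// Hypothetical caller sketch; everything except processBatchEvents is an assumed name.
const isNewContractWatched = await processBatchEvents(indexer, block, eventsInBatch, subgraphEventsOrder);

if (isNewContractWatched) {
  // A contract created by a subgraph template started being watched mid-block;
  // remember this so subsequent blocks fetch logs for the new addresses too.
  this._watchedContractsUpdated = true;
}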

- const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
-   const dbEvents: EventInterface[] = [];
+ const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[] }> => {
+   const updatedDbEvents: EventInterface[] = [];

let page = 0;
let numFetchedEvents = 0;
@@ -344,7 +353,7 @@ const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInt
if (event.eventName === UNKNOWN_EVENT_NAME) {
// Parse the unknown event and save updated event to the db
event = _parseUnknownEvent(indexer, event, watchedContract.kind);
-       dbEvents.push(event);
+       updatedDbEvents.push(event);
}

await indexer.processEvent(event);
Expand All @@ -357,15 +366,18 @@ const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInt
console.timeEnd('time:common#processEvents-processing_events_batch');
}

-   return { dbBlock: block, dbEvents };
+   // TODO: Fetch and reprocess events if filterByAddresses true and new contracts found
+
+   return { dbBlock: block, updatedDbEvents: updatedDbEvents };
};

- const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
+ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[], isNewContractWatched: boolean }> => {
// Create list of initially watched contracts
const initiallyWatchedContracts: string[] = indexer.getWatchedContracts().map(contract => contract.address);
const unwatchedContractEvents: EventInterface[] = [];
+   let isNewContractWatched = false;

-   const dbEvents: EventInterface[] = [];
+   const updatedDbEvents: EventInterface[] = [];

let page = 0;
let numFetchedEvents = 0;
@@ -408,9 +420,26 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B
console.timeEnd('time:common#processEventsInSubgraphOrder-processing_events_batch');
}

console.time('time:common#processEventsInSubgraphOrder-processing_unwatched_events');
+   const watchedContracts = indexer.getWatchedContracts().map(contract => contract.address);
+
+   // Check if there are new watched contracts
+   if (watchedContracts.length > initiallyWatchedContracts.length) {
+     isNewContractWatched = true;
+
+     // Check if filterLogsByAddresses is set to true
+     if (indexer.serverConfig.filterLogsByAddresses) {
+       // Fetch and parse events for newly watched contracts
+       const newContracts = watchedContracts.filter(contract => !initiallyWatchedContracts.includes(contract));
+       const events = await indexer.fetchEventsForContracts(block.blockHash, block.blockNumber, newContracts);
+
+       events.forEach(event => {
+         event.block = block;
+         updatedDbEvents.push(event as EventInterface);
+       });
+     }
+   }

-   // At last, process all the events of newly watched contracts
+   // Parse events of initially unwatched contracts
for (let event of unwatchedContractEvents) {
const watchedContract = indexer.isWatchedContract(event.contract);

@@ -420,19 +449,22 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B
if (event.eventName === UNKNOWN_EVENT_NAME) {
// Parse the unknown event and save updated event to the db
event = _parseUnknownEvent(indexer, event, watchedContract.kind);
-       dbEvents.push(event);
+       updatedDbEvents.push(event);
}

await indexer.processEvent(event);
}
}

+   console.time('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events');
+   // In the end process events of newly watched contracts
+   for (const updatedDbEvent of updatedDbEvents) {
+     await indexer.processEvent(updatedDbEvent);
+
-     block.lastProcessedEventIndex = Math.max(block.lastProcessedEventIndex + 1, event.index);
+     block.lastProcessedEventIndex = Math.max(block.lastProcessedEventIndex + 1, updatedDbEvent.index);
      block.numProcessedEvents++;
    }
+   console.timeEnd('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events');

console.timeEnd('time:common#processEventsInSubgraphOrder-processing_unwatched_events');

-   return { dbBlock: block, dbEvents };
+   return { dbBlock: block, updatedDbEvents: updatedDbEvents, isNewContractWatched };
};
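This is the core fix of the PR: with filterLogsByAddresses = true, the initial getLogs call only covers addresses watched at the start of the block, so events emitted by a contract that a subgraph template creates within the same block were never fetched. For context, the kind of mapping that triggers this path, as a hypothetical graph-ts style handler (all names assumed):

// A factory handler creates a template data source, so the new pool becomes a
// watched contract mid-block; its own events in the SAME block now have to be
// re-fetched via fetchEventsForContracts and processed last, in event order.
export function handlePoolCreated (event: PoolCreated): void {
  PoolTemplate.create(event.params.pool);
}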

const _getEventsBatch = async (indexer: IndexerInterface, blockHash: string, eventsInBatch: number, page: number): Promise<EventInterface[]> => {
Expand Down
4 changes: 2 additions & 2 deletions packages/util/src/config.ts
@@ -206,8 +206,8 @@ export interface ServerConfig {
clearEntitiesCacheInterval: number;

// Boolean to switch between modes of processing events when starting the server.
-   // Setting to true will fetch filtered events and required blocks in a range of blocks and then process them
-   // Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head)
+   // Setting to true will fetch filtered events and required blocks in a range of blocks and then process them.
+   // Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head).
useBlockRanges: boolean;

// Boolean to skip updating entity fields required in state creation and not required in the frontend.
30 changes: 22 additions & 8 deletions packages/util/src/events.ts
@@ -18,6 +18,9 @@ import { ServerConfig } from './config';

const EVENT = 'event';

+ // TODO: Make configurable
+ const HISTORICAL_MAX_FETCH_AHEAD = 20_000;

const log = debug('vulcanize:events');

export const BlockProgressEvent = 'block-progress-event';
@@ -94,7 +97,13 @@ export class EventWatcher {
// Check if filter for logs is enabled
// Check if starting block for watcher is before latest canonical block
if (this._serverConfig.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) {
-       await this.startHistoricalBlockProcessing(startBlockNumber, latestCanonicalBlockNumber);
+       let endBlockNumber = latestCanonicalBlockNumber;
+
+       if (HISTORICAL_MAX_FETCH_AHEAD > 0) {
+         endBlockNumber = Math.min(startBlockNumber + HISTORICAL_MAX_FETCH_AHEAD, endBlockNumber);
+       }
+
+       await this.startHistoricalBlockProcessing(startBlockNumber, endBlockNumber);

return;
}
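A worked example of the new cap (numbers assumed): starting at block 100_000 against a canonical head at 150_000, historical processing now first targets 120_000 instead of scheduling the whole range at once:

// Worked example for HISTORICAL_MAX_FETCH_AHEAD = 20_000 (assumed numbers).
const startBlockNumber = 100_000;
const latestCanonicalBlockNumber = 150_000;

let endBlockNumber = latestCanonicalBlockNumber;
if (HISTORICAL_MAX_FETCH_AHEAD > 0) {
  endBlockNumber = Math.min(startBlockNumber + HISTORICAL_MAX_FETCH_AHEAD, endBlockNumber);
}
// endBlockNumber === 120_000; when this window completes, the handler below
// calls startBlockProcessing again, so the watcher advances in capped windows.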
@@ -103,6 +112,8 @@
}

async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise<void> {
+   // TODO: Wait for events job queue to be empty so that historical processing does not move far ahead

this._historicalProcessingEndBlockNumber = endBlockNumber;
log(`Starting historical block processing up to block ${this._historicalProcessingEndBlockNumber}`);

@@ -184,15 +195,15 @@

async historicalProcessingCompleteHandler (job: PgBoss.Job<any>): Promise<void> {
const { id, data: { failed, request: { data } } } = job;
-     const { blockNumber, processingEndBlockNumber }: HistoricalJobData = data;
+     const { blockNumber, isComplete }: HistoricalJobData = data;

-     if (failed) {
+     if (failed || isComplete) {
log(`Job ${id} for queue ${QUEUE_HISTORICAL_PROCESSING} failed`);
return;
}

// TODO: Get batch size from config
-     const batchEndBlockNumber = Math.min(blockNumber + HISTORICAL_BLOCKS_BATCH_SIZE, processingEndBlockNumber);
+     const batchEndBlockNumber = Math.min(blockNumber + HISTORICAL_BLOCKS_BATCH_SIZE, this._historicalProcessingEndBlockNumber);
const nextBatchStartBlockNumber = batchEndBlockNumber + 1;
log(`Historical block processing completed for block range: ${blockNumber} to ${batchEndBlockNumber}`);

@@ -201,12 +212,15 @@
const [block] = await this._indexer.getBlocks({ blockNumber: this._historicalProcessingEndBlockNumber });
const historicalProcessingEndBlockHash = block ? block.blockHash : constants.AddressZero;

-       // Update sync status to max of latest processed block or latest canonical block
-       const syncStatus = await this._indexer.forceUpdateSyncStatus(historicalProcessingEndBlockHash, this._historicalProcessingEndBlockNumber);
+       // Update sync status chain head and canonical block to end block of historical processing
+       const [syncStatus] = await Promise.all([
+         this._indexer.updateSyncStatusCanonicalBlock(historicalProcessingEndBlockHash, this._historicalProcessingEndBlockNumber, true),
+         this._indexer.updateSyncStatusChainHead(historicalProcessingEndBlockHash, this._historicalProcessingEndBlockNumber, true)
+       ]);
log(`Sync status canonical block updated to ${syncStatus.latestCanonicalBlockNumber}`);

// Start realtime processing
this.startBlockProcessing();

return;
}

@@ -215,7 +229,7 @@
QUEUE_HISTORICAL_PROCESSING,
{
blockNumber: nextBatchStartBlockNumber,
-         processingEndBlockNumber
+         processingEndBlockNumber: this._historicalProcessingEndBlockNumber
}
);
}
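This rework matches the commit message bullet about historical processing stopping after multiple batches: the batch end is now computed from this._historicalProcessingEndBlockNumber held by the watcher instead of a processingEndBlockNumber threaded through job data, and a job flagged isComplete is skipped. The HistoricalJobData interface is not shown in this diff; its assumed shape after the change:

// Assumed shape of HistoricalJobData (the interface itself is not in this diff).
interface HistoricalJobData {
  blockNumber: number;
  processingEndBlockNumber: number;
  isComplete?: boolean; // set through markComplete job data when a range finishes
}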
16 changes: 14 additions & 2 deletions packages/util/src/indexer.ts
@@ -465,7 +465,19 @@ export class Indexer {
// Fetch events (to be saved to db) for a particular block
async fetchEvents (blockHash: string, blockNumber: number, eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<DeepPartial<EventInterface>[]> {
const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
+     const { logs, transactions } = await this._fetchLogsAndTransactions(blockHash, blockNumber, addresses, topics);
+
+     return this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
+   }
+
+   async fetchEventsForContracts (blockHash: string, blockNumber: number, addresses: string[], eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<DeepPartial<EventInterface>[]> {
+     const { topics } = this._createLogsFilters(eventSignaturesMap);
+     const { logs, transactions } = await this._fetchLogsAndTransactions(blockHash, blockNumber, addresses, topics);
+
+     return this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
+   }
+
+   async _fetchLogsAndTransactions (blockHash: string, blockNumber: number, addresses?: string[], topics?: string[][]): Promise<{ logs: any[]; transactions: any[] }> {
const logsPromise = await this._ethClient.getLogs({
blockHash,
blockNumber: blockNumber.toString(),
@@ -490,7 +502,7 @@
}
] = await Promise.all([logsPromise, transactionsPromise]);

-     return this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
+     return { logs, transactions };
}

// Create events to be saved to db for a block given blockHash, logs, transactions and a parser function
@@ -682,7 +694,7 @@
return res;
}

-   async saveEvents (dbEvents: EventInterface[]): Promise<void> {
+   async saveEvents (dbEvents: DeepPartial<EventInterface>[]): Promise<void> {
const dbTx = await this._db.createTransactionRunner();

try {
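Note that the base class method takes five arguments (it needs the event signatures map and parser), while IndexerInterface and the call in common.ts use a three-argument form. A generated watcher would presumably bridge the two by delegating, in the same way fetchEvents is typically delegated; a sketch with assumed names:

// Hypothetical wrapper in a generated watcher's Indexer class (names assumed).
async fetchEventsForContracts (blockHash: string, blockNumber: number, addresses: string[]): Promise<DeepPartial<EventInterface>[]> {
  return this._baseIndexer.fetchEventsForContracts(blockHash, blockNumber, addresses, this.eventSignaturesMap, parseEventNameAndArgs);
}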
4 changes: 2 additions & 2 deletions packages/util/src/job-queue.ts
@@ -128,8 +128,8 @@
);
}

-   async markComplete (job: PgBoss.Job): Promise<void> {
-     this._boss.complete(job.id);
+   async markComplete (job: PgBoss.Job, data: object = {}): Promise<void> {
+     this._boss.complete(job.id, { ...job.data, ...data });
}
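The new data parameter lets the caller attach completion info that pg-boss passes along to onComplete subscribers. A minimal sketch, assuming the historical processing job runner flags its final batch:

// Sketch (caller not shown in this diff): flag the final historical batch so
// that historicalProcessingCompleteHandler in events.ts sees isComplete set.
await jobQueue.markComplete(job, { isComplete: true });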

async pushJob (queue: string, job: any, options: PgBoss.PublishOptions = {}): Promise<void> {