From f962d6f9bdfd2dee140c41e5a3448da2e786194b Mon Sep 17 00:00:00 2001
From: Jonathan LEI
Date: Thu, 25 Apr 2024 13:10:59 +0800
Subject: [PATCH] feat: support pending blocks

Since the Firehose protocol does not natively support the notion of a
"pending" block, we represent pending blocks as confirmed blocks with
pseudo block hashes, formatted so that they encode the information
needed for comparing successive pending blocks.
---
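Note (illustrative, not part of the diff): a downstream consumer can tell these
pseudo hashes apart from real block hashes by checking the fixed prefix. Below
is a minimal sketch, assuming the 32-byte big-endian layout documented in the
`block_hash()` comment in this patch; the helper name `decode_pending_hash` is
hypothetical.

    // Hypothetical helper: recover (pseudo height, tx count) from a pseudo
    // block hash produced by MaybePendingBlock::block_hash(). Returns None
    // for anything that does not carry the pseudo-hash marker.
    fn decode_pending_hash(bytes: &[u8; 32]) -> Option<(u64, u32)> {
        // Byte 0 is the fixed 0x00 prefix; bytes 1..8 spell "PENDING".
        if bytes[0] != 0 || &bytes[1..8] != b"PENDING" {
            return None;
        }
        let pseudo_height = u64::from_be_bytes(bytes[8..16].try_into().unwrap());
        let tx_count = u32::from_be_bytes(bytes[16..20].try_into().unwrap());
        Some((pseudo_height, tx_count))
    }

The input bytes would typically come from `FieldElement::to_bytes_be()` on the
hash emitted in the `FIRE BLOCK_END` line.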
 src/main.rs | 282 ++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 230 insertions(+), 52 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index e25f34b..2467024 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,4 @@
-use std::{fs::File, io::Write, path::PathBuf, time::Duration};
+use std::{collections::HashSet, fs::File, io::Write, path::PathBuf, time::Duration};
 
 use anyhow::Result;
 use clap::Parser;
@@ -8,8 +8,8 @@ use starknet::{
     core::{
         serde::unsigned_field_element::UfeHex,
         types::{
-            BlockId, BlockWithTxs, EmittedEvent, EventFilter, FieldElement,
-            MaybePendingBlockWithTxs, StarknetError, Transaction,
+            BlockId, BlockTag, BlockWithTxs, EmittedEvent, EventFilter, FieldElement,
+            MaybePendingBlockWithTxs, PendingBlockWithTxs, StarknetError, Transaction,
         },
     },
     providers::{
@@ -47,6 +47,88 @@ struct BlockInfo {
     hash: FieldElement,
 }
 
+enum MaybePendingBlock {
+    Confirmed(BlockWithTxs),
+    Pending {
+        pseudo_height: u64,
+        block: PendingBlockWithTxs,
+    },
+}
+
+struct PendingBlockInfo {
+    tx_count: usize,
+}
+
+impl MaybePendingBlock {
+    pub fn parent_hash(&self) -> FieldElement {
+        match self {
+            Self::Confirmed(block) => block.parent_hash,
+            Self::Pending { block, .. } => block.parent_hash,
+        }
+    }
+
+    pub fn timestamp(&self) -> u64 {
+        match self {
+            Self::Confirmed(block) => block.timestamp,
+            Self::Pending { block, .. } => block.timestamp,
+        }
+    }
+
+    pub fn height(&self) -> u64 {
+        match self {
+            Self::Confirmed(block) => block.block_number,
+            Self::Pending { pseudo_height, .. } => *pseudo_height,
+        }
+    }
+
+    pub fn transactions(&self) -> &[Transaction] {
+        match self {
+            Self::Confirmed(block) => &block.transactions,
+            Self::Pending { block, .. } => &block.transactions,
+        }
+    }
+
+    pub fn block_hash(&self) -> FieldElement {
+        match self {
+            Self::Confirmed(block) => block.block_hash,
+            Self::Pending {
+                pseudo_height,
+                block,
+            } => {
+                // +--------+-----------+--------------+-------------------+----------+
+                // | Fixed  | "PENDING" | Block Number | Transaction Count | Reserved |
+                // | 1 byte | 7 bytes   | 8 bytes      | 4 bytes           | 12 bytes |
+                // +--------+-----------+--------------+-------------------+----------+
+                // | 00     | 50        | NN           | NN                | 00       |
+                // |        | 45        | NN           | NN                | 00       |
+                // |        | 4E        | NN           | NN                | 00       |
+                // |        | 44        | NN           | NN                | 00       |
+                // |        | 49        | NN           |                   | 00       |
+                // |        | 4E        | NN           |                   | 00       |
+                // |        | 47        | NN           |                   | 00       |
+                // |        |           | NN           |                   | 00       |
+                // +--------+-----------+--------------+-------------------+----------+
+
+                let mut buffer = [0u8; 32];
+                buffer[1] = b'P';
+                buffer[2] = b'E';
+                buffer[3] = b'N';
+                buffer[4] = b'D';
+                buffer[5] = b'I';
+                buffer[6] = b'N';
+                buffer[7] = b'G';
+
+                buffer[8..(8 + 8)].copy_from_slice(&u64::to_be_bytes(*pseudo_height));
+                buffer[(8 + 8)..(8 + 8 + 4)]
+                    .copy_from_slice(&u32::to_be_bytes(block.transactions.len() as u32));
+
+                // This cannot fail as the buffer is always in range
+                FieldElement::from_bytes_be(&buffer).unwrap()
+            }
+        }
+    }
+}
+
 #[tokio::main]
 async fn main() -> Result<()> {
     let cli = Cli::parse();
@@ -76,6 +158,8 @@ async fn run(
     checkpoint: &mut Checkpoint,
     checkpoint_path: &PathBuf,
 ) {
+    let mut last_pending_block: Option<PendingBlockInfo> = None;
+
     loop {
         let (expected_parent_hash, current_block_number) = match checkpoint.latest_blocks.last() {
             Some(last_block) => (last_block.hash, last_block.number + 1),
@@ -86,29 +170,68 @@ async fn run(
             .get_block_with_txs(BlockId::Number(current_block_number))
             .await
         {
-            Ok(block) => match block {
-                MaybePendingBlockWithTxs::Block(block) => block,
-                MaybePendingBlockWithTxs::PendingBlock(_) => {
-                    eprintln!("Unexpected pending block");
-                    tokio::time::sleep(FAILURE_BACKOFF).await;
-                    continue;
-                }
-            },
+            Ok(MaybePendingBlockWithTxs::Block(block)) => {
+                // Got a confirmed block. Clear the pending info now
+                last_pending_block = None;
+
+                MaybePendingBlock::Confirmed(block)
+            }
+            Ok(MaybePendingBlockWithTxs::PendingBlock(_)) => {
+                eprintln!("Unexpected pending block");
+                tokio::time::sleep(FAILURE_BACKOFF).await;
+                continue;
+            }
             Err(starknet::providers::ProviderError::StarknetError(
                 StarknetError::BlockNotFound,
             )) => {
-                // No new block. Wait a bit before trying again
-                tokio::time::sleep(HEAD_BACKOFF).await;
-                continue;
+                // No new block. Try the pending block now
+                match jsonrpc_client
+                    .get_block_with_txs(BlockId::Tag(BlockTag::Pending))
+                    .await
+                {
+                    Ok(MaybePendingBlockWithTxs::Block(_)) => {
+                        eprintln!("Unexpected confirmed block");
+                        tokio::time::sleep(FAILURE_BACKOFF).await;
+                        continue;
+                    }
+                    Ok(MaybePendingBlockWithTxs::PendingBlock(block)) => {
+                        // Unlike with confirmed blocks, we simply discard non-linkable pending
+                        // blocks
+                        if block.parent_hash != expected_parent_hash {
+                            eprintln!("Found unlinkable pending block");
+                            tokio::time::sleep(HEAD_BACKOFF).await;
+                            continue;
+                        }
+
+                        if let Some(last_pending_block) = last_pending_block.as_ref() {
+                            if last_pending_block.tx_count >= block.transactions.len() {
+                                eprintln!("No newer pending block found");
+                                tokio::time::sleep(HEAD_BACKOFF).await;
+                                continue;
+                            }
+                        }
+
+                        MaybePendingBlock::Pending {
+                            pseudo_height: current_block_number,
+                            block,
+                        }
+                    }
+                    Err(err) => {
+                        eprintln!("Error downloading pending block: {err}");
+                        tokio::time::sleep(HEAD_BACKOFF).await;
+                        continue;
+                    }
+                }
             }
             Err(err) => {
-                eprintln!("Error downloading block: {err}");
+                eprintln!("Error downloading confirmed block: {err}");
                 tokio::time::sleep(FAILURE_BACKOFF).await;
                 continue;
             }
         };
 
-        if current_block.parent_hash != expected_parent_hash {
+        // This can only happen with confirmed blocks as we discard non-linkable pending blocks
+        if current_block.parent_hash() != expected_parent_hash {
             if checkpoint.latest_blocks.len() == 1 {
                 // Not possible to determine common ancestor
                 panic!("Unable to handle reorg after consuming the whole history");
@@ -124,25 +247,49 @@ async fn run(
             continue;
         }
 
-        println!(
-            "Processed block #{} ({:#064x})",
-            current_block.block_number, current_block.block_hash
-        );
+        // Progress log
+        match &current_block {
+            MaybePendingBlock::Confirmed(block) => {
+                println!(
+                    "Processed block #{} ({:#064x})",
+                    block.block_number, block.block_hash
+                );
+            }
+            MaybePendingBlock::Pending {
+                pseudo_height,
+                block,
+            } => {
+                println!(
+                    "Processed pending block with pseudo height #{} ({} transactions)",
+                    pseudo_height,
+                    block.transactions.len()
+                );
+            }
+        }
 
-        let new_block_info = BlockInfo {
-            number: current_block.block_number,
-            hash: current_block.block_hash,
-        };
+        match &current_block {
+            MaybePendingBlock::Confirmed(block) => {
+                let new_block_info = BlockInfo {
+                    number: block.block_number,
+                    hash: block.block_hash,
+                };
 
-        if checkpoint.latest_blocks.len() >= MAX_BLOCK_HISTORY {
-            // TODO: use ring buffer instead as this is extremely inefficient
-            checkpoint.latest_blocks.remove(0);
-        }
-        checkpoint.latest_blocks.push(new_block_info);
+                if checkpoint.latest_blocks.len() >= MAX_BLOCK_HISTORY {
+                    // TODO: use ring buffer instead as this is extremely inefficient
+                    checkpoint.latest_blocks.remove(0);
+                }
+                checkpoint.latest_blocks.push(new_block_info);
 
-        if current_block.block_number % CHECKPOINT_BLOCK_INTERVAL == 0 {
-            if let Err(err) = try_persist_checkpoint(checkpoint_path, checkpoint) {
-                eprintln!("Error persisting checkpoint: {err}");
+                if block.block_number % CHECKPOINT_BLOCK_INTERVAL == 0 {
+                    if let Err(err) = try_persist_checkpoint(checkpoint_path, checkpoint) {
+                        eprintln!("Error persisting checkpoint: {err}");
+                    }
+                }
+            }
+            MaybePendingBlock::Pending { block, .. } => {
+                last_pending_block = Some(PendingBlockInfo {
+                    tx_count: block.transactions.len(),
+                })
             }
         }
     }
@@ -150,15 +297,39 @@ async fn run(
 
 async fn handle_block(
     jsonrpc_client: &JsonRpcClient,
-    block: &BlockWithTxs,
+    block: &MaybePendingBlock,
 ) -> Result<()> {
     let mut buffer = Vec::new();
 
-    writeln!(&mut buffer, "FIRE BLOCK_BEGIN {}", block.block_number)?;
+    writeln!(&mut buffer, "FIRE BLOCK_BEGIN {}", block.height())?;
 
-    let block_events = get_block_events(jsonrpc_client, block.block_hash).await?;
+    let block_events = get_block_events(jsonrpc_client, block).await?;
 
-    for tx in block.transactions.iter() {
+    // When using pending blocks, there's a race condition where a new block is confirmed before
+    // the `starknet_getEvents` request. We discard the block when:
+    //
+    // 1. Event list is empty when tx list is not; or
+    // 2. Any event refers to a tx that's not in the block.
+    if let MaybePendingBlock::Pending { block, .. } = block {
+        if block_events.is_empty() && !block.transactions.is_empty() {
+            anyhow::bail!("inconsistent pending block events: empty event list");
+        }
+
+        let block_tx_set = block
+            .transactions
+            .iter()
+            .map(|tx| *tx.transaction_hash())
+            .collect::<HashSet<_>>();
+
+        if block_events
+            .iter()
+            .any(|event| !block_tx_set.contains(&event.transaction_hash))
+        {
+            anyhow::bail!("inconsistent pending block events: invalid transaction reference");
+        }
+    }
+
+    for tx in block.transactions().iter() {
         let (tx_hash, type_str) = match tx {
             Transaction::Declare(tx) => (*tx.transaction_hash(), "DECLARE"),
             Transaction::Deploy(tx) => (tx.transaction_hash, "DEPLOY"),
@@ -201,11 +372,11 @@ async fn handle_block(
     writeln!(
         &mut buffer,
         "FIRE BLOCK_END {} {:#064x} {:#064x} {} {}",
-        block.block_number,
-        block.block_hash,
-        block.parent_hash,
-        block.timestamp,
-        block.transactions.len(),
+        block.height(),
+        block.block_hash(),
+        block.parent_hash(),
+        block.timestamp(),
+        block.transactions().len(),
     )?;
 
     print!("{}", String::from_utf8(buffer)?);
@@ -213,27 +384,34 @@ async fn handle_block(
     Ok(())
 }
 
+// Use `get_block_with_receipts` once we move to RPC v0.7.x
 async fn get_block_events(
     jsonrpc_client: &JsonRpcClient,
-    block_hash: FieldElement,
+    block: &MaybePendingBlock,
 ) -> Result<Vec<EmittedEvent>> {
     const CHUNK_SIZE: u64 = 1000;
 
     let mut token = None;
     let mut events = vec![];
 
+    let event_filter = match block {
+        MaybePendingBlock::Confirmed(block) => EventFilter {
+            from_block: Some(BlockId::Hash(block.block_hash)),
+            to_block: Some(BlockId::Hash(block.block_hash)),
+            address: None,
+            keys: None,
+        },
+        MaybePendingBlock::Pending { .. } => EventFilter {
+            from_block: Some(BlockId::Tag(BlockTag::Pending)),
+            to_block: Some(BlockId::Tag(BlockTag::Pending)),
+            address: None,
+            keys: None,
+        },
+    };
+
     loop {
         let mut result = jsonrpc_client
-            .get_events(
-                EventFilter {
-                    from_block: Some(BlockId::Hash(block_hash)),
-                    to_block: Some(BlockId::Hash(block_hash)),
-                    address: None,
-                    keys: None,
-                },
-                token,
-                CHUNK_SIZE,
-            )
+            .get_events(event_filter.clone(), token, CHUNK_SIZE)
             .await?;
 
         events.append(&mut result.events);