Skip to content

Commit

Permalink
feat: support pending blocks
Browse files Browse the repository at this point in the history
Since the Firehose protocol does not natively support the notion of
a "pending" block, we have to implement them as confirmed blocks with
pseudo block hashes, which are formatted in such a way that they encode
information needed for comparing between pending blocks.
  • Loading branch information
xJonathanLEI committed Apr 25, 2024
1 parent ec763a2 commit f962d6f
Showing 1 changed file with 230 additions and 52 deletions.
282 changes: 230 additions & 52 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fs::File, io::Write, path::PathBuf, time::Duration};
use std::{collections::HashSet, fs::File, io::Write, path::PathBuf, time::Duration};

use anyhow::Result;
use clap::Parser;
Expand All @@ -8,8 +8,8 @@ use starknet::{
core::{
serde::unsigned_field_element::UfeHex,
types::{
BlockId, BlockWithTxs, EmittedEvent, EventFilter, FieldElement,
MaybePendingBlockWithTxs, StarknetError, Transaction,
BlockId, BlockTag, BlockWithTxs, EmittedEvent, EventFilter, FieldElement,
MaybePendingBlockWithTxs, PendingBlockWithTxs, StarknetError, Transaction,
},
},
providers::{
Expand Down Expand Up @@ -47,6 +47,88 @@ struct BlockInfo {
hash: FieldElement,
}

enum MaybePendingBlock {
Confirmed(BlockWithTxs),
Pending {
pseudo_height: u64,
block: PendingBlockWithTxs,
},
}

struct PendingBlockInfo {
tx_count: usize,
}

impl MaybePendingBlock {
pub fn parent_hash(&self) -> FieldElement {
match self {
Self::Confirmed(block) => block.parent_hash,
Self::Pending { block, .. } => block.parent_hash,
}
}

pub fn timestamp(&self) -> u64 {
match self {
Self::Confirmed(block) => block.timestamp,
Self::Pending { block, .. } => block.timestamp,
}
}

pub fn height(&self) -> u64 {
match self {
Self::Confirmed(block) => block.block_number,
Self::Pending { pseudo_height, .. } => *pseudo_height,
}
}

pub fn transactions(&self) -> &[Transaction] {
match self {
Self::Confirmed(block) => &block.transactions,
Self::Pending { block, .. } => &block.transactions,
}
}

pub fn block_hash(&self) -> FieldElement {
match self {
Self::Confirmed(block) => block.block_hash,
Self::Pending {
pseudo_height,
block,
} => {
// +-------+-----------+--------------+---------------------+------------------+
// | Fixed | "PENDING" | Block Number | Transaction Count | Reserved |
// | 1 byte| 7 bytes | 8 bytes | 4 bytes | 12 bytes |
// +-------+-----------+--------------+---------------------+------------------+
// | 00 | 50 | NN | NN | 00 |
// | | 45 | NN | NN | 00 |
// | | 4E | NN | NN | 00 |
// | | 44 | NN | NN | 00 |
// | | 49 | NN | | 00 |
// | | 4E | NN | | 00 |
// | | 47 | NN | | 00 |
// | | | NN | | 00 |
// +-------+-----------+--------------+---------------------+------------------+

let mut buffer = [0u8; 32];
buffer[1] = b'P';
buffer[2] = b'E';
buffer[3] = b'N';
buffer[4] = b'D';
buffer[5] = b'I';
buffer[6] = b'N';
buffer[7] = b'G';

buffer[8..(8 + 8)].copy_from_slice(&u64::to_be_bytes(*pseudo_height));
buffer[(8 + 8)..(8 + 8 + 4)]
.copy_from_slice(&u32::to_be_bytes(block.transactions.len() as u32));

// This cannot fail as the buffer is always in range
FieldElement::from_bytes_be(&buffer).unwrap()
}
}
}
}

#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
Expand Down Expand Up @@ -76,6 +158,8 @@ async fn run(
checkpoint: &mut Checkpoint,
checkpoint_path: &PathBuf,
) {
let mut last_pending_block: Option<PendingBlockInfo> = None;

loop {
let (expected_parent_hash, current_block_number) = match checkpoint.latest_blocks.last() {
Some(last_block) => (last_block.hash, last_block.number + 1),
Expand All @@ -86,29 +170,68 @@ async fn run(
.get_block_with_txs(BlockId::Number(current_block_number))
.await
{
Ok(block) => match block {
MaybePendingBlockWithTxs::Block(block) => block,
MaybePendingBlockWithTxs::PendingBlock(_) => {
eprintln!("Unexpected pending block");
tokio::time::sleep(FAILURE_BACKOFF).await;
continue;
}
},
Ok(MaybePendingBlockWithTxs::Block(block)) => {
// Got a confirmed block. Clear the pending info now
last_pending_block = None;

MaybePendingBlock::Confirmed(block)
}
Ok(MaybePendingBlockWithTxs::PendingBlock(_)) => {
eprintln!("Unexpected pending block");
tokio::time::sleep(FAILURE_BACKOFF).await;
continue;
}
Err(starknet::providers::ProviderError::StarknetError(
StarknetError::BlockNotFound,
)) => {
// No new block. Wait a bit before trying again
tokio::time::sleep(HEAD_BACKOFF).await;
continue;
// No new block. Try the pending block now
match jsonrpc_client
.get_block_with_txs(BlockId::Tag(BlockTag::Pending))
.await
{
Ok(MaybePendingBlockWithTxs::Block(_)) => {
eprintln!("Unexpected confirmed block");
tokio::time::sleep(FAILURE_BACKOFF).await;
continue;
}
Ok(MaybePendingBlockWithTxs::PendingBlock(block)) => {
// Unlike with confirmed blocks, we simply discard non-linkable pending
// blocks
if block.parent_hash != expected_parent_hash {
eprintln!("Found unlinkable pending block");
tokio::time::sleep(HEAD_BACKOFF).await;
continue;
}

if let Some(last_pending_block) = last_pending_block.as_ref() {
if last_pending_block.tx_count >= block.transactions.len() {
eprintln!("No newer pending block found");
tokio::time::sleep(HEAD_BACKOFF).await;
continue;
}
}

MaybePendingBlock::Pending {
pseudo_height: current_block_number,
block,
}
}
Err(err) => {
eprintln!("Error downloading pending block: {err}");
tokio::time::sleep(HEAD_BACKOFF).await;
continue;
}
}
}
Err(err) => {
eprintln!("Error downloading block: {err}");
eprintln!("Error downloading confirmed block: {err}");
tokio::time::sleep(FAILURE_BACKOFF).await;
continue;
}
};

if current_block.parent_hash != expected_parent_hash {
// This can only happen with confirmed blocks as we discard non-linkable pending blocks
if current_block.parent_hash() != expected_parent_hash {
if checkpoint.latest_blocks.len() == 1 {
// Not possible to determine common ancestor
panic!("Unable to handle reorg after consuming the whole history");
Expand All @@ -124,41 +247,89 @@ async fn run(
continue;
}

println!(
"Processed block #{} ({:#064x})",
current_block.block_number, current_block.block_hash
);
// Progress log
match &current_block {
MaybePendingBlock::Confirmed(block) => {
println!(
"Processed block #{} ({:#064x})",
block.block_number, block.block_hash
);
}
MaybePendingBlock::Pending {
pseudo_height,
block,
} => {
println!(
"Processed pending block with pseudo height #{} ({} transactions)",
pseudo_height,
block.transactions.len()
);
}
}

let new_block_info = BlockInfo {
number: current_block.block_number,
hash: current_block.block_hash,
};
match &current_block {
MaybePendingBlock::Confirmed(block) => {
let new_block_info = BlockInfo {
number: block.block_number,
hash: block.block_hash,
};

if checkpoint.latest_blocks.len() >= MAX_BLOCK_HISTORY {
// TODO: use ring buffer instead as this is extremely inefficient
checkpoint.latest_blocks.remove(0);
}
checkpoint.latest_blocks.push(new_block_info);
if checkpoint.latest_blocks.len() >= MAX_BLOCK_HISTORY {
// TODO: use ring buffer instead as this is extremely inefficient
checkpoint.latest_blocks.remove(0);
}
checkpoint.latest_blocks.push(new_block_info);

if current_block.block_number % CHECKPOINT_BLOCK_INTERVAL == 0 {
if let Err(err) = try_persist_checkpoint(checkpoint_path, checkpoint) {
eprintln!("Error persisting checkpoint: {err}");
if block.block_number % CHECKPOINT_BLOCK_INTERVAL == 0 {
if let Err(err) = try_persist_checkpoint(checkpoint_path, checkpoint) {
eprintln!("Error persisting checkpoint: {err}");
}
}
}
MaybePendingBlock::Pending { block, .. } => {
last_pending_block = Some(PendingBlockInfo {
tx_count: block.transactions.len(),
})
}
}
}
}

async fn handle_block(
jsonrpc_client: &JsonRpcClient<HttpTransport>,
block: &BlockWithTxs,
block: &MaybePendingBlock,
) -> Result<()> {
let mut buffer = Vec::new();

writeln!(&mut buffer, "FIRE BLOCK_BEGIN {}", block.block_number)?;
writeln!(&mut buffer, "FIRE BLOCK_BEGIN {}", block.height())?;

let block_events = get_block_events(jsonrpc_client, block.block_hash).await?;
let block_events = get_block_events(jsonrpc_client, block).await?;

for tx in block.transactions.iter() {
// When using pending blocks, there's a race condition where a new block is confirmed before
// the `starknet_getEvents` request. We discard the block when:
//
// 1. Event list is empty when tx list is not; or
// 2. Any event refers to a tx that's not in the block.
if let MaybePendingBlock::Pending { block, .. } = block {
if block_events.is_empty() && !block.transactions.is_empty() {
anyhow::bail!("inconsistent pending block events: empty event list");
}

let block_tx_set = block
.transactions
.iter()
.map(|tx| *tx.transaction_hash())
.collect::<HashSet<_>>();

if block_events
.iter()
.any(|event| !block_tx_set.contains(&event.transaction_hash))
{
anyhow::bail!("inconsistent pending block events: invalid transaction reference");
}
}

for tx in block.transactions().iter() {
let (tx_hash, type_str) = match tx {
Transaction::Declare(tx) => (*tx.transaction_hash(), "DECLARE"),
Transaction::Deploy(tx) => (tx.transaction_hash, "DEPLOY"),
Expand Down Expand Up @@ -201,39 +372,46 @@ async fn handle_block(
writeln!(
&mut buffer,
"FIRE BLOCK_END {} {:#064x} {:#064x} {} {}",
block.block_number,
block.block_hash,
block.parent_hash,
block.timestamp,
block.transactions.len(),
block.height(),
block.block_hash(),
block.parent_hash(),
block.timestamp(),
block.transactions().len(),
)?;

print!("{}", String::from_utf8(buffer)?);

Ok(())
}

// Use `get_block_with_receipts` once we move to RPC v0.7.x
async fn get_block_events(
jsonrpc_client: &JsonRpcClient<HttpTransport>,
block_hash: FieldElement,
block: &MaybePendingBlock,
) -> Result<Vec<EmittedEvent>> {
const CHUNK_SIZE: u64 = 1000;

let mut token = None;
let mut events = vec![];

let event_filter = match block {
MaybePendingBlock::Confirmed(block) => EventFilter {
from_block: Some(BlockId::Hash(block.block_hash)),
to_block: Some(BlockId::Hash(block.block_hash)),
address: None,
keys: None,
},
MaybePendingBlock::Pending { .. } => EventFilter {
from_block: Some(BlockId::Tag(BlockTag::Pending)),
to_block: Some(BlockId::Tag(BlockTag::Pending)),
address: None,
keys: None,
},
};

loop {
let mut result = jsonrpc_client
.get_events(
EventFilter {
from_block: Some(BlockId::Hash(block_hash)),
to_block: Some(BlockId::Hash(block_hash)),
address: None,
keys: None,
},
token,
CHUNK_SIZE,
)
.get_events(event_filter.clone(), token, CHUNK_SIZE)
.await?;

events.append(&mut result.events);
Expand Down

0 comments on commit f962d6f

Please sign in to comment.