From 3c3532219c811c2ecfda2061fa43ea939bb4ac93 Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Thu, 5 Dec 2024 17:17:10 +0100 Subject: [PATCH 1/8] feat: add last cache change tracking field in eth state + eth state service --- crates/rpc/rpc-eth-types/src/cache/mod.rs | 44 ++++++++++++++++------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index 168638872407..13689f427cdd 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -61,15 +61,10 @@ type HeaderLruCache = MultiConsumerLruCache { to_service: UnboundedSender>, -} - -impl Clone for EthStateCache { - fn clone(&self) -> Self { - Self { to_service: self.to_service.clone() } - } + latest_chain_change: Option>, } impl EthStateCache { @@ -95,8 +90,9 @@ impl EthStateCache { action_rx: UnboundedReceiverStream::new(rx), action_task_spawner, rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)), + latest_chain_change: None, }; - let cache = Self { to_service }; + let cache = Self { to_service, latest_chain_change: None }; (cache, service) } @@ -186,6 +182,21 @@ impl EthStateCache { let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx }); rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)? } + + /// Returns the most recent canonical block from the cache, if available. + /// Used to efficiently handle latest block requests and avoid race conditions during chain + /// reorgs. + /// Returns None if no canonical chain is tracked or during reorgs. + pub async fn latest_block_with_senders( + &self, + ) -> ProviderResult>>> { + if let Some(chain_change) = &self.latest_chain_change { + if let Some(latest_block) = chain_change.blocks.last() { + return self.get_sealed_block_with_senders(latest_block.hash()).await; + } + } + Ok(None) + } } /// A task than manages caches for data required by the `eth` rpc implementation. @@ -236,6 +247,8 @@ pub(crate) struct EthStateCacheService< action_task_spawner: Tasks, /// Rate limiter rate_limiter: Arc, + /// Tracks latest canonical chain state for cache consistency + latest_chain_change: Option>, } impl EthStateCacheService @@ -458,7 +471,8 @@ where } } CacheAction::CacheNewCanonicalChain { chain_change } => { - for block in chain_change.blocks { + this.latest_chain_change = Some(chain_change.clone()); + for block in chain_change.clone().blocks { this.on_new_block(block.hash(), Ok(Some(Arc::new(block)))); } @@ -527,12 +541,14 @@ enum CacheAction { }, } +#[derive(Clone, Debug)] struct BlockReceipts { block_hash: B256, receipts: Vec>, } /// A change of the canonical chain +#[derive(Debug, Clone)] struct ChainChange { blocks: Vec>, receipts: Vec>, @@ -558,9 +574,13 @@ impl ChainChange { /// Awaits for new chain events and directly inserts them into the cache so they're available /// immediately before they need to be fetched from disk. /// +/// Updates [`EthStateCache`] in two scenario : +/// 1. On reorgs: sets [`latest_chain_change`] to None and removes reorged blocks +/// 2. On new canonical blocks: updates [`latest_chain_change`] and caches the new blocks +/// /// Reorged blocks are removed from the cache. pub async fn cache_new_blocks_task( - eth_state_cache: EthStateCache, + mut eth_state_cache: EthStateCache, mut events: St, ) where St: Stream> + Unpin + 'static, @@ -568,13 +588,13 @@ pub async fn cache_new_blocks_task( while let Some(event) = events.next().await { if let Some(reverted) = event.reverted() { let chain_change = ChainChange::new(reverted); - + eth_state_cache.latest_chain_change = None; let _ = eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change }); } let chain_change = ChainChange::new(event.committed()); - + eth_state_cache.latest_chain_change = Some(chain_change.clone()); let _ = eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change }); } From acc7fb06319b47ba885091cdefac74a8a1b4517a Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Fri, 20 Dec 2024 12:28:36 +0100 Subject: [PATCH 2/8] fix: resolve merge conflicts in block.rs and cache mod --- crates/rpc/rpc-eth-api/src/helpers/block.rs | 74 +++++++++++++-------- crates/rpc/rpc-eth-types/src/cache/mod.rs | 4 +- 2 files changed, 49 insertions(+), 29 deletions(-) diff --git a/crates/rpc/rpc-eth-api/src/helpers/block.rs b/crates/rpc/rpc-eth-api/src/helpers/block.rs index 1ae084fbe45f..d69ef02f1322 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/block.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/block.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use alloy_eips::BlockId; use alloy_primitives::Sealable; use alloy_rlp::Encodable; -use alloy_rpc_types_eth::{Block, BlockTransactions, Header, Index}; +use alloy_rpc_types_eth::{Block, BlockNumberOrTag, BlockTransactions, Header, Index}; use futures::Future; use reth_node_api::BlockBody; use reth_primitives::{SealedBlockFor, SealedBlockWithSenders}; @@ -91,21 +91,31 @@ pub trait EthBlocks: LoadBlock { .map(|block| block.body.transactions().len())) } - let block_hash = match self - .provider() - .block_hash_for_id(block_id) - .map_err(Self::Error::from_eth_err)? - { - Some(block_hash) => block_hash, - None => return Ok(None), - }; + match block_id { + BlockId::Number(BlockNumberOrTag::Latest) => Ok(self + .cache() + .latest_block_with_senders() + .await + .map_err(Self::Error::from_eth_err)? + .map(|b| b.body.transactions().len())), + _ => { + let block_hash = match self + .provider() + .block_hash_for_id(block_id) + .map_err(Self::Error::from_eth_err)? + { + Some(block_hash) => block_hash, + None => return Ok(None), + }; - Ok(self - .cache() - .get_sealed_block_with_senders(block_hash) - .await - .map_err(Self::Error::from_eth_err)? - .map(|b| b.body.transactions().len())) + Ok(self + .cache() + .get_sealed_block_with_senders(block_hash) + .await + .map_err(Self::Error::from_eth_err)? + .map(|b| b.body.transactions().len())) + } + } } } @@ -241,19 +251,29 @@ pub trait LoadBlock: LoadPendingBlock + SpawnBlocking + RpcNodeCoreExt { }; } - let block_hash = match self - .provider() - .block_hash_for_id(block_id) - .map_err(Self::Error::from_eth_err)? - { - Some(block_hash) => block_hash, - None => return Ok(None), - }; + let block = match block_id { + BlockId::Number(BlockNumberOrTag::Latest) => self + .cache() + .latest_block_with_senders() + .await + .map_err(Self::Error::from_eth_err)?, + _ => { + let block_hash = match self + .provider() + .block_hash_for_id(block_id) + .map_err(Self::Error::from_eth_err)? + { + Some(block_hash) => block_hash, + None => return Ok(None), + }; - self.cache() - .get_sealed_block_with_senders(block_hash) - .await - .map_err(Self::Error::from_eth_err) + self.cache() + .get_sealed_block_with_senders(block_hash) + .await + .map_err(Self::Error::from_eth_err)? + } + }; + Ok(block) } } } diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index 13689f427cdd..60ff2309da37 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -185,8 +185,8 @@ impl EthStateCache { /// Returns the most recent canonical block from the cache, if available. /// Used to efficiently handle latest block requests and avoid race conditions during chain - /// reorgs. - /// Returns None if no canonical chain is tracked or during reorgs. + /// reorgs. + /// Returns `None` if no canonical chain is tracked or during reorgs. pub async fn latest_block_with_senders( &self, ) -> ProviderResult>>> { From 88a8516b98b686be4edb1319257e288799410973 Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Thu, 5 Dec 2024 18:06:14 +0100 Subject: [PATCH 3/8] fix: resolve merge conflicts in and cache mod --- crates/rpc/rpc-eth-types/src/cache/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index 60ff2309da37..0ced498ce10a 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -575,8 +575,9 @@ impl ChainChange { /// immediately before they need to be fetched from disk. /// /// Updates [`EthStateCache`] in two scenario : -/// 1. On reorgs: sets [`latest_chain_change`] to None and removes reorged blocks -/// 2. On new canonical blocks: updates [`latest_chain_change`] and caches the new blocks +/// 1. On reorgs: sets `EthStateCache::latest_chain_change` to None and removes reorged blocks +/// 2. On new canonical blocks: updates `EthStateCache::latest_chain_change` and caches the new +/// blocks /// /// Reorged blocks are removed from the cache. pub async fn cache_new_blocks_task( From 376154937c0b3b4e408ff0bd902c183838668742 Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Fri, 6 Dec 2024 17:49:19 +0100 Subject: [PATCH 4/8] fix: fix with fallback for lastest hash not cached --- crates/rpc/rpc-eth-api/src/helpers/block.rs | 33 ++++++++++++++++----- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/crates/rpc/rpc-eth-api/src/helpers/block.rs b/crates/rpc/rpc-eth-api/src/helpers/block.rs index d69ef02f1322..abca7ae0de31 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/block.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/block.rs @@ -88,7 +88,7 @@ pub trait EthBlocks: LoadBlock { .provider() .pending_block() .map_err(Self::Error::from_eth_err)? - .map(|block| block.body.transactions().len())) + .map(|block| block.body.transactions().len())); } match block_id { @@ -165,7 +165,7 @@ pub trait EthBlocks: LoadBlock { .get_block_and_receipts(block_hash) .await .map_err(Self::Error::from_eth_err) - .map(|b| b.map(|(b, r)| (b.block.clone(), r))) + .map(|b| b.map(|(b, r)| (b.block.clone(), r))); } Ok(None) @@ -252,11 +252,30 @@ pub trait LoadBlock: LoadPendingBlock + SpawnBlocking + RpcNodeCoreExt { } let block = match block_id { - BlockId::Number(BlockNumberOrTag::Latest) => self - .cache() - .latest_block_with_senders() - .await - .map_err(Self::Error::from_eth_err)?, + BlockId::Number(BlockNumberOrTag::Latest) => { + if let Some(block) = self + .cache() + .latest_block_with_senders() + .await + .map_err(Self::Error::from_eth_err)? + { + Some(block) + } else { + // Fallback to traditional lookup if latest isn't cached + match self + .provider() + .block_hash_for_id(block_id) + .map_err(Self::Error::from_eth_err)? + { + Some(block_hash) => self + .cache() + .get_sealed_block_with_senders(block_hash) + .await + .map_err(Self::Error::from_eth_err)?, + None => None, + } + } + } _ => { let block_hash = match self .provider() From dbf9d79aa14c47309d8467f83592f5660196266e Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Fri, 20 Dec 2024 14:47:12 +0100 Subject: [PATCH 5/8] refactor: change latest_block_with_senders fn with new variant GetLatestBlockWithSenders and its integration --- crates/rpc/rpc-eth-api/src/helpers/block.rs | 52 ++++++--------------- crates/rpc/rpc-eth-types/src/cache/mod.rs | 21 ++++++--- 2 files changed, 29 insertions(+), 44 deletions(-) diff --git a/crates/rpc/rpc-eth-api/src/helpers/block.rs b/crates/rpc/rpc-eth-api/src/helpers/block.rs index abca7ae0de31..3d55ce7a317b 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/block.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/block.rs @@ -250,49 +250,25 @@ pub trait LoadBlock: LoadPendingBlock + SpawnBlocking + RpcNodeCoreExt { None => Ok(None), }; } - - let block = match block_id { - BlockId::Number(BlockNumberOrTag::Latest) => { - if let Some(block) = self + let maybe_block = if block_id.is_latest() { + self.cache().latest_block_with_senders().await.map_err(Self::Error::from_eth_err)? + } else { + // If not latest, get block by hash + match self + .provider() + .block_hash_for_id(block_id) + .map_err(Self::Error::from_eth_err)? + { + Some(block_hash) => self .cache() - .latest_block_with_senders() - .await - .map_err(Self::Error::from_eth_err)? - { - Some(block) - } else { - // Fallback to traditional lookup if latest isn't cached - match self - .provider() - .block_hash_for_id(block_id) - .map_err(Self::Error::from_eth_err)? - { - Some(block_hash) => self - .cache() - .get_sealed_block_with_senders(block_hash) - .await - .map_err(Self::Error::from_eth_err)?, - None => None, - } - } - } - _ => { - let block_hash = match self - .provider() - .block_hash_for_id(block_id) - .map_err(Self::Error::from_eth_err)? - { - Some(block_hash) => block_hash, - None => return Ok(None), - }; - - self.cache() .get_sealed_block_with_senders(block_hash) .await - .map_err(Self::Error::from_eth_err)? + .map_err(Self::Error::from_eth_err)?, + None => None, } }; - Ok(block) + + Ok(maybe_block) } } } diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index 0ced498ce10a..3297f9025612 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -190,12 +190,9 @@ impl EthStateCache { pub async fn latest_block_with_senders( &self, ) -> ProviderResult>>> { - if let Some(chain_change) = &self.latest_chain_change { - if let Some(latest_block) = chain_change.blocks.last() { - return self.get_sealed_block_with_senders(latest_block.hash()).await; - } - } - Ok(None) + let (response_tx, rx) = oneshot::channel(); + let _ = self.to_service.send(CacheAction::GetLatestBlockWithSenders { response_tx }); + rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)? } } @@ -361,6 +358,15 @@ where } Some(action) => { match action { + CacheAction::GetLatestBlockWithSenders { response_tx } => { + let latest_block = this + .latest_chain_change + .as_ref() + .and_then(|chain_change| chain_change.blocks.last()) + .cloned() + .map(Arc::new); + let _ = response_tx.send(Ok(latest_block)); + } CacheAction::GetBlockWithSenders { block_hash, response_tx } => { if let Some(block) = this.full_block_cache.get(&block_hash).cloned() { let _ = response_tx.send(Ok(Some(block))); @@ -513,6 +519,9 @@ enum CacheAction { block_hash: B256, response_tx: BlockWithSendersResponseSender, }, + GetLatestBlockWithSenders { + response_tx: BlockWithSendersResponseSender, + }, GetHeader { block_hash: B256, response_tx: HeaderResponseSender, From c2107194ec052eed9163c69acb813d9f5877b580 Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Fri, 20 Dec 2024 15:00:32 +0100 Subject: [PATCH 6/8] refactor: remove latest_chain_change from EthStateCache --- crates/rpc/rpc-eth-types/src/cache/mod.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index 3297f9025612..eae6f8f09d53 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -64,7 +64,6 @@ type HeaderLruCache = MultiConsumerLruCache { to_service: UnboundedSender>, - latest_chain_change: Option>, } impl EthStateCache { @@ -92,7 +91,7 @@ impl EthStateCache { rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)), latest_chain_change: None, }; - let cache = Self { to_service, latest_chain_change: None }; + let cache = Self { to_service }; (cache, service) } @@ -583,14 +582,9 @@ impl ChainChange { /// Awaits for new chain events and directly inserts them into the cache so they're available /// immediately before they need to be fetched from disk. /// -/// Updates [`EthStateCache`] in two scenario : -/// 1. On reorgs: sets `EthStateCache::latest_chain_change` to None and removes reorged blocks -/// 2. On new canonical blocks: updates `EthStateCache::latest_chain_change` and caches the new -/// blocks -/// /// Reorged blocks are removed from the cache. pub async fn cache_new_blocks_task( - mut eth_state_cache: EthStateCache, + eth_state_cache: EthStateCache, mut events: St, ) where St: Stream> + Unpin + 'static, @@ -598,13 +592,11 @@ pub async fn cache_new_blocks_task( while let Some(event) = events.next().await { if let Some(reverted) = event.reverted() { let chain_change = ChainChange::new(reverted); - eth_state_cache.latest_chain_change = None; let _ = eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change }); } let chain_change = ChainChange::new(event.committed()); - eth_state_cache.latest_chain_change = Some(chain_change.clone()); let _ = eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change }); } From 9dbbe6125eeb5b4fc31e81905cbd8169f475a916 Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Fri, 20 Dec 2024 15:03:50 +0100 Subject: [PATCH 7/8] chore: remove unecessary changes --- crates/rpc/rpc-eth-api/src/helpers/block.rs | 4 ++-- crates/rpc/rpc-eth-types/src/cache/mod.rs | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/rpc/rpc-eth-api/src/helpers/block.rs b/crates/rpc/rpc-eth-api/src/helpers/block.rs index 3d55ce7a317b..852699f7a9c4 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/block.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/block.rs @@ -88,7 +88,7 @@ pub trait EthBlocks: LoadBlock { .provider() .pending_block() .map_err(Self::Error::from_eth_err)? - .map(|block| block.body.transactions().len())); + .map(|block| block.body.transactions().len())) } match block_id { @@ -165,7 +165,7 @@ pub trait EthBlocks: LoadBlock { .get_block_and_receipts(block_hash) .await .map_err(Self::Error::from_eth_err) - .map(|b| b.map(|(b, r)| (b.block.clone(), r))); + .map(|b| b.map(|(b, r)| (b.block.clone(), r))) } Ok(None) diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index eae6f8f09d53..bab9f20b5b63 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -592,11 +592,13 @@ pub async fn cache_new_blocks_task( while let Some(event) = events.next().await { if let Some(reverted) = event.reverted() { let chain_change = ChainChange::new(reverted); + let _ = eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change }); } let chain_change = ChainChange::new(event.committed()); + let _ = eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change }); } From c82697eaf983ac1b1eadd72fd0ddd629e56c08e4 Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Fri, 20 Dec 2024 15:11:33 +0100 Subject: [PATCH 8/8] refactor: improve latest_block_with_senders integration in block_transaction_count --- crates/rpc/rpc-eth-api/src/helpers/block.rs | 35 +++++++++------------ 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/crates/rpc/rpc-eth-api/src/helpers/block.rs b/crates/rpc/rpc-eth-api/src/helpers/block.rs index 852699f7a9c4..03e5710e8f99 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/block.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/block.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use alloy_eips::BlockId; use alloy_primitives::Sealable; use alloy_rlp::Encodable; -use alloy_rpc_types_eth::{Block, BlockNumberOrTag, BlockTransactions, Header, Index}; +use alloy_rpc_types_eth::{Block, BlockTransactions, Header, Index}; use futures::Future; use reth_node_api::BlockBody; use reth_primitives::{SealedBlockFor, SealedBlockWithSenders}; @@ -91,31 +91,24 @@ pub trait EthBlocks: LoadBlock { .map(|block| block.body.transactions().len())) } - match block_id { - BlockId::Number(BlockNumberOrTag::Latest) => Ok(self - .cache() - .latest_block_with_senders() - .await + let maybe_block = if block_id.is_latest() { + self.cache().latest_block_with_senders().await.map_err(Self::Error::from_eth_err)? + } else { + match self + .provider() + .block_hash_for_id(block_id) .map_err(Self::Error::from_eth_err)? - .map(|b| b.body.transactions().len())), - _ => { - let block_hash = match self - .provider() - .block_hash_for_id(block_id) - .map_err(Self::Error::from_eth_err)? - { - Some(block_hash) => block_hash, - None => return Ok(None), - }; - - Ok(self + { + Some(block_hash) => self .cache() .get_sealed_block_with_senders(block_hash) .await - .map_err(Self::Error::from_eth_err)? - .map(|b| b.body.transactions().len())) + .map_err(Self::Error::from_eth_err)?, + None => None, } - } + }; + + Ok(maybe_block.map(|b| b.body.transactions().len())) } }