Skip to content

Commit

Permalink
RPC: Fix eth subscription notifications (#2812)
Browse files Browse the repository at this point in the history
* Enable passing null value in topic list in subscriptions RPC

* Revamp subscription pipeline, use tokio multithread runtime

* Better refactor

* Fmt rs

* Add config to start up notification RPC, defaults to false

* Revert default subscription service to startup
  • Loading branch information
sieniven authored Feb 5, 2024
1 parent 4e11ac3 commit 977c94c
Show file tree
Hide file tree
Showing 17 changed files with 270 additions and 276 deletions.
1 change: 1 addition & 0 deletions lib/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/ain-cpp-imports/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub mod ffi {
fn getNumConnections() -> i32;
fn getEccLruCacheCount() -> usize;
fn getEvmValidationLruCacheCount() -> usize;
fn getEvmNotificationChannelBufferSize() -> usize;
fn isEthDebugRPCEnabled() -> bool;
fn isEthDebugTraceRPCEnabled() -> bool;
}
Expand Down
8 changes: 8 additions & 0 deletions lib/ain-cpp-imports/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ mod ffi {
pub fn getEvmValidationLruCacheCount() -> usize {
unimplemented!("{}", UNIMPL_MSG)
}
pub fn getEvmNotificationChannelBufferSize() -> usize {
unimplemented!("{}", UNIMPL_MSG)
}
pub fn isEthDebugRPCEnabled() -> bool {
unimplemented!("{}", UNIMPL_MSG)
}
Expand Down Expand Up @@ -287,6 +290,11 @@ pub fn get_evmv_lru_cache_count() -> usize {
ffi::getEvmValidationLruCacheCount()
}

/// Gets the EVM notification channel buffer size.
pub fn get_evm_notification_channel_buffer_size() -> usize {
ffi::getEvmNotificationChannelBufferSize()
}

/// Whether ETH-RPC debug is enabled
pub fn is_eth_debug_rpc_enabled() -> bool {
ffi::isEthDebugRPCEnabled()
Expand Down
31 changes: 7 additions & 24 deletions lib/ain-evm/src/evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ use anyhow::format_err;
use ethereum::{Block, PartialHeader};
use ethereum_types::{Bloom, H160, H256, H64, U256};
use log::{debug, trace};
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
RwLock,
};

use crate::{
backend::{EVMBackend, Vicinity},
Expand All @@ -27,31 +23,27 @@ use crate::{
core::{EVMCoreService, XHash},
executor::{AinExecutor, ExecuteTx},
filters::FilterService,
log::{LogService, Notification},
log::LogService,
receipt::ReceiptService,
storage::{
traits::{BlockStorage, FlushableStorage},
Storage,
},
subscription::{Notification, SubscriptionService},
transaction::{cache::TransactionCache, SignedTx},
trie::GENESIS_STATE_ROOT,
Result,
};

pub struct NotificationChannel<T> {
pub sender: UnboundedSender<T>,
pub receiver: RwLock<UnboundedReceiver<T>>,
}

pub struct EVMServices {
pub core: EVMCoreService,
pub block: BlockService,
pub receipt: ReceiptService,
pub logs: LogService,
pub filters: FilterService,
pub subscriptions: SubscriptionService,
pub storage: Arc<Storage>,
pub tx_cache: Arc<TransactionCache>,
pub channel: NotificationChannel<Notification>,
}

pub struct ExecTxState {
Expand Down Expand Up @@ -93,7 +85,6 @@ impl EVMServices {
///
/// Returns an instance of the struct, either restored from storage or created from a JSON file.
pub fn new() -> Result<Self> {
let (sender, receiver) = mpsc::unbounded_channel();
let datadir = ain_cpp_imports::get_datadir();
let path = PathBuf::from(datadir).join("evm");
if !path.exists() {
Expand All @@ -120,12 +111,9 @@ impl EVMServices {
receipt: ReceiptService::new(Arc::clone(&storage)),
logs: LogService::new(Arc::clone(&storage)),
filters: FilterService::new(Arc::clone(&storage), Arc::clone(&tx_cache)),
subscriptions: SubscriptionService::new(),
storage,
tx_cache,
channel: NotificationChannel {
sender,
receiver: RwLock::new(receiver),
},
})
} else {
let storage = Arc::new(Storage::restore(&path)?);
Expand All @@ -136,12 +124,9 @@ impl EVMServices {
receipt: ReceiptService::new(Arc::clone(&storage)),
logs: LogService::new(Arc::clone(&storage)),
filters: FilterService::new(Arc::clone(&storage), Arc::clone(&tx_cache)),
subscriptions: SubscriptionService::new(),
storage,
tx_cache,
channel: NotificationChannel {
sender,
receiver: RwLock::new(receiver),
},
})
}
}
Expand Down Expand Up @@ -268,10 +253,8 @@ impl EVMServices {
self.logs
.generate_logs_from_receipts(&receipts, block.header.number)?;
self.receipt.put_receipts(receipts)?;
self.channel
.sender
.send(Notification::Block(block.header.hash()))
.map_err(|e| format_err!(e.to_string()))?;
self.subscriptions
.send(Notification::Block(block.header.hash()))?;
self.core.clear_account_nonce();
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions lib/ain-evm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod precompiles;
pub mod receipt;
pub mod services;
pub mod storage;
pub mod subscription;
pub mod transaction;
mod trie;
pub mod weiamount;
Expand Down
5 changes: 0 additions & 5 deletions lib/ain-evm/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ pub struct LogIndex {
pub transaction_index: U256,
}

pub enum Notification {
Block(H256),
Transaction(H256),
}

pub struct LogService {
storage: Arc<Storage>,
}
Expand Down
35 changes: 35 additions & 0 deletions lib/ain-evm/src/subscription.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::Result;
use ethereum_types::H256;
use tokio::sync::broadcast::{self, Sender};

pub const NOTIFICATION_CHANNEL_BUFFER_SIZE: usize = 10_000;

#[derive(Clone)]
pub enum Notification {
Block(H256),
Transaction(H256),
}

pub struct SubscriptionService {
pub tx: Sender<Notification>,
}

impl SubscriptionService {
pub fn new() -> Self {
let (tx, _rx) =
broadcast::channel(ain_cpp_imports::get_evm_notification_channel_buffer_size());
Self { tx }
}

pub fn send(&self, notification: Notification) -> Result<()> {
// Do not need to handle send error since there may be no active receivers.
let _ = self.tx.send(notification);
Ok(())
}
}

impl Default for SubscriptionService {
fn default() -> Self {
Self::new()
}
}
1 change: 1 addition & 0 deletions lib/ain-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ prost.workspace = true
rustc_version_runtime.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
parking_lot.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tonic.workspace = true
ethereum.workspace = true
Expand Down
5 changes: 4 additions & 1 deletion lib/ain-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ pub fn init_network_subscriptions_service(addr: String) -> Result<()> {
.build(addr),
)?;
let mut methods: Methods = Methods::new();
methods.merge(MetachainPubSubModule::new(Arc::clone(&runtime.evm)).into_rpc())?;
methods.merge(
MetachainPubSubModule::new(Arc::clone(&runtime.evm), runtime.tokio_runtime.clone())
.into_rpc(),
)?;

runtime
.websocket_handles
Expand Down
Loading

0 comments on commit 977c94c

Please sign in to comment.