diff --git a/ethexe/cli/src/commands/key.rs b/ethexe/cli/src/commands/key.rs index e550a7412e8..81001fac1e1 100644 --- a/ethexe/cli/src/commands/key.rs +++ b/ethexe/cli/src/commands/key.rs @@ -50,15 +50,13 @@ impl KeyCommand { pub fn exec(self) -> Result<()> { let key_store = self.key_store.expect("must never be empty after merging"); - let signer = Signer::new(key_store).with_context(|| "failed to create signer")?; + let signer = Signer::new(key_store).context("failed to create signer")?; match self.command { KeySubcommand::Clear => { let len = signer.list_keys()?.len(); - signer - .clear_keys() - .with_context(|| "failed to clear keys")?; + signer.clear_keys().context("failed to clear keys")?; println!("Removed {len} keys"); } @@ -66,25 +64,21 @@ impl KeyCommand { // TODO: remove println from there. let public = signer .generate_key() - .with_context(|| "failed to generate new keypair")?; + .context("failed to generate new keypair")?; println!("Public key: {public}"); println!("Ethereum address: {}", public.to_address()); } KeySubcommand::Insert { private_key } => { - let private = private_key - .parse() - .with_context(|| "invalid `private-key`")?; + let private = private_key.parse().context("invalid `private-key`")?; - let public = signer - .add_key(private) - .with_context(|| "failed to add key")?; + let public = signer.add_key(private).context("failed to add key")?; println!("Public key: {public}"); println!("Ethereum address: {}", public.to_address()); } KeySubcommand::List => { - let publics = signer.list_keys().with_context(|| "failed to list keys")?; + let publics = signer.list_keys().context("failed to list keys")?; println!("[ No | {:^66} | {:^42} ]", "Public key", "Ethereum address"); @@ -93,21 +87,19 @@ impl KeyCommand { } } KeySubcommand::Recover { message, signature } => { - let message = - utils::hex_str_to_vec(message).with_context(|| "invalid `message`")?; - let signature = - utils::hex_str_to_vec(signature).with_context(|| "invalid `signature`")?; + let message = utils::hex_str_to_vec(message).context("invalid `message`")?; + let signature = utils::hex_str_to_vec(signature).context("invalid `signature`")?; let signature_bytes: [u8; 65] = signature .try_into() .map_err(|_| anyhow!("signature isn't 65 bytes len")) - .with_context(|| "invalid `signature`")?; + .context("invalid `signature`")?; let signature = unsafe { Signature::from_bytes(signature_bytes) }; let public = signature .recover_from_digest(message.to_digest()) - .with_context(|| "failed to recover signature from digest")?; + .context("failed to recover signature from digest")?; println!("Signed by: {public}"); println!("Ethereum address: {}", public.to_address()); @@ -116,38 +108,37 @@ impl KeyCommand { let key = key.strip_prefix("0x").unwrap_or(&key); let public = if key.len() == 66 { - key.parse().with_context(|| "invalid `key`")? + key.parse().context("invalid `key`")? } else if key.len() == 40 { let mut address_bytes = [0u8; 20]; hex::decode_to_slice(key, &mut address_bytes) .map_err(|e| anyhow!("Failed to parse eth address hex: {e}")) - .with_context(|| "invalid `key`")?; + .context("invalid `key`")?; signer .get_key_by_addr(address_bytes.into())? .ok_or_else(|| anyhow!("Unrecognized eth address")) - .with_context(|| "invalid `key`")? + .context("invalid `key`")? 
} else { bail!("Invalid key length: should be 33 bytes public key or 20 bytes eth address "); }; let private = signer .get_private_key(public) - .with_context(|| "failed to get private key")?; + .context("failed to get private key")?; println!("Secret key: {}", hex::encode(private.0)); println!("Public key: {public}"); println!("Ethereum address: {}", public.to_address()); } KeySubcommand::Sign { key, message } => { - let public = key.parse().with_context(|| "invalid `key`")?; + let public = key.parse().context("invalid `key`")?; - let message = - utils::hex_str_to_vec(message).with_context(|| "invalid `message`")?; + let message = utils::hex_str_to_vec(message).context("invalid `message`")?; let signature = signer .sign(public, &message) - .with_context(|| "failed to sign message")?; + .context("failed to sign message")?; println!("Signature: {signature}"); } diff --git a/ethexe/cli/src/commands/run.rs b/ethexe/cli/src/commands/run.rs index edefe1e03e7..41e9cead291 100644 --- a/ethexe/cli/src/commands/run.rs +++ b/ethexe/cli/src/commands/run.rs @@ -54,18 +54,15 @@ impl RunCommand { .filter_module("wasmtime_cranelift", LevelFilter::Off) .filter_module("cranelift", LevelFilter::Off) .try_init() - .with_context(|| "failed to initialize logger")?; + .context("failed to initialize logger")?; - let config = self - .params - .into_config() - .with_context(|| "invalid configuration")?; + let config = self.params.into_config().context("invalid configuration")?; config.log_info(); let service = Service::new(&config) .await - .with_context(|| "failed to create ethexe primary service")?; + .context("failed to create ethexe primary service")?; tokio::select! { res = service.run() => res, diff --git a/ethexe/cli/src/commands/tx.rs b/ethexe/cli/src/commands/tx.rs index 6a40e7f4293..a1b5cfc390d 100644 --- a/ethexe/cli/src/commands/tx.rs +++ b/ethexe/cli/src/commands/tx.rs @@ -78,7 +78,7 @@ impl TxCommand { pub async fn exec(self) -> Result<()> { let key_store = self.key_store.expect("must never be empty after merging"); - let signer = Signer::new(key_store).with_context(|| "failed to create signer")?; + let signer = Signer::new(key_store).context("failed to create signer")?; let rpc = self .ethereum_rpc @@ -88,17 +88,17 @@ impl TxCommand { .ethereum_router .ok_or_else(|| anyhow!("missing `ethereum-router`"))? .parse() - .with_context(|| "invalid `ethereum-router`")?; + .context("invalid `ethereum-router`")?; let sender = self .sender .ok_or_else(|| anyhow!("missing `sender`"))? .parse() - .with_context(|| "invalid `sender`")?; + .context("invalid `sender`")?; let ethereum = Ethereum::new(&rpc, router_addr, signer, sender) .await - .with_context(|| "failed to create Ethereum client")?; + .context("failed to create Ethereum client")?; let router = ethereum.router(); let router_query = router.query(); @@ -108,12 +108,12 @@ impl TxCommand { let code_id = code_id .parse() .map_err(|e| anyhow!("{e:?}")) - .with_context(|| "invalid `code-id`")?; + .context("invalid `code-id`")?; let salt = salt .map(|s| s.parse()) .transpose() - .with_context(|| "invalid `salt`")? + .context("invalid `salt`")? 
.unwrap_or_else(H256::random); println!("Creating program on Ethereum from code id {code_id}"); @@ -121,7 +121,7 @@ impl TxCommand { let (tx, actor_id) = router .create_program(code_id, salt) .await - .with_context(|| "failed to create program")?; + .context("failed to create program")?; println!("Completed in transaction {tx:?}"); println!( @@ -137,15 +137,14 @@ impl TxCommand { approve, watch, } => { - let mirror_addr: Address = mirror.parse().with_context(|| "invalid `mirror`")?; + let mirror_addr: Address = mirror.parse().context("invalid `mirror`")?; - let payload = - utils::hex_str_to_vec(payload).with_context(|| "invalid `payload`")?; + let payload = utils::hex_str_to_vec(payload).context("invalid `payload`")?; let maybe_code_id = router_query .program_code_id(mirror_addr.into()) .await - .with_context(|| "failed to check if mirror in known by router")?; + .context("failed to check if mirror in known by router")?; ensure!( maybe_code_id.is_some(), @@ -160,7 +159,7 @@ impl TxCommand { .wvara() .approve(mirror_addr.0.into(), value) .await - .with_context(|| "failed to approve wvara")?; + .context("failed to approve wvara")?; println!("Completed in transaction {tx:?}"); } @@ -172,7 +171,7 @@ impl TxCommand { let (tx, message_id) = mirror .send_message(payload, value) .await - .with_context(|| "failed to send message to mirror")?; + .context("failed to send message to mirror")?; println!("Completed in transaction {tx:?}"); println!("Message with id {message_id} successfully sent"); @@ -182,15 +181,14 @@ impl TxCommand { } } TxSubcommand::Upload { path_to_wasm } => { - let code = - fs::read(&path_to_wasm).with_context(|| "failed to read wasm from file")?; + let code = fs::read(&path_to_wasm).context("failed to read wasm from file")?; println!("Uploading {} to Ethereum", path_to_wasm.display(),); let (tx, code_id) = router .request_code_validation_with_sidecar(&code) .await - .with_context(|| "failed to request code validation")?; + .context("failed to request code validation")?; println!("Completed in transaction {tx:?}"); println!("Waiting for approval of code id {code_id}..."); @@ -198,7 +196,7 @@ impl TxCommand { let valid = router .wait_code_validation(code_id) .await - .with_context(|| "failed to wait for code validation")?; + .context("failed to wait for code validation")?; if valid { println!("Now you can create program from code id {code_id}!"); diff --git a/ethexe/cli/src/lib.rs b/ethexe/cli/src/lib.rs index 908eab12fda..0179296f6a3 100644 --- a/ethexe/cli/src/lib.rs +++ b/ethexe/cli/src/lib.rs @@ -44,7 +44,7 @@ impl Cli { pub async fn run(self) -> Result<()> { let params = self .file_params() - .with_context(|| "failed to read params from file")? + .context("failed to read params from file")? .unwrap_or_default(); self.command.run(params).await diff --git a/ethexe/cli/src/params/ethereum.rs b/ethexe/cli/src/params/ethereum.rs index ade6cae967c..b73a5b5cc5d 100644 --- a/ethexe/cli/src/params/ethereum.rs +++ b/ethexe/cli/src/params/ethereum.rs @@ -66,7 +66,7 @@ impl EthereumParams { .ethereum_router .ok_or_else(|| anyhow!("missing `ethereum-router`"))? 
.parse() - .with_context(|| "invalid `ethereum-router`")?, + .context("invalid `ethereum-router`")?, block_time: Duration::from_secs(Self::BLOCK_TIME as u64), }) } diff --git a/ethexe/cli/src/params/mod.rs b/ethexe/cli/src/params/mod.rs index a8586037df7..dfcdd0cff30 100644 --- a/ethexe/cli/src/params/mod.rs +++ b/ethexe/cli/src/params/mod.rs @@ -64,10 +64,8 @@ pub struct Params { impl Params { /// Load the parameters from a TOML file. pub fn from_file(path: PathBuf) -> Result { - let content = - std::fs::read_to_string(path).with_context(|| "failed to read params file")?; - let params = - toml::from_str(&content).with_context(|| "failed to parse toml params file")?; + let content = std::fs::read_to_string(path).context("failed to read params file")?; + let params = toml::from_str(&content).context("failed to parse toml params file")?; Ok(params) } diff --git a/ethexe/cli/src/params/network.rs b/ethexe/cli/src/params/network.rs index 491d4aeff95..b1769a20691 100644 --- a/ethexe/cli/src/params/network.rs +++ b/ethexe/cli/src/params/network.rs @@ -75,7 +75,7 @@ impl NetworkParams { .network_key .map(|k| k.parse()) .transpose() - .with_context(|| "invalid `network-key`")?; + .context("invalid `network-key`")?; let external_addresses = self .network_public_addr diff --git a/ethexe/cli/src/params/node.rs b/ethexe/cli/src/params/node.rs index 339e9e96849..5386141d1e6 100644 --- a/ethexe/cli/src/params/node.rs +++ b/ethexe/cli/src/params/node.rs @@ -77,10 +77,8 @@ impl NodeParams { Ok(NodeConfig { database_path: self.db_dir(), key_path: self.keys_dir(), - sequencer: ConfigPublicKey::new(&self.sequencer) - .with_context(|| "invalid `sequencer` key")?, - validator: ConfigPublicKey::new(&self.validator) - .with_context(|| "invalid `validator` key")?, + sequencer: ConfigPublicKey::new(&self.sequencer).context("invalid `sequencer` key")?, + validator: ConfigPublicKey::new(&self.validator).context("invalid `validator` key")?, max_commitment_depth: self.max_depth.unwrap_or(Self::DEFAULT_MAX_DEPTH).get(), worker_threads_override: self.physical_threads.map(|v| v.get() as usize), virtual_threads: self diff --git a/ethexe/network/src/lib.rs b/ethexe/network/src/lib.rs index a7d7c39116f..3c6eaffdf4e 100644 --- a/ethexe/network/src/lib.rs +++ b/ethexe/network/src/lib.rs @@ -257,7 +257,9 @@ impl NetworkEventLoop { } }; - let mut key = signer.get_private_key(key)?; + let mut key = signer + .get_private_key(key) + .context("failed to get a private key for the public one")?; let key = identity::secp256k1::SecretKey::try_from_bytes(&mut key.0) .expect("Signer provided invalid key; qed"); let pair = identity::secp256k1::Keypair::from(key); @@ -281,7 +283,9 @@ impl NetworkEventLoop { TransportType::QuicOrTcp => { let tcp = libp2p::tcp::tokio::Transport::default() .upgrade(upgrade::Version::V1Lazy) - .authenticate(libp2p::tls::Config::new(&keypair)?) 
+ .authenticate( + libp2p::tls::Config::new(&keypair).context("failed tls config creation")?, + ) .multiplex(yamux::Config::default()) .timeout(Duration::from_secs(20)); @@ -302,7 +306,8 @@ impl NetworkEventLoop { .boxed(), }; - let behaviour = Behaviour::new(&keypair, db)?; + let behaviour = + Behaviour::new(&keypair, db).context("failed ethexe network behaviour creation")?; let local_peer_id = keypair.public().to_peer_id(); let config = SwarmConfig::with_tokio_executor(); @@ -482,7 +487,7 @@ impl NetworkEventLoop { .gossipsub .publish(gpu_commitments_topic(), data) { - log::debug!("gossipsub publishing failed: {e}") + log::error!("gossipsub publishing failed: {e}") } } NetworkSenderEvent::RequestDbData(request) => { diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index cccaa09349d..11f1917b209 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -9,7 +9,7 @@ use alloy::{ rpc::types::eth::{Filter, Header, Topic}, transports::BoxTransport, }; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use ethexe_common::events::{BlockEvent, BlockRequestEvent, RouterEvent, RouterRequestEvent}; use ethexe_db::BlockHeader; use ethexe_ethereum::{ @@ -230,6 +230,7 @@ pub(crate) async fn read_code_from_tx_hash( let code = blob_reader .read_blob_from_tx_hash(tx_hash, attempts) .await + .context("blob reader failed to read blob from tx hash") .map_err(|err| anyhow!("failed to read blob: {err}"))?; (CodeId::generate(&code) == expected_code_id) @@ -248,7 +249,10 @@ pub(crate) async fn read_block_events( router_address: AlloyAddress, ) -> Result> { let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone())); - let wvara_address = router_query.wvara_address().await?; + let wvara_address = router_query + .wvara_address() + .await + .context("failed to get wvara address")?; let filter = Filter::new().at_block_hash(block_hash.to_fixed_bytes()); @@ -265,7 +269,10 @@ pub(crate) async fn read_block_events_batch( router_address: AlloyAddress, ) -> Result>> { let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone())); - let wvara_address = router_query.wvara_address().await?; + let wvara_address = router_query + .wvara_address() + .await + .context("failed to get wvara address")?; let mut res = HashMap::new(); @@ -313,12 +320,13 @@ async fn read_events_impl( provider.get_logs(&router_and_wvara_filter), provider.get_logs(&mirror_filter), ) - .await?; + .await + .context("failed getting logs from router | wvara | mirror contracts")?; let block_hash_of = |log: &alloy::rpc::types::Log| -> Result { log.block_hash .map(|v| v.0.into()) - .ok_or_else(|| anyhow!("Block hash is missing")) + .ok_or_else(|| anyhow!("Block hash of the log is missing")) }; let mut res: HashMap<_, Vec<_>> = HashMap::new(); @@ -327,9 +335,13 @@ async fn read_events_impl( let block_hash = block_hash_of(&router_or_wvara_log)?; let maybe_block_event = if router_or_wvara_log.address() == router_address { - router::events::try_extract_event(&router_or_wvara_log)?.map(Into::into) + router::events::try_extract_event(&router_or_wvara_log) + .context("failed extracting router event")? + .map(Into::into) } else { - wvara::events::try_extract_event(&router_or_wvara_log)?.map(Into::into) + wvara::events::try_extract_event(&router_or_wvara_log) + .context("failed extracting wvara event")? 
+ .map(Into::into) }; if let Some(block_event) = maybe_block_event { @@ -344,7 +356,9 @@ async fn read_events_impl( // TODO (breathx): if address is unknown, then continue. - if let Some(event) = mirror::events::try_extract_event(&mirror_log)? { + let maybe_event = mirror::events::try_extract_event(&mirror_log) + .context("failed extracting mirror events")?; + if let Some(event) = maybe_event { res.entry(block_hash) .or_default() .push(BlockEvent::mirror(address, event)); @@ -362,7 +376,10 @@ pub(crate) async fn read_block_request_events( router_address: AlloyAddress, ) -> Result> { let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone())); - let wvara_address = router_query.wvara_address().await?; + let wvara_address = router_query + .wvara_address() + .await + .context("failed to get wvara address")?; let filter = Filter::new().at_block_hash(block_hash.to_fixed_bytes()); @@ -378,7 +395,10 @@ pub(crate) async fn read_block_request_events_batch( router_address: AlloyAddress, ) -> Result>> { let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone())); - let wvara_address = router_query.wvara_address().await?; + let wvara_address = router_query + .wvara_address() + .await + .context("failed to get wvara address")?; let mut res = HashMap::new(); @@ -428,12 +448,13 @@ async fn read_request_events_impl( provider.get_logs(&router_and_wvara_filter), provider.get_logs(&mirror_filter), ) - .await?; + .await + .context("failed getting logs from router | wvara | mirror contracts")?; let block_hash_of = |log: &alloy::rpc::types::Log| -> Result { log.block_hash .map(|v| v.0.into()) - .ok_or(anyhow!("Block hash is missing")) + .ok_or(anyhow!("Block hash of the log is missing")) }; let out_of_scope_addresses = [ @@ -448,9 +469,12 @@ async fn read_request_events_impl( let block_hash = block_hash_of(&router_or_wvara_log)?; let maybe_block_request_event = if router_or_wvara_log.address() == router_address { - router::events::try_extract_request_event(&router_or_wvara_log)?.map(Into::into) + router::events::try_extract_request_event(&router_or_wvara_log) + .context("failed extracting router request event")? + .map(Into::into) } else { - wvara::events::try_extract_request_event(&router_or_wvara_log)? + wvara::events::try_extract_request_event(&router_or_wvara_log) + .context("failed extracting wvara request event")? .filter(|v| !v.involves_addresses(&out_of_scope_addresses)) .map(Into::into) }; @@ -467,7 +491,9 @@ async fn read_request_events_impl( // TODO (breathx): if address is unknown, then continue. - if let Some(request_event) = mirror::events::try_extract_request_event(&mirror_log)? { + let maybe_request_event = mirror::events::try_extract_request_event(&mirror_log) + .context("failed extracting mirror request event")?; + if let Some(request_event) = maybe_request_event { res.entry(block_hash) .or_default() .push(BlockRequestEvent::mirror(address, request_event)); @@ -517,7 +543,9 @@ async fn read_committed_blocks_impl( let mut res = Vec::with_capacity(logs.len()); for log in logs { - if let Some(hash) = router::events::try_extract_committed_block_hash(&log)? 
{ + let maybe_hash = router::events::try_extract_committed_block_hash(&log) + .context("failed extracting committed block hash from router log")?; + if let Some(hash) = maybe_hash { res.push(hash); } } diff --git a/ethexe/observer/src/query.rs b/ethexe/observer/src/query.rs index 60fd4bc1c7b..8edec710456 100644 --- a/ethexe/observer/src/query.rs +++ b/ethexe/observer/src/query.rs @@ -12,7 +12,7 @@ use alloy::{ providers::{Provider, ProviderBuilder}, rpc::{client::BatchRequest, types::eth::BlockTransactionsKind}, }; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use ethexe_common::{ db::{BlockHeader, BlockMetaStorage}, events::{BlockEvent, BlockRequestEvent, RouterEvent}, @@ -87,7 +87,8 @@ impl Query { // TODO (breathx): optimize me ASAP. Ok(self .get_block_events(block_hash) - .await? + .await + .context("failed rpc (alloy) request to get block events")? .into_iter() .filter_map(|event| match event { BlockEvent::Router(RouterEvent::BlockCommitted { hash }) => Some(hash), @@ -117,14 +118,19 @@ impl Query { }) .collect(); - batch.send().await?; + batch + .send() + .await + .context("failed to send batch request by rpc (alloy)")?; let blocks: Vec<_> = future::join_all(handles).await; let mut res = Vec::with_capacity(blocks.len()); for block in blocks { - let block = block?.ok_or_else(|| anyhow!("Block not found"))?; + let block = block + .context("failed 'eth_getBlockByNumber' rpc call (alloy)")? + .ok_or_else(|| anyhow!("Block not found"))?; let block_hash = H256(block.header.hash.0); let height = block.header.number as u32; @@ -166,6 +172,7 @@ impl Query { tokio::spawn(async move { Self::batch_get_block_headers(provider, database, start as u64, end as u64) .await + .context("failed to get block headers in batch") }) }) .collect(); @@ -180,12 +187,13 @@ impl Query { ); let (headers_batches, maybe_events) = future::join(headers_fut, events_fut).await; - let mut events = maybe_events?; + let mut events = maybe_events + .context("failed to read block request events when loading chain in batch")?; let mut res = HashMap::with_capacity(total_blocks as usize); for batch in headers_batches { - let batch = batch??; + let batch = batch?.context("failed to request block headers batch")?; for (hash, header) in batch { self.database @@ -201,7 +209,10 @@ impl Query { } pub async fn get_last_committed_chain(&mut self, block_hash: H256) -> Result> { - let current_block = self.get_block_header_meta(block_hash).await?; + let current_block = self + .get_block_header_meta(block_hash) + .await + .with_context(|| format!("failed getting current block {block_hash} header meta"))?; let latest_valid_block_height = self .database .latest_valid_block() @@ -231,19 +242,27 @@ impl Query { let mut chain = Vec::new(); let mut headers_map = HashMap::new(); + let from_block = latest_valid_block_height + 1; + let to_block = current_block.height; let committed_blocks = read_committed_blocks_batch( latest_valid_block_height + 1, current_block.height, &self.provider, self.router_address, ) - .await?; + .await + .with_context(|| { + format!("failed to read committed blocks batch from {from_block} to {to_block} block") + })?; if is_deep_sync { // Load blocks in batch from provider by numbers. headers_map = self .load_chain_batch(latest_valid_block_height + 1, current_block.height) - .await?; + .await + .with_context(|| { + format!("failed to load chain from {from_block} to {to_block} block") + })?; } // Continue loading chain by parent hashes from the current block to the latest valid block.
@@ -258,7 +277,9 @@ impl Query { { let header = match headers_map.get(&hash) { Some(header) => header.clone(), - None => self.get_block_header_meta(hash).await?, + None => self.get_block_header_meta(hash).await.with_context(|| { + format!("failed getting block {hash} header meta for chain recovery") + })?, }; self.database.set_latest_valid_block(hash, header); @@ -273,7 +294,9 @@ impl Query { // Fetch parent hash from headers_map or database hash = match headers_map.get(&hash) { Some(header) => header.parent_hash, - None => self.get_block_parent_hash(hash).await?, + None => self.get_block_parent_hash(hash).await.with_context(|| { + format!("failed getting block {hash} parent hash for chain recovery") + })?, }; } @@ -299,7 +322,7 @@ impl Query { log::trace!("Include block {hash} in chain for processing"); chain.push(hash); - hash = self.get_block_parent_hash(hash).await?; + hash = self.get_block_parent_hash(hash).await.with_context(|| format!("failed getting block {hash} parent hash for finding oldest not committed block"))?; } log::trace!("Oldest not committed block reached: {}", hash); @@ -308,7 +331,10 @@ impl Query { } pub async fn propagate_meta_for_block(&mut self, block_hash: H256) -> Result<()> { - let parent = self.get_block_parent_hash(block_hash).await?; + let parent = self + .get_block_parent_hash(block_hash) + .await + .with_context(|| format!("failed getting block {block_hash} parent hash"))?; if !self .database @@ -338,7 +364,10 @@ impl Query { .database .block_commitment_queue(parent) .ok_or_else(|| anyhow!("parent block commitment queue not found"))?; - let committed_blocks = self.get_committed_blocks(block_hash).await?; + let committed_blocks = self + .get_committed_blocks(block_hash) + .await + .with_context(|| format!("failed to get committed blocks in {block_hash} block"))?; let current_queue = queue .into_iter() .filter(|hash| !committed_blocks.contains(hash)) @@ -373,8 +402,9 @@ impl Query { let block = self .provider .get_block_by_hash(block_hash.0.into(), BlockTransactionsKind::Hashes) - .await? - .ok_or_else(|| anyhow!("Block not found"))?; + .await + .with_context(|| format!("failed getting block {block_hash} by rpc (alloy)"))? + .ok_or_else(|| anyhow!("Block {block_hash} not found"))?; let height = u32::try_from(block.header.number).unwrap_or_else(|err| { unreachable!("Ethereum block number not fit in u32: {err}") }); @@ -393,7 +423,10 @@ impl Query { // Populate block events in db. let events = read_block_request_events(block_hash, &self.provider, self.router_address) - .await?; + .await + .with_context(|| { + format!("failed reading block {block_hash} request events") + })?; self.database.set_block_events(block_hash, events); Ok(meta) @@ -402,11 +435,17 @@ impl Query { } pub async fn get_block_parent_hash(&mut self, block_hash: H256) -> Result { - Ok(self.get_block_header_meta(block_hash).await?.parent_hash) + Ok(self + .get_block_header_meta(block_hash) + .await + .context("failed getting block header meta to get parent hash")?
+ .parent_hash) } pub async fn get_block_events(&mut self, block_hash: H256) -> Result> { - read_block_events(block_hash, &self.provider, self.router_address).await + read_block_events(block_hash, &self.provider, self.router_address) + .await + .context("failed reading block events") } pub async fn get_block_request_events( @@ -417,8 +456,9 @@ impl Query { return Ok(events); } - let events = - read_block_request_events(block_hash, &self.provider, self.router_address).await?; + let events = read_block_request_events(block_hash, &self.provider, self.router_address) + .await + .with_context(|| format!("failed reading block {block_hash} request events"))?; self.database.set_block_events(block_hash, events.clone()); Ok(events) @@ -435,5 +475,6 @@ impl Query { read_code_from_tx_hash(blob_reader, expected_code_id, blob_tx_hash, attempts) .await .map(|res| res.1) + .with_context(|| format!("failed reading code from tx hash {blob_tx_hash}")) } } diff --git a/ethexe/processor/src/handling/events.rs b/ethexe/processor/src/handling/events.rs index 5d22ee2fc99..ed64d66273e 100644 --- a/ethexe/processor/src/handling/events.rs +++ b/ethexe/processor/src/handling/events.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . use super::ProcessingHandler; -use anyhow::{ensure, Result}; +use anyhow::{ensure, Context, Result}; use ethexe_common::{ events::{MirrorRequestEvent, RouterRequestEvent, WVaraRequestEvent}, gear::ValueClaim, @@ -115,10 +115,10 @@ impl ProcessingHandler { }); }); - transitions.remove_task( - expiry, - &ScheduledTask::RemoveFromMailbox((actor_id, source), replied_to), - )?; + let task = ScheduledTask::RemoveFromMailbox((actor_id, source), replied_to); + transitions.remove_task(expiry, &task).with_context(|| { + format!("failed removing task {task:#?} from transitions") + })?; let reply = Dispatch::new_reply(storage, replied_to, source, payload, value)?; @@ -145,10 +145,10 @@ impl ProcessingHandler { }); }); - transitions.remove_task( - expiry, - &ScheduledTask::RemoveFromMailbox((actor_id, source), claimed_id), - )?; + let task = ScheduledTask::RemoveFromMailbox((actor_id, source), claimed_id); + transitions.remove_task(expiry, &task).with_context(|| { + format!("failed removing task {task:#?} from transitions") + })?; let reply = Dispatch::reply( claimed_id, diff --git a/ethexe/processor/src/handling/mod.rs b/ethexe/processor/src/handling/mod.rs index 3d08e4805a0..f49c2051890 100644 --- a/ethexe/processor/src/handling/mod.rs +++ b/ethexe/processor/src/handling/mod.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . use crate::Processor; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; use ethexe_runtime_common::{ state::ProgramState, InBlockTransitions, ScheduleHandler, TransitionController, @@ -82,11 +82,17 @@ impl Processor { &mut self, original_code: impl AsRef<[u8]>, ) -> Result> { - let mut executor = self.creator.instantiate()?; + let mut executor = self + .creator + .instantiate() + .context("failed creating instance wrapper")?; let original_code = original_code.as_ref(); - let Some(instrumented_code) = executor.instrument(original_code)? else { + let Some(instrumented_code) = executor + .instrument(original_code) + .context("failed instrumenting the code")? 
+ else { return Ok(None); }; diff --git a/ethexe/processor/src/host/mod.rs b/ethexe/processor/src/host/mod.rs index 7b6bcfbd0b1..d253d25abeb 100644 --- a/ethexe/processor/src/host/mod.rs +++ b/ethexe/processor/src/host/mod.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use core_processor::common::JournalNote; use gear_core::{code::InstrumentedCode, ids::ProgramId}; use gprimitives::{CodeId, H256}; @@ -78,7 +78,10 @@ impl InstanceCreator { pub fn instantiate(&self) -> Result { let mut store = Store::new(&self.engine, Default::default()); - let instance = self.instance_pre.instantiate(&mut store)?; + let instance = self + .instance_pre + .instantiate(&mut store) + .context("failed instantiating wasm module from pre-instantiation state")?; let mut instance_wrapper = InstanceWrapper { instance, diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index aa010dd5386..5788e2fc458 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -18,7 +18,7 @@ //! Program's execution service for eGPU. -use anyhow::{anyhow, ensure, Result}; +use anyhow::{anyhow, ensure, Context, Result}; use ethexe_common::events::{BlockRequestEvent, MirrorRequestEvent}; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; use ethexe_runtime_common::state::Storage; @@ -93,7 +93,11 @@ impl Processor { ) -> Result> { log::debug!("Processing upload code {code_id:?}"); - let valid = code_id == CodeId::generate(code) && self.handle_new_code(code)?.is_some(); + let valid = code_id == CodeId::generate(code) + && self + .handle_new_code(code) + .context("failed handling new code")? + .is_some(); self.db.set_code_valid(code_id, valid); Ok(vec![LocalOutcome::CodeValidated { id: code_id, valid }]) @@ -106,15 +110,21 @@ impl Processor { ) -> Result> { log::debug!("Processing events for {block_hash:?}: {events:#?}"); - let mut handler = self.handler(block_hash)?; + let mut handler = self + .handler(block_hash) + .context("failed creating processing handler")?; for event in events { match event { BlockRequestEvent::Router(event) => { - handler.handle_router_event(event)?; + handler + .handle_router_event(event) + .context("failed handling router event")?; } BlockRequestEvent::Mirror { actor_id, event } => { - handler.handle_mirror_event(actor_id, event)?; + handler + .handle_mirror_event(actor_id, event) + .context("failed handling mirror event")?; } BlockRequestEvent::WVara(event) => { handler.handle_wvara_event(event); diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 5026d95b0b3..cedb44683c5 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . 
-use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use apis::{BlockApi, BlockServer, ProgramApi, ProgramServer}; use ethexe_db::Database; use futures::FutureExt; @@ -68,9 +68,12 @@ impl RpcService { } pub async fn run_server(self) -> Result { - let listener = TcpListener::bind(self.config.listen_addr).await?; + let listener = TcpListener::bind(self.config.listen_addr) + .await + .context("failed to bind tcp listener for rpc server")?; - let cors = util::try_into_cors(self.config.cors)?; + let cors = util::try_into_cors(self.config.cors) + .context("failed to convert config cors to `CorsLayer`")?; let http_middleware = tower::ServiceBuilder::new().layer(cors); @@ -79,8 +82,12 @@ impl RpcService { .to_service_builder(); let mut module = JsonrpcModule::new(()); - module.merge(ProgramServer::into_rpc(ProgramApi::new(self.db.clone())))?; - module.merge(BlockServer::into_rpc(BlockApi::new(self.db.clone())))?; + module + .merge(ProgramServer::into_rpc(ProgramApi::new(self.db.clone()))) + .context("failed to define `Program` api methods")?; + module + .merge(BlockServer::into_rpc(BlockApi::new(self.db.clone()))) + .context("failed to define `Block` api methods")?; let (stop_handle, server_handle) = stop_channel(); @@ -111,6 +118,8 @@ impl RpcService { let cfg2 = cfg.clone(); let svc = tower::service_fn(move |req: hyper::Request| { + log::trace!("Received request - {req:#?}"); + let PerConnection { methods, stop_handle, @@ -132,12 +141,20 @@ impl RpcService { async move { log::info!("WebSocket connection accepted"); - svc.call(req).await.map_err(|e| anyhow!("Error: {:?}", e)) + svc.call(req) + .await + .map_err(|e| anyhow!("Error: {:?}", e)) + .context("rpc service failed processing websocket request") } .boxed() } else { - async move { svc.call(req).await.map_err(|e| anyhow!("Error: {:?}", e)) } - .boxed() + async move { + svc.call(req) + .await + .map_err(|e| anyhow!("Error: {:?}", e)) + .context("rpc service failed processing request") + } + .boxed() } }); diff --git a/ethexe/rpc/src/util.rs b/ethexe/rpc/src/util.rs index 8236795c701..edbecd15b21 100644 --- a/ethexe/rpc/src/util.rs +++ b/ethexe/rpc/src/util.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see .
-use anyhow::Result; +use anyhow::{Context, Result}; use hyper::header::HeaderValue; use tower_http::cors::{AllowOrigin, CorsLayer}; @@ -25,7 +25,9 @@ pub(crate) fn try_into_cors(maybe_cors: Option>) -> Result Result { let digest = to_router_digest(commitments_digest, router_address); - signer.sign_digest(pub_key, digest) + signer + .sign_digest(pub_key, digest) + .context("failed to sign commitments digest") } fn recover_from_commitments_digest( @@ -159,6 +162,7 @@ fn recover_from_commitments_digest( signature .recover_from_digest(to_router_digest(commitments_digest, router_address)) .map(|k| k.to_address()) + .context("failed to recover address from commitments digest") } fn to_router_digest(commitments_digest: Digest, router_address: Address) -> Digest { diff --git a/ethexe/sequencer/src/lib.rs b/ethexe/sequencer/src/lib.rs index 3aa442fd151..ff4500a1518 100644 --- a/ethexe/sequencer/src/lib.rs +++ b/ethexe/sequencer/src/lib.rs @@ -21,7 +21,7 @@ pub mod agro; use agro::{AggregatedCommitments, MultisignedCommitmentDigests, MultisignedCommitments}; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use ethexe_common::{ db::BlockMetaStorage, gear::{BlockCommitment, CodeCommitment}, @@ -203,11 +203,15 @@ impl Sequencer { match (codes_future, blocks_future) { (Some(codes_future), Some(transitions_future)) => { let (codes_tx, transitions_tx) = futures::join!(codes_future, transitions_future); - codes_tx?; - transitions_tx?; + codes_tx.context("failed submitting code commitments")?; + transitions_tx.context("failed submitting block commitments")?; } - (Some(codes_future), None) => codes_future.await?, - (None, Some(transitions_future)) => transitions_future.await?, + (Some(codes_future), None) => codes_future + .await + .context("failed submitting code commitments")?, + (None, Some(transitions_future)) => transitions_future + .await + .context("failed submitting block commitments")?, (None, None) => {} } @@ -438,7 +442,9 @@ impl Sequencer { commitments_storage: &mut CommitmentsMap, commitments_filter: impl Fn(&C) -> bool, ) -> Result<()> { - let origin = aggregated.recover(router_address)?; + let origin = aggregated + .recover(router_address) + .context("failed to recover address from aggregated commitments")?; if validators.contains(&origin).not() { return Err(anyhow!("Unknown validator {origin} or invalid signature")); @@ -473,17 +479,14 @@ impl Sequencer { return Err(anyhow!("No candidate found")); }; - candidate.append_signature_with_check( - commitments_digest, - signature, - router_address, - |origin| { + candidate + .append_signature_with_check(commitments_digest, signature, router_address, |origin| { validators .contains(&origin) .then_some(()) .ok_or_else(|| anyhow!("Unknown validator {origin} or invalid signature")) - }, - ) + }) + .context("failed to append signature to candidate for multisigned commitment") } fn update_status(&mut self, update_fn: F) diff --git a/ethexe/service/src/lib.rs b/ethexe/service/src/lib.rs index e11a4d9827b..24ee0da7cc7 100644 --- a/ethexe/service/src/lib.rs +++ b/ethexe/service/src/lib.rs @@ -91,11 +91,11 @@ impl Service { config.ethereum.block_time, ) .await - .with_context(|| "failed to create blob reader")?, + .context("failed to create blob reader")?, ); let rocks_db = ethexe_db::RocksDatabase::open(config.node.database_path.clone()) - .with_context(|| "failed to open database")?; + .context("failed to open database")?; let db = ethexe_db::Database::from_one(&rocks_db, config.ethereum.router_address.0); let observer = 
ethexe_observer::Observer::new( @@ -104,16 +104,16 @@ impl Service { blob_reader.clone(), ) .await - .with_context(|| "failed to create observer")?; + .context("failed to create observer")?; let router_query = RouterQuery::new(&config.ethereum.rpc, config.ethereum.router_address) .await - .with_context(|| "failed to create router query")?; + .context("failed to create router query")?; let genesis_block_hash = router_query .genesis_block_hash() .await - .with_context(|| "failed to query genesis hash")?; + .context("failed to query genesis hash")?; if genesis_block_hash.is_zero() { log::error!( @@ -128,13 +128,13 @@ impl Service { let validators = router_query .validators() .await - .with_context(|| "failed to query validators")?; + .context("failed to query validators")?; log::info!("👥 Validators set: {validators:?}"); let threshold = router_query .threshold() .await - .with_context(|| "failed to query validators threshold")?; + .context("failed to query validators threshold")?; log::info!("🔒 Multisig threshold: {threshold} / {}", validators.len()); let query = ethexe_observer::Query::new( @@ -146,7 +146,7 @@ impl Service { config.node.max_commitment_depth, ) .await - .with_context(|| "failed to create observer query")?; + .context("failed to create observer query")?; let processor = ethexe_processor::Processor::with_config( ProcessorConfig { @@ -155,7 +155,7 @@ impl Service { }, db.clone(), ) - .with_context(|| "failed to create processor")?; + .context("failed to create processor")?; if let Some(worker_threads) = processor.config().worker_threads_override { log::info!("🔧 Overriding amount of physical threads for runtime: {worker_threads}"); @@ -167,11 +167,11 @@ impl Service { ); let signer = ethexe_signer::Signer::new(config.node.key_path.clone()) - .with_context(|| "failed to create signer")?; + .context("failed to create signer")?; let sequencer = if let Some(key) = Self::get_config_public_key(config.node.sequencer, &signer) - .with_context(|| "failed to get sequencer private key")? + .context("failed to get sequencer private key")? { Some( ethexe_sequencer::Sequencer::new( @@ -186,14 +186,14 @@ impl Service { Box::new(db.clone()), ) .await - .with_context(|| "failed to create sequencer")?, + .context("failed to create sequencer")?, ) } else { None }; let validator = Self::get_config_public_key(config.node.validator, &signer) - .with_context(|| "failed to get validator private key")? + .context("failed to get validator key")? .map(|key| { ethexe_validator::Validator::new( ðexe_validator::Config { @@ -208,7 +208,7 @@ impl Service { let metrics_service = if let Some(config) = config.prometheus.clone() { // Set static metrics. 
let metrics = - MetricsService::new(&config).with_context(|| "failed to create metrics service")?; + MetricsService::new(&config).context("failed to create metrics service")?; tokio::spawn( ethexe_prometheus::init_prometheus(config.addr, config.registry).map(drop), ); @@ -221,7 +221,7 @@ impl Service { let network = if let Some(net_config) = &config.network { Some( ethexe_network::NetworkService::new(net_config.clone(), &signer, db.clone()) - .with_context(|| "failed to create network service")?, + .context("failed to create network service")?, ) } else { None @@ -296,7 +296,10 @@ impl Service { processor: &mut ethexe_processor::Processor, block_hash: H256, ) -> Result<()> { - let events = query.get_block_request_events(block_hash).await?; + let events = query + .get_block_request_events(block_hash) + .await + .context("failed getting block request events")?; for event in events { match event { @@ -319,9 +322,14 @@ impl Service { .code_blob_tx(code_id) .ok_or_else(|| anyhow!("Blob tx hash not found"))?; - let code = query.download_code(code_id, blob_tx_hash).await?; + let code = query + .download_code(code_id, blob_tx_hash) + .await + .context("failed downloading code")?; - processor.process_upload_code(code_id, code.as_slice())?; + processor + .process_upload_code(code_id, code.as_slice()) + .context("failed processing upload code from `RequestEvent::Block`")?; } _ => continue, } @@ -340,13 +348,23 @@ impl Service { return Ok(transitions); } - query.propagate_meta_for_block(block_hash).await?; + query + .propagate_meta_for_block(block_hash) + .await + .with_context(|| format!("failed to propagate meta for block {block_hash}"))?; - Self::process_upload_codes(db, query, processor, block_hash).await?; + Self::process_upload_codes(db, query, processor, block_hash) + .await + .context("failed processing upload codes during block processing")?; - let block_request_events = query.get_block_request_events(block_hash).await?; + let block_request_events = query + .get_block_request_events(block_hash) + .await + .context("failed getting block request events")?; - let block_outcomes = processor.process_block_events(block_hash, block_request_events)?; + let block_outcomes = processor + .process_block_events(block_hash, block_request_events) + .context("failed processing block events")?; let transition_outcomes: Vec<_> = block_outcomes .into_iter() @@ -392,10 +410,15 @@ impl Service { let mut commitments = vec![]; - let last_committed_chain = query.get_last_committed_chain(block_data.hash).await?; + let last_committed_chain = query + .get_last_committed_chain(block_data.hash) + .await + .context("failed to get last committed chain")?; for block_hash in last_committed_chain.into_iter().rev() { - let transitions = Self::process_one_block(db, query, processor, block_hash).await?; + let transitions = Self::process_one_block(db, query, processor, block_hash) + .await + .context("failed to get block state transitions")?; if transitions.is_empty() { // Skip empty blocks @@ -437,13 +460,16 @@ impl Service { block_data.header.parent_hash ); - let commitments = - Self::process_block_event(db, query, processor, block_data).await?; + let commitments = Self::process_block_event(db, query, processor, block_data) + .await + .context("failed processing block event")?; Ok((Vec::new(), commitments)) } RequestEvent::CodeLoaded { code_id, code } => { - let outcomes = processor.process_upload_code(code_id, code.as_slice())?; + let outcomes = processor + .process_upload_code(code_id, code.as_slice()) + .context("failed
processing upload code from `RequestEvent::CodeLoaded`")?; let commitments: Vec<_> = outcomes .into_iter() .map(|outcome| match outcome { @@ -457,7 +483,9 @@ impl Service { // Important: sequencer must process event after event processing by service. if let Some(sequencer) = maybe_sequencer { - sequencer.process_observer_event(&observer_event)?; + sequencer + .process_observer_event(&observer_event) + .context("failed sequencer processing observer event")?; } res @@ -510,7 +538,10 @@ impl Service { let mut rpc_handle = if let Some(rpc) = rpc { log::info!("🌐 Rpc server starting at: {}", rpc.port()); - let rpc_run = rpc.run_server().await?; + let rpc_run = rpc + .run_server() + .await + .context("failed to start rpc server")?; Some(tokio::spawn(rpc_run.stopped())) } else { @@ -545,7 +576,9 @@ impl Service { &mut processor, &mut sequencer, observer_event, - ).await?; + ) + .await + .context("failed processing observer event")?; Self::post_process_commitments( code_commitments, @@ -553,7 +586,8 @@ impl Service { validator.as_mut(), sequencer.as_mut(), network_sender.as_mut(), - ).await?; + ).await + .context("failed commitments post-processing")?; if is_block_event { collection_round_timer.start(); @@ -568,7 +602,7 @@ impl Service { validator.as_mut(), sequencer.as_mut(), network_sender.as_mut() - )?; + ).context("failed collected commitments processing")?; collection_round_timer.stop(); validation_round_timer.start(); @@ -576,7 +610,7 @@ impl Service { _ = validation_round_timer.wait() => { log::debug!("Validation round timeout, process validated commitments..."); - Self::process_approved_commitments(sequencer.as_mut()).await?; + Self::process_approved_commitments(sequencer.as_mut()).await.context("failed approved commitments processing")?; validation_round_timer.stop(); } @@ -591,7 +625,7 @@ impl Service { validator.as_mut(), sequencer.as_mut(), network_sender.as_mut(), - ); + ).context("failed processing network message"); if let Err(err) = result { // TODO: slash peer/validator in case of error #4175 @@ -600,7 +634,7 @@ impl Service { } } NetworkReceiverEvent::ExternalValidation(validating_response) => { - let validated = Self::process_response_validation(&validating_response, &mut router_query).await?; + let validated = Self::process_response_validation(&validating_response, &mut router_query).await.context("failed response validation processing")?; let res = if validated { Ok(validating_response) } else { @@ -648,12 +682,14 @@ impl Service { .is_empty() .not() .then(|| validator.aggregate(code_commitments)) - .transpose()?; + .transpose() + .context("failed to aggregate code commitments")?; let aggregated_blocks = block_commitments .is_empty() .not() .then(|| validator.aggregate(block_commitments)) - .transpose()?; + .transpose() + .context("failed to aggregate block commitments")?; if aggregated_codes.is_none() && aggregated_blocks.is_none() { return Ok(()); @@ -676,14 +712,18 @@ impl Service { "Received ({}) signed code commitments from local validator...", aggregated.len() ); - sequencer.receive_code_commitments(aggregated)?; + sequencer + .receive_code_commitments(aggregated) + .context("failed receiving code commitments for post processing commitments")?; } if let Some(aggregated) = aggregated_blocks { log::debug!( "Received ({}) signed block commitments from local validator...", aggregated.len() ); - sequencer.receive_block_commitments(aggregated)?; + sequencer.receive_block_commitments(aggregated).context( + "failed receiving block commitments for post processing commitments", + 
)?; } } @@ -767,7 +807,8 @@ impl Service { if code_requests.is_empty().not() { match validator.validate_code_commitments(db, code_requests) { Result::Ok((digest, signature)) => { - sequencer.receive_codes_signature(digest, signature)? + sequencer.receive_codes_signature(digest, signature) + .context("failed to receive codes signature when processing collected commitments")? } Result::Err(err) => { log::warn!("Collected code commitments validation failed: {err}") @@ -778,7 +819,8 @@ impl Service { if block_requests.is_empty().not() { match validator.validate_block_commitments(db, block_requests) { Result::Ok((digest, signature)) => { - sequencer.receive_blocks_signature(digest, signature)? + sequencer.receive_blocks_signature(digest, signature) + .context("failed to receive blocks signature when processing collected commitments")? } Result::Err(err) => { log::warn!("Collected block commitments validation failed: {err}") @@ -797,7 +839,10 @@ impl Service { return Ok(()); }; - sequencer.submit_multisigned_commitments().await + sequencer + .submit_multisigned_commitments() + .await + .context("failed submitting multisigned commitments") } fn process_network_message( @@ -807,17 +852,22 @@ impl Service { maybe_sequencer: Option<&mut ethexe_sequencer::Sequencer>, maybe_network_sender: Option<&mut ethexe_network::NetworkSender>, ) -> Result<()> { - let message = NetworkMessage::decode(&mut data)?; + let message = NetworkMessage::decode(&mut data) + .context("failed decoding data into network message")?; match message { NetworkMessage::PublishCommitments { codes, blocks } => { let Some(sequencer) = maybe_sequencer else { return Ok(()); }; if let Some(aggregated) = codes { - sequencer.receive_code_commitments(aggregated)?; + sequencer + .receive_code_commitments(aggregated) + .context("failed receiving external code commitments")?; } if let Some(aggregated) = blocks { - sequencer.receive_block_commitments(aggregated)?; + sequencer + .receive_block_commitments(aggregated) + .context("failed receiving external block commitments")?; } Ok(()) } @@ -833,13 +883,15 @@ impl Service { .is_empty() .not() .then(|| validator.validate_code_commitments(db, codes)) - .transpose()?; + .transpose() + .context("failed to validate externally received code commitments")?; let blocks = blocks .is_empty() .not() .then(|| validator.validate_block_commitments(db, blocks)) - .transpose()?; + .transpose() + .context("failed to validate externally received block commitments")?; let message = NetworkMessage::ApproveCommitments { codes, blocks }; network_sender.publish_message(message.encode()); @@ -852,11 +904,15 @@ impl Service { }; if let Some((digest, signature)) = codes { - sequencer.receive_codes_signature(digest, signature)?; + sequencer + .receive_codes_signature(digest, signature) + .context("failed to receive external codes signature")?; } if let Some((digest, signature)) = blocks { - sequencer.receive_blocks_signature(digest, signature)?; + sequencer + .receive_blocks_signature(digest, signature) + .context("failed to receive external blocks signature")?; } Ok(()) @@ -871,14 +927,20 @@ impl Service { let response = validating_response.response(); if let db_sync::Response::ProgramIds(ids) = response { - let ethereum_programs = router_query.programs_count().await?; + let ethereum_programs = router_query + .programs_count() + .await + .context("failed to get programs count")?; if ethereum_programs != U256::from(ids.len()) { return Ok(false); } // TODO: #4309 for &id in ids { - let code_id = 
router_query.program_code_id(id).await?; + let code_id = router_query + .program_code_id(id) + .await + .with_context(|| format!("failed to get program's {id} code id"))?; if code_id.is_none() { return Ok(false); } diff --git a/ethexe/signer/src/lib.rs b/ethexe/signer/src/lib.rs index b43276dc041..b1da251f035 100644 --- a/ethexe/signer/src/lib.rs +++ b/ethexe/signer/src/lib.rs @@ -26,7 +26,7 @@ use secp256k1::hashes::hex::{Case, DisplayHex}; pub use sha3; pub use signature::Signature; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, bail, Context, Result}; use gprimitives::{ActorId, H160}; use parity_scale_codec::{Decode, Encode}; use sha3::Digest as _; @@ -175,7 +175,9 @@ pub struct Signer { impl Signer { pub fn new(key_store: PathBuf) -> Result { - fs::create_dir_all(key_store.as_path())?; + let path = key_store.as_path(); + fs::create_dir_all(path) + .with_context(|| format!("failed to create dir on path {}", path.display()))?; Ok(Self { key_store }) } @@ -245,7 +247,12 @@ impl Signer { let local_public = PublicKey::from_bytes(public_key.serialize()); let key_file = self.key_store.join(local_public.to_hex()); - fs::write(key_file, secret_key.secret_bytes())?; + fs::write(&key_file, secret_key.secret_bytes()).with_context(|| { + format!( + "failed to write private key to the file - {}", + key_file.display() + ) + })?; Ok(local_public) } @@ -261,7 +268,12 @@ impl Signer { ); let key_file = self.key_store.join(local_public.to_hex()); - fs::write(key_file, secret_key.secret_bytes())?; + fs::write(&key_file, secret_key.secret_bytes()).with_context(|| { + format!( + "failed to write private key to the file - {}", + key_file.display() + ) + })?; Ok(local_public) } @@ -274,10 +286,15 @@ impl Signer { pub fn list_keys(&self) -> Result> { let mut keys = vec![]; - for entry in fs::read_dir(&self.key_store)?
{ - let entry = entry?; - let file_name = entry.file_name(); - let key = PublicKey::from_str(file_name.to_string_lossy().as_ref())?; + let dir_entries = fs::read_dir(&self.key_store) + .with_context(|| format!("failed to read dir {}", self.key_store.display()))?; + for entry in dir_entries { + let entry = entry.context("failed to get key store dir entry")?; + let os_file_name = entry.file_name(); + let string_file_name = os_file_name.to_string_lossy(); + let key = PublicKey::from_str(string_file_name.as_ref()).with_context(|| { + format!("can't convert file name {string_file_name} to public key") + })?; keys.push(key); } @@ -288,7 +305,8 @@ impl Signer { let mut buf = [0u8; 32]; let key_path = self.key_store.join(key.to_hex()); - let bytes = fs::read(key_path)?; + let bytes = fs::read(&key_path) + .with_context(|| format!("failed to read key from path {}", key_path.display()))?; if bytes.len() != 32 { bail!("Invalid key length: {:?}", bytes); diff --git a/ethexe/signer/src/signature.rs b/ethexe/signer/src/signature.rs index 4c2879f710f..e9844697210 100644 --- a/ethexe/signer/src/signature.rs +++ b/ethexe/signer/src/signature.rs @@ -33,7 +33,7 @@ pub struct RawSignature([u8; 65]); impl RawSignature { pub fn create_for_digest(private_key: PrivateKey, digest: Digest) -> Result { let secp_secret_key = secp256k1::SecretKey::from_slice(&private_key.0) - .with_context(|| "Invalid secret key format")?; + .context("Invalid secret key format")?; let message = Message::from_digest(digest.into()); @@ -84,12 +84,14 @@ impl Signature { pub fn recover_from_digest(&self, digest: Digest) -> Result { let sig = (*self).try_into()?; let public_key = secp256k1::global::SECP256K1 - .recover_ecdsa(&Message::from_digest(digest.into()), &sig)?; + .recover_ecdsa(&Message::from_digest(digest.into()), &sig) + .context("failed to recover public key from ecdsa signature")?; Ok(PublicKey::from_bytes(public_key.serialize())) } pub fn create_for_digest(private_key: PrivateKey, digest: Digest) -> Result { - let raw_signature = RawSignature::create_for_digest(private_key, digest)?; + let raw_signature = RawSignature::create_for_digest(private_key, digest) + .context("failed creating raw signature for digest")?; Ok(raw_signature.into()) } } diff --git a/ethexe/validator/src/lib.rs b/ethexe/validator/src/lib.rs index 227b14183c4..d4a1efa1a8e 100644 --- a/ethexe/validator/src/lib.rs +++ b/ethexe/validator/src/lib.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . 
-use anyhow::{anyhow, ensure, Result}; +use anyhow::{anyhow, ensure, Context, Result}; use ethexe_common::{ db::{BlockMetaStorage, CodesStorage}, gear::{BlockCommitment, CodeCommitment}, @@ -113,6 +113,7 @@ impl Validator { self.pub_key, self.router_address, ) + .context("failed to aggregate commitments") } pub fn validate_code_commitments( @@ -124,7 +125,8 @@ impl Validator { for request in requests { log::debug!("Receive code commitment for validation: {:?}", request); commitment_digests.push(request.to_digest()); - Self::validate_code_commitment(db, request)?; + Self::validate_code_commitment(db, request) + .context("failed code commitments validation")?; } let commitments_digest = commitment_digests.iter().collect(); @@ -135,6 +137,7 @@ impl Validator { self.router_address, ) .map(|signature| (commitments_digest, signature)) + .context("failed to sign code commitments digest") } pub fn validate_block_commitments( @@ -146,7 +149,8 @@ impl Validator { for request in requests.into_iter() { log::debug!("Receive block commitment for validation: {:?}", request); commitment_digests.push(request.to_digest()); - Self::validate_block_commitment(db, request)?; + Self::validate_block_commitment(db, request) + .context("failed block commitments validation")?; } let commitments_digest = commitment_digests.iter().collect(); @@ -157,6 +161,7 @@ impl Validator { self.router_address, ) .map(|signature| (commitments_digest, signature)) + .context("failed to sign block commitments digest") } fn validate_code_commitment(db: &impl CodesStorage, request: CodeCommitment) -> Result<()> {
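
Note: a minimal illustrative sketch, not part of the diff (`load_key` and the paths are hypothetical), of the `anyhow` convention this change applies throughout — the eager `.context(...)` for static messages, and the lazy `.with_context(|| ...)` kept wherever the message is built with `format!`, so the string is only allocated on the error path.

    use anyhow::{anyhow, Context, Result};
    use std::fs;

    fn load_key(dir: &str, name: &str) -> Result<[u8; 32]> {
        let path = format!("{dir}/{name}");

        // Static message: eager `.context("...")`, nothing is built up front.
        let bytes = fs::read(&path).context("failed to read key file")?;

        // Dynamic message: lazy `.with_context(|| format!(...))`, the string is
        // constructed only if the conversion actually fails.
        bytes
            .as_slice()
            .try_into()
            .map_err(|_| anyhow!("key is not 32 bytes"))
            .with_context(|| format!("invalid key file {path}"))
    }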