From 6330d298d420f4bbab60b55b38044ee8a1e9f9db Mon Sep 17 00:00:00 2001 From: NikolasHai <113891786+NikolasHai@users.noreply.github.com> Date: Wed, 7 Feb 2024 17:08:57 +0100 Subject: [PATCH] feat: add pre signed read state call wait (#511) --- Cargo.lock | 1 + ic-agent/src/agent/mod.rs | 86 ++++++++++++++++++++++++++++++--- ref-tests/Cargo.toml | 1 + ref-tests/tests/integration.rs | 87 +++++++++++++++++++++++++++++++++- 4 files changed, 168 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 51103261..a5f1e0f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1822,6 +1822,7 @@ version = "0.0.0" dependencies = [ "candid", "ic-agent", + "ic-certification", "ic-identity-hsm", "ic-utils", "ring", diff --git a/ic-agent/src/agent/mod.rs b/ic-agent/src/agent/mod.rs index fa29d0b0..24b217fb 100644 --- a/ic-agent/src/agent/mod.rs +++ b/ic-agent/src/agent/mod.rs @@ -34,6 +34,7 @@ use crate::{ to_request_id, RequestId, }; use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; +use backoff::{exponential::ExponentialBackoff, SystemClock}; use ic_certification::{Certificate, Delegation, Label}; use ic_transport_types::{ signed::{SignedQuery, SignedRequestStatus, SignedUpdate}, @@ -656,18 +657,91 @@ impl Agent { } } + fn get_retry_policy() -> ExponentialBackoff { + ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(500)) + .with_max_interval(Duration::from_secs(1)) + .with_multiplier(1.4) + .with_max_elapsed_time(Some(Duration::from_secs(60 * 5))) + .build() + } + + /// Wait for request_status to return a Replied response and return the arg. + pub async fn wait_signed( + &self, + request_id: &RequestId, + effective_canister_id: Principal, + signed_request_status: Vec, + ) -> Result, AgentError> { + let mut retry_policy = Self::get_retry_policy(); + + let mut request_accepted = false; + loop { + match self + .request_status_signed( + request_id, + effective_canister_id, + signed_request_status.clone(), + ) + .await? + { + RequestStatusResponse::Unknown => {} + + RequestStatusResponse::Received | RequestStatusResponse::Processing => { + if !request_accepted { + retry_policy.reset(); + request_accepted = true; + } + } + + RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => return Ok(arg), + + RequestStatusResponse::Rejected(response) => { + return Err(AgentError::ReplicaError(response)) + } + + RequestStatusResponse::Done => { + return Err(AgentError::RequestStatusDoneNoReply(String::from( + *request_id, + ))) + } + }; + + match retry_policy.next_backoff() { + #[cfg(not(target_family = "wasm"))] + Some(duration) => tokio::time::sleep(duration).await, + + #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))] + Some(duration) => { + wasm_bindgen_futures::JsFuture::from(js_sys::Promise::new(&mut |rs, rj| { + if let Err(e) = web_sys::window() + .expect("global window unavailable") + .set_timeout_with_callback_and_timeout_and_arguments_0( + &rs, + duration.as_millis() as _, + ) + { + use wasm_bindgen::UnwrapThrowExt; + rj.call1(&rj, &e).unwrap_throw(); + } + })) + .await + .expect("unable to setTimeout"); + } + + None => return Err(AgentError::TimeoutWaitingForResponse()), + } + } + } + /// Call request_status on the RequestId in a loop and return the response as a byte vector. pub async fn wait( &self, request_id: RequestId, effective_canister_id: Principal, ) -> Result, AgentError> { - let mut retry_policy = ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(500)) - .with_max_interval(Duration::from_secs(1)) - .with_multiplier(1.4) - .with_max_elapsed_time(Some(Duration::from_secs(60 * 5))) - .build(); + let mut retry_policy = Self::get_retry_policy(); + let mut request_accepted = false; loop { match self.poll(&request_id, effective_canister_id).await? { diff --git a/ref-tests/Cargo.toml b/ref-tests/Cargo.toml index ec3a07b3..4fb506b3 100644 --- a/ref-tests/Cargo.toml +++ b/ref-tests/Cargo.toml @@ -17,3 +17,4 @@ tokio = { workspace = true, features = ["full"] } [dev-dependencies] serde_cbor = { workspace = true } +ic-certification = { workspace = true } diff --git a/ref-tests/tests/integration.rs b/ref-tests/tests/integration.rs index 1f2f2214..a4fa7982 100644 --- a/ref-tests/tests/integration.rs +++ b/ref-tests/tests/integration.rs @@ -4,10 +4,11 @@ //! integration tests with a running IC-Ref. use candid::CandidType; use ic_agent::{ - agent::{agent_error::HttpErrorPayload, RejectCode, RejectResponse}, + agent::{agent_error::HttpErrorPayload, Envelope, EnvelopeContent, RejectCode, RejectResponse}, export::Principal, AgentError, Identity, }; +use ic_certification::Label; use ic_utils::{ call::{AsyncCall, SyncCall}, interfaces::{ @@ -21,6 +22,12 @@ use ref_tests::{ get_wallet_wasm_from_env, universal_canister::payload, with_universal_canister, with_wallet_canister, }; +use serde::Serialize; +use std::{ + borrow::Cow, + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; #[ignore] #[test] @@ -64,6 +71,84 @@ fn basic_expiry() { }) } +#[ignore] +#[test] +fn wait_signed() { + with_universal_canister(|mut agent, canister_id| async move { + fn serialized_bytes(envelope: Envelope) -> Vec { + let mut serialized_bytes = Vec::new(); + let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes); + serializer.self_describe().unwrap(); + envelope.serialize(&mut serializer).unwrap(); + serialized_bytes + } + + let arg = payload().reply_data(b"hello").build(); + let ingress_expiry = (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + + Duration::from_secs(120)) + .as_nanos() as u64; + + let agent_identity = Arc::new(create_basic_identity().unwrap()); + agent.set_arc_identity(agent_identity.clone()); + + let call_envelope_content = EnvelopeContent::Call { + sender: agent.get_principal().unwrap(), + arg: arg.clone(), + ingress_expiry, + nonce: None, + canister_id, + method_name: "update".to_string(), + }; + + let call_request_id = call_envelope_content.to_request_id(); + let call_signature = agent_identity.sign(&call_envelope_content).unwrap(); + + let call_envelope = Envelope { + content: Cow::Borrowed(&call_envelope_content), + sender_pubkey: call_signature.public_key, + sender_sig: call_signature.signature, + sender_delegation: call_signature.delegations, + }; + + let call_envelope_serialized = serialized_bytes(call_envelope); + + agent + .update_signed(canister_id, call_envelope_serialized) + .await + .unwrap(); + + let paths: Vec> = vec![vec![ + "request_status".into(), + call_request_id.to_vec().into(), + ]]; + let read_state_envelope_content = EnvelopeContent::ReadState { + sender: agent.get_principal().unwrap(), + paths, + ingress_expiry, + }; + + let read_signature = agent_identity.sign(&read_state_envelope_content).unwrap(); + + let read_state_envelope = Envelope { + content: Cow::Borrowed(&read_state_envelope_content), + sender_pubkey: read_signature.public_key, + sender_sig: read_signature.signature, + sender_delegation: read_signature.delegations, + }; + + let read_envelope_serialized = serialized_bytes(read_state_envelope); + + let result = agent + .wait_signed(&call_request_id, canister_id, read_envelope_serialized) + .await + .unwrap(); + + assert_eq!(result.as_slice(), b"hello"); + + Ok(()) + }) +} + #[ignore] #[test] fn canister_query() {