From b78b1c7473417a8784eda1883e602bf99cf4b6ed Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Thu, 21 Dec 2023 15:47:31 +0100 Subject: [PATCH] Reorganize sgx code in TransferService --- Cargo.lock | 2 +- exe-unit/Cargo.toml | 2 - exe-unit/components/transfer/Cargo.toml | 2 + exe-unit/components/transfer/src/cache.rs | 1 - exe-unit/components/transfer/src/lib.rs | 4 +- exe-unit/components/transfer/src/transfer.rs | 160 ++++++++++--------- exe-unit/src/lib.rs | 10 +- exe-unit/src/message.rs | 4 +- 8 files changed, 100 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 61f7ecbe72..6431695dfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8392,7 +8392,6 @@ dependencies = [ "openssl", "rand 0.8.5", "regex", - "reqwest", "rustyline 7.1.0", "secp256k1 0.27.0", "serde", @@ -9343,6 +9342,7 @@ dependencies = [ "percent-encoding", "rand 0.8.5", "regex", + "reqwest", "secp256k1 0.27.0", "serde", "serial_test 0.5.1 (git+https://github.com/tworec/serial_test.git?branch=actix_rt_test)", diff --git a/exe-unit/Cargo.toml b/exe-unit/Cargo.toml index 6e08c74409..e5ff92faf1 100644 --- a/exe-unit/Cargo.toml +++ b/exe-unit/Cargo.toml @@ -18,7 +18,6 @@ compat-deployment = [] sgx = [ 'graphene-sgx', 'openssl/vendored', - 'reqwest/trust-dns', 'secp256k1/rand', 'ya-client-model/sgx', 'ya-core-model/sgx', @@ -76,7 +75,6 @@ log = "0.4" openssl = { version = "0.10", optional = true } rand = "0.8.5" regex = "1.5" -reqwest = { version = "0.11", optional = true } secp256k1 = { version = "0.27.0", optional = true } serde = { version = "^1.0", features = ["derive"] } serde_json = "1.0" diff --git a/exe-unit/components/transfer/Cargo.toml b/exe-unit/components/transfer/Cargo.toml index 109b88ba91..67d1e0f648 100644 --- a/exe-unit/components/transfer/Cargo.toml +++ b/exe-unit/components/transfer/Cargo.toml @@ -32,6 +32,7 @@ log = "0.4" percent-encoding = "2.1" rand = "0.8" regex = "1.3.4" +reqwest = { version = "0.11", optional = true } serde = "1.0.104" sha3 = "0.8.2" tempdir = "0.3.7" @@ -47,6 +48,7 @@ async-trait = "0.1.74" sgx = [ 'ya-client-model/sgx', 'ya-core-model/sgx', + 'reqwest/trust-dns', ] framework-test = [] diff --git a/exe-unit/components/transfer/src/cache.rs b/exe-unit/components/transfer/src/cache.rs index acebc731b9..956d54131b 100644 --- a/exe-unit/components/transfer/src/cache.rs +++ b/exe-unit/components/transfer/src/cache.rs @@ -37,7 +37,6 @@ impl Cache { } #[inline(always)] - #[cfg(not(feature = "sgx"))] pub fn to_temp_path(&self, path: &CachePath) -> ProjectedPath { ProjectedPath::local(self.tmp_dir.clone(), path.temp_path()) } diff --git a/exe-unit/components/transfer/src/lib.rs b/exe-unit/components/transfer/src/lib.rs index 171aff4f2d..0873b7d02b 100644 --- a/exe-unit/components/transfer/src/lib.rs +++ b/exe-unit/components/transfer/src/lib.rs @@ -72,7 +72,7 @@ where log::debug!("Transferring from offset: {}", ctx.state.offset()); - let stream = wrap_stream(src.source(&src_url.url, ctx), src_url)?; + let stream = with_hash_stream(src.source(&src_url.url, ctx), src_url)?; let sink = dst.destination(&dst_url.url, ctx); transfer(stream, sink).await?; @@ -92,7 +92,7 @@ where } } -fn wrap_stream( +fn with_hash_stream( stream: TransferStream, url: &TransferUrl, ) -> Result> + Unpin>, Error> { diff --git a/exe-unit/components/transfer/src/transfer.rs b/exe-unit/components/transfer/src/transfer.rs index a8be46e75f..09d002abf8 100644 --- a/exe-unit/components/transfer/src/transfer.rs +++ b/exe-unit/components/transfer/src/transfer.rs @@ -7,7 +7,7 @@ use actix::prelude::*; use futures::future::Abortable; use url::Url; -use crate::cache::Cache; +use crate::cache::{Cache, CachePath}; use crate::error::Error; use crate::error::Error as TransferError; use crate::{ @@ -21,6 +21,20 @@ use ya_utils_futures::abort::Abort; pub type Result = std::result::Result; +macro_rules! actor_try { + ($expr:expr) => { + match $expr { + Ok(val) => val, + Err(err) => { + return ActorResponse::reply(Err(Error::from(err))); + } + } + }; + ($expr:expr,) => { + $crate::actor_try!($expr) + }; +} + #[derive(Clone, Debug, Message)] #[rtype(result = "Result<()>")] pub struct TransferResource { @@ -119,6 +133,77 @@ impl TransferService { .ok_or_else(|| TransferError::UnsupportedSchemeError(scheme.to_owned()))? .clone()) } + + #[cfg(feature = "sgx")] + fn deploy_sgx( + &self, + src_url: TransferUrl, + _src_name: CachePath, + path: PathBuf, + ) -> ActorResponse>> { + let fut = async move { + let resp = reqwest::get(src_url.url) + .await + .map_err(|e| Error::Other(e.to_string()))?; + let bytes = resp + .bytes() + .await + .map_err(|e| Error::Other(e.to_string()))?; + std::fs::write(&path, bytes)?; + Ok(Some(path)) + }; + ActorResponse::r#async(fut.into_actor(self)) + } + + #[allow(unused)] + fn deploy_no_sgx( + &self, + src_url: TransferUrl, + src_name: CachePath, + path: PathBuf, + ) -> ActorResponse>> { + let path_tmp = self.cache.to_temp_path(&src_name).to_path_buf(); + + let src = actor_try!(self.provider(&src_url)); + let dst: Rc = Default::default(); + let dst_url = TransferUrl { + url: Url::from_file_path(&path_tmp).unwrap(), + hash: None, + }; + + let handles = self.abort_handles.clone(); + let fut = async move { + if path.exists() { + log::info!("Deploying cached image: {:?}", path); + return Ok(Some(path)); + } + + let (abort, reg) = Abort::new_pair(); + { + let ctx = Default::default(); + let retry = transfer_with(src, &src_url, dst, &dst_url, &ctx); + + let _guard = AbortHandleGuard::register(handles, abort); + Ok::<_, Error>( + Abortable::new(retry, reg) + .await + .map_err(TransferError::from)? + .map_err(|err| { + if let TransferError::InvalidHashError { .. } = err { + let _ = std::fs::remove_file(&path_tmp); + } + err + })?, + ) + }?; + + move_file(&path_tmp, &path).await?; + log::info!("Deployment from {:?} finished", src_url.url); + + Ok(Some(path)) + }; + ActorResponse::r#async(fut.into_actor(self)) + } } impl Actor for TransferService { @@ -133,20 +218,6 @@ impl Actor for TransferService { } } -macro_rules! actor_try { - ($expr:expr) => { - match $expr { - Ok(val) => val, - Err(err) => { - return ActorResponse::reply(Err(Error::from(err))); - } - } - }; - ($expr:expr,) => { - $crate::actor_try!($expr) - }; -} - impl Handler for TransferService { type Result = ActorResponse>>; @@ -164,65 +235,10 @@ impl Handler for TransferService { log::info!("Deploying from {:?} to {:?}", src_url.url, path); #[cfg(not(feature = "sgx"))] - { - let path_tmp = self.cache.to_temp_path(&src_name).to_path_buf(); - - let src = actor_try!(self.provider(&src_url)); - let dst: Rc = Default::default(); - let dst_url = TransferUrl { - url: Url::from_file_path(&path_tmp).unwrap(), - hash: None, - }; - - let handles = self.abort_handles.clone(); - let fut = async move { - if path.exists() { - log::info!("Deploying cached image: {:?}", path); - return Ok(Some(path)); - } - - let (abort, reg) = Abort::new_pair(); - { - let ctx = Default::default(); - let retry = transfer_with(src, &src_url, dst, &dst_url, &ctx); - - let _guard = AbortHandleGuard::register(handles, abort); - Ok::<_, Error>( - Abortable::new(retry, reg) - .await - .map_err(TransferError::from)? - .map_err(|err| { - if let TransferError::InvalidHashError { .. } = err { - let _ = std::fs::remove_file(&path_tmp); - } - err - })?, - ) - }?; - - move_file(&path_tmp, &path).await?; - log::info!("Deployment from {:?} finished", src_url.url); - - Ok(Some(path)) - }; - ActorResponse::r#async(fut.into_actor(self)) - } + return self.deploy_no_sgx(src_url, src_name, path); #[cfg(feature = "sgx")] - { - let fut = async move { - let resp = reqwest::get(src_url.url) - .await - .map_err(|e| Error::Other(e.to_string()))?; - let bytes = resp - .bytes() - .await - .map_err(|e| Error::Other(e.to_string()))?; - std::fs::write(&path, bytes)?; - Ok(Some(path)) - }; - ActorResponse::r#async(fut.into_actor(self)) - } + return self.deploy_sgx(src_url, src_name, path); } } diff --git a/exe-unit/src/lib.rs b/exe-unit/src/lib.rs index 88769f647c..e5f941c3b6 100644 --- a/exe-unit/src/lib.rs +++ b/exe-unit/src/lib.rs @@ -487,12 +487,12 @@ impl ExeUnitContext { } } -impl Into for &ExeUnitContext { - fn into(self) -> TransferServiceContext { +impl From<&ExeUnitContext> for TransferServiceContext { + fn from(val: &ExeUnitContext) -> Self { TransferServiceContext { - task_package: self.agreement.task_package.clone(), - cache_dir: self.cache_dir.clone(), - work_dir: self.work_dir.clone(), + task_package: val.agreement.task_package.clone(), + cache_dir: val.cache_dir.clone(), + work_dir: val.work_dir.clone(), } } } diff --git a/exe-unit/src/message.rs b/exe-unit/src/message.rs index 5ec3b77de6..27be13d80b 100644 --- a/exe-unit/src/message.rs +++ b/exe-unit/src/message.rs @@ -225,8 +225,8 @@ pub enum ShutdownReason { Error(#[from] Error), } -impl Into for Shutdown { - fn into(self) -> ya_transfer::transfer::Shutdown { +impl From for ya_transfer::transfer::Shutdown { + fn from(_: Shutdown) -> Self { ya_transfer::transfer::Shutdown {} } }