Skip to content

Commit

Permalink
Reorganize sgx code in TransferService
Browse files Browse the repository at this point in the history
  • Loading branch information
nieznanysprawiciel committed Dec 21, 2023
1 parent 0b792a9 commit b78b1c7
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 85 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions exe-unit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ compat-deployment = []
sgx = [
'graphene-sgx',
'openssl/vendored',
'reqwest/trust-dns',
'secp256k1/rand',
'ya-client-model/sgx',
'ya-core-model/sgx',
Expand Down Expand Up @@ -76,7 +75,6 @@ log = "0.4"
openssl = { version = "0.10", optional = true }
rand = "0.8.5"
regex = "1.5"
reqwest = { version = "0.11", optional = true }
secp256k1 = { version = "0.27.0", optional = true }
serde = { version = "^1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
2 changes: 2 additions & 0 deletions exe-unit/components/transfer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ log = "0.4"
percent-encoding = "2.1"
rand = "0.8"
regex = "1.3.4"
reqwest = { version = "0.11", optional = true }
serde = "1.0.104"
sha3 = "0.8.2"
tempdir = "0.3.7"
Expand All @@ -47,6 +48,7 @@ async-trait = "0.1.74"
sgx = [
'ya-client-model/sgx',
'ya-core-model/sgx',
'reqwest/trust-dns',
]
framework-test = []

Expand Down
1 change: 0 additions & 1 deletion exe-unit/components/transfer/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ impl Cache {
}

#[inline(always)]
#[cfg(not(feature = "sgx"))]
pub fn to_temp_path(&self, path: &CachePath) -> ProjectedPath {
ProjectedPath::local(self.tmp_dir.clone(), path.temp_path())
}
Expand Down
4 changes: 2 additions & 2 deletions exe-unit/components/transfer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ where

log::debug!("Transferring from offset: {}", ctx.state.offset());

let stream = wrap_stream(src.source(&src_url.url, ctx), src_url)?;
let stream = with_hash_stream(src.source(&src_url.url, ctx), src_url)?;
let sink = dst.destination(&dst_url.url, ctx);

transfer(stream, sink).await?;
Expand All @@ -92,7 +92,7 @@ where
}
}

fn wrap_stream(
fn with_hash_stream(
stream: TransferStream<TransferData, Error>,
url: &TransferUrl,
) -> Result<Box<dyn Stream<Item = Result<TransferData, Error>> + Unpin>, Error> {
Expand Down
160 changes: 88 additions & 72 deletions exe-unit/components/transfer/src/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use actix::prelude::*;
use futures::future::Abortable;
use url::Url;

use crate::cache::Cache;
use crate::cache::{Cache, CachePath};
use crate::error::Error;
use crate::error::Error as TransferError;
use crate::{
Expand All @@ -21,6 +21,20 @@ use ya_utils_futures::abort::Abort;

pub type Result<T> = std::result::Result<T, Error>;

macro_rules! actor_try {
($expr:expr) => {
match $expr {
Ok(val) => val,
Err(err) => {
return ActorResponse::reply(Err(Error::from(err)));
}
}
};
($expr:expr,) => {
$crate::actor_try!($expr)
};
}

#[derive(Clone, Debug, Message)]
#[rtype(result = "Result<()>")]
pub struct TransferResource {
Expand Down Expand Up @@ -119,6 +133,77 @@ impl TransferService {
.ok_or_else(|| TransferError::UnsupportedSchemeError(scheme.to_owned()))?
.clone())
}

#[cfg(feature = "sgx")]
fn deploy_sgx(
&self,
src_url: TransferUrl,
_src_name: CachePath,
path: PathBuf,
) -> ActorResponse<Self, Result<Option<PathBuf>>> {
let fut = async move {
let resp = reqwest::get(src_url.url)
.await
.map_err(|e| Error::Other(e.to_string()))?;
let bytes = resp
.bytes()
.await
.map_err(|e| Error::Other(e.to_string()))?;
std::fs::write(&path, bytes)?;
Ok(Some(path))
};
ActorResponse::r#async(fut.into_actor(self))
}

#[allow(unused)]
fn deploy_no_sgx(
&self,
src_url: TransferUrl,
src_name: CachePath,
path: PathBuf,
) -> ActorResponse<Self, Result<Option<PathBuf>>> {
let path_tmp = self.cache.to_temp_path(&src_name).to_path_buf();

let src = actor_try!(self.provider(&src_url));
let dst: Rc<FileTransferProvider> = Default::default();
let dst_url = TransferUrl {
url: Url::from_file_path(&path_tmp).unwrap(),
hash: None,
};

let handles = self.abort_handles.clone();
let fut = async move {
if path.exists() {
log::info!("Deploying cached image: {:?}", path);
return Ok(Some(path));
}

let (abort, reg) = Abort::new_pair();
{
let ctx = Default::default();
let retry = transfer_with(src, &src_url, dst, &dst_url, &ctx);

let _guard = AbortHandleGuard::register(handles, abort);
Ok::<_, Error>(
Abortable::new(retry, reg)
.await
.map_err(TransferError::from)?
.map_err(|err| {
if let TransferError::InvalidHashError { .. } = err {
let _ = std::fs::remove_file(&path_tmp);
}
err
})?,
)
}?;

move_file(&path_tmp, &path).await?;
log::info!("Deployment from {:?} finished", src_url.url);

Ok(Some(path))
};
ActorResponse::r#async(fut.into_actor(self))
}
}

impl Actor for TransferService {
Expand All @@ -133,20 +218,6 @@ impl Actor for TransferService {
}
}

macro_rules! actor_try {
($expr:expr) => {
match $expr {
Ok(val) => val,
Err(err) => {
return ActorResponse::reply(Err(Error::from(err)));
}
}
};
($expr:expr,) => {
$crate::actor_try!($expr)
};
}

impl Handler<DeployImage> for TransferService {
type Result = ActorResponse<Self, Result<Option<PathBuf>>>;

Expand All @@ -164,65 +235,10 @@ impl Handler<DeployImage> for TransferService {
log::info!("Deploying from {:?} to {:?}", src_url.url, path);

#[cfg(not(feature = "sgx"))]
{
let path_tmp = self.cache.to_temp_path(&src_name).to_path_buf();

let src = actor_try!(self.provider(&src_url));
let dst: Rc<FileTransferProvider> = Default::default();
let dst_url = TransferUrl {
url: Url::from_file_path(&path_tmp).unwrap(),
hash: None,
};

let handles = self.abort_handles.clone();
let fut = async move {
if path.exists() {
log::info!("Deploying cached image: {:?}", path);
return Ok(Some(path));
}

let (abort, reg) = Abort::new_pair();
{
let ctx = Default::default();
let retry = transfer_with(src, &src_url, dst, &dst_url, &ctx);

let _guard = AbortHandleGuard::register(handles, abort);
Ok::<_, Error>(
Abortable::new(retry, reg)
.await
.map_err(TransferError::from)?
.map_err(|err| {
if let TransferError::InvalidHashError { .. } = err {
let _ = std::fs::remove_file(&path_tmp);
}
err
})?,
)
}?;

move_file(&path_tmp, &path).await?;
log::info!("Deployment from {:?} finished", src_url.url);

Ok(Some(path))
};
ActorResponse::r#async(fut.into_actor(self))
}
return self.deploy_no_sgx(src_url, src_name, path);

#[cfg(feature = "sgx")]
{
let fut = async move {
let resp = reqwest::get(src_url.url)
.await
.map_err(|e| Error::Other(e.to_string()))?;
let bytes = resp
.bytes()
.await
.map_err(|e| Error::Other(e.to_string()))?;
std::fs::write(&path, bytes)?;
Ok(Some(path))
};
ActorResponse::r#async(fut.into_actor(self))
}
return self.deploy_sgx(src_url, src_name, path);
}
}

Expand Down
10 changes: 5 additions & 5 deletions exe-unit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,12 +487,12 @@ impl ExeUnitContext {
}
}

impl Into<TransferServiceContext> for &ExeUnitContext {
fn into(self) -> TransferServiceContext {
impl From<&ExeUnitContext> for TransferServiceContext {
fn from(val: &ExeUnitContext) -> Self {
TransferServiceContext {
task_package: self.agreement.task_package.clone(),
cache_dir: self.cache_dir.clone(),
work_dir: self.work_dir.clone(),
task_package: val.agreement.task_package.clone(),
cache_dir: val.cache_dir.clone(),
work_dir: val.work_dir.clone(),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions exe-unit/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ pub enum ShutdownReason {
Error(#[from] Error),
}

impl Into<ya_transfer::transfer::Shutdown> for Shutdown {
fn into(self) -> ya_transfer::transfer::Shutdown {
impl From<Shutdown> for ya_transfer::transfer::Shutdown {
fn from(_: Shutdown) -> Self {
ya_transfer::transfer::Shutdown {}
}
}

0 comments on commit b78b1c7

Please sign in to comment.