Skip to content

Commit

Permalink
Remove foreign chunks, skip uploading old chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
adamspofford-dfinity committed Jan 17, 2024
1 parent 9a42cfd commit 3576b4a
Showing 1 changed file with 29 additions and 17 deletions.
46 changes: 29 additions & 17 deletions ic-utils/src/interfaces/management_canister/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ use crate::{
use async_trait::async_trait;
use candid::utils::ArgumentEncoder;
use candid::{CandidType, Deserialize, Nat};
use futures_util::future::BoxFuture;
use futures_util::{
future::ready,
stream::{self, FuturesUnordered},
FutureExt, Stream, StreamExt, TryStreamExt,
};
use ic_agent::{export::Principal, AgentError, RequestId};
use sha2::{Digest, Sha256};
use std::collections::{BTreeMap, BTreeSet};
use std::convert::{From, TryInto};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::{Arc, Mutex};

/// The set of possible canister settings. Similar to [`DefiniteCanisterSettings`](super::DefiniteCanisterSettings),
/// but all the fields are optional.
Expand Down Expand Up @@ -713,17 +714,18 @@ impl<'agent: 'canister, 'canister: 'builder, 'builder> InstallBuilder<'agent, 'c
/// use [`call_and_wait_with_progress`](Self::call_and_wait_with_progress) if you want progress reporting.
pub async fn call_and_wait(self) -> Result<(), AgentError> {
self.call_and_wait_with_progress()
.await
.try_for_each(|_| ready(Ok(())))
.await
}

/// Invoke the installation process. The returned stream must be awaited to completion; it is used to track progress,
/// Invoke the installation process. The returned stream must be iterated to completion; it is used to track progress,
/// as installation may take arbitrarily long, and is intended to be passed to functions like `indicatif::ProgressBar::wrap_stream`.
/// There are exactly [`size_hint().0`](Stream::size_hint) steps.
pub fn call_and_wait_with_progress(
pub async fn call_and_wait_with_progress(
self,
) -> impl Stream<Item = Result<(), AgentError>> + 'builder {
let stream_res = /* try { */ (move || {
) -> impl Stream<Item = Result<(), AgentError>> + Send + 'builder {
let stream_res = /* try { */ async move {
let arg = self.arg.serialize()?;
let stream: BoxStream<'_, _> =
if self.wasm.len() + arg.len() < Self::CHUNK_CUTOFF {
Expand All @@ -739,29 +741,39 @@ impl<'agent: 'canister, 'canister: 'builder, 'builder> InstallBuilder<'agent, 'c
.into_stream(),
)
} else {
let chunks_iter = self.wasm.chunks(1024 * 1024);
let results = Arc::new(Mutex::new(vec![<_>::default(); chunks_iter.len()]));
let (existing_chunks,) = self.canister.stored_chunks(&self.canister_id).call_and_wait().await?;
let existing_chunks = existing_chunks.into_iter().collect::<BTreeSet<_>>();
let to_upload_chunks_ordered = self.wasm.chunks(1024 * 1024).map(|x| (<[u8; 32]>::from(Sha256::digest(x)), x)).collect::<Vec<_>>();
let to_upload_chunks = to_upload_chunks_ordered.iter().map(|&(k, v)| (k, v)).collect::<BTreeMap<_, _>>();
let (new_chunks, setup) = if existing_chunks.iter().all(|hash| to_upload_chunks.contains_key(hash)) {
(
to_upload_chunks.iter()
.filter_map(|(hash, value)| (!existing_chunks.contains(hash)).then_some((*hash, *value)))
.collect(),
Box::pin(ready(Ok(()))) as BoxFuture<'_, _>,
)
} else {
(to_upload_chunks.clone(), self.canister.clear_chunk_store(&self.canister_id).call_and_wait())
};
let chunks_stream = FuturesUnordered::new();
for (x, chunk) in chunks_iter.enumerate() {
let results = results.clone();
for &chunk in new_chunks.values() {
chunks_stream.push(async move {
let (res,) = self
let (_res,) = self
.canister
.upload_chunk(&self.canister_id, chunk)
.call_and_wait()
.await?;
results.lock().unwrap()[x] = res.hash;
Ok(())
})
}
Box::pin(
chunks_stream
setup.into_stream()
// emit the same number of elements each time for a consistent progress bar, even if some are already uploaded
.chain(stream::repeat_with(|| Ok(())).take(to_upload_chunks.len() - new_chunks.len()))
.chain(chunks_stream)
.chain(
async move {
let results = Arc::into_inner(results)
.expect("unexpected arc handle left standing")
.into_inner()
.unwrap();
let results = to_upload_chunks_ordered.iter().map(|&(hash, _)| hash).collect();
self.canister
.install_chunked_code(
&self.canister_id,
Expand All @@ -787,7 +799,7 @@ impl<'agent: 'canister, 'canister: 'builder, 'builder> InstallBuilder<'agent, 'c
)
};
Ok(stream)
})();
}.await;
match stream_res {
Ok(stream) => stream,
Err(err) => Box::pin(stream::once(async { Err(err) })),
Expand Down

0 comments on commit 3576b4a

Please sign in to comment.