Skip to content

Commit

Permalink
Improving pull example
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Sep 6, 2020
1 parent aec5ddc commit f98f1bc
Showing 1 changed file with 68 additions and 8 deletions.
76 changes: 68 additions & 8 deletions kamu-cli/examples/pull-root-multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ fn main() {
cmd.run().unwrap();
}

/// Produces a random 64-character lowercase hexadecimal string.
///
/// Draws 32 random bytes from `rand::thread_rng()` and renders each one as
/// two zero-padded hex digits, so the output has the same length as the
/// hard-coded placeholder block hash it is used in place of.
fn rand_hash() -> String {
    use rand::distributions::Standard;
    use rand::Rng;
    use std::fmt::Write;

    // 32 bytes * 2 hex digits each = 64 characters, allocated up front.
    let mut hex = String::with_capacity(64);
    let random_bytes = rand::thread_rng()
        .sample_iter::<u8, _>(&Standard)
        .take(32);
    for byte in random_bytes {
        // The target is an in-memory `String`; the result is unwrapped
        // exactly as a formatting failure would be treated as a bug.
        write!(&mut hex, "{:02x}", byte).unwrap();
    }
    hex
}

pub struct TestPullService;

impl TestPullService {
Expand All @@ -42,9 +55,14 @@ impl TestPullService {
listener.on_stage_progress(IngestStage::CheckCache, 0, 0);

sleep_rand(200, 1500);
for i in (0..200000).step_by(1000) {
listener.on_stage_progress(IngestStage::Fetch, i, 200000);
sleep(5);
let size = 10u64.pow(6) + ((20 * 10u64.pow(6)) as f64 * rand::random::<f64>()) as u64;
let download_time = 0.1 + 2.0 * rand::random::<f64>();
let chunk_size = 1024usize;
let num_chunks = size / chunk_size as u64;
let chunk_sleep = (download_time / num_chunks as f64 * 1000f64) as u64;
for i in (0..size).step_by(chunk_size) {
listener.on_stage_progress(IngestStage::Fetch, i, size);
sleep(chunk_sleep);
}

listener.on_stage_progress(IngestStage::Prepare, 0, 0);
Expand All @@ -63,7 +81,7 @@ impl TestPullService {

sleep_rand(200, 1500);

let hash = "a7ffc6f8bf1ed76651c14756a061d662f580ff4de43b49fa82d80a4b80f8434a";
let hash = rand_hash();
let result = IngestResult::Updated {
block_hash: hash.to_owned(),
has_more: false,
Expand All @@ -77,6 +95,29 @@ impl TestPullService {
}),
)
}

/// Simulates a single dataset transformation for the pull example.
///
/// Locks the per-dataset listener, signals `begin`, sleeps for 2000 ms to
/// imitate work, then reports a `TransformResult::Updated` carrying a fresh
/// random block hash from `rand_hash`.
///
/// Returns the dataset id paired with `Ok(PullResult::Updated { .. })`
/// holding the same block hash that was reported to the listener.
///
/// # Panics
///
/// Panics if the listener mutex is poisoned (the `lock()` result is
/// unwrapped).
fn transform(
    id: DatasetIDBuf,
    l: Arc<Mutex<dyn TransformListener>>,
) -> (DatasetIDBuf, Result<PullResult, PullError>) {
    // The guard lives until the function returns, so the listener stays
    // locked for the whole simulated transform, including the sleep.
    let mut guard = l.lock().unwrap();
    guard.begin();

    let simulated_work = std::time::Duration::from_millis(2000);
    std::thread::sleep(simulated_work);

    let block_hash = rand_hash();
    let reported = TransformResult::Updated {
        block_hash: block_hash.clone(),
    };
    guard.success(&reported);

    let outcome = Ok(PullResult::Updated { block_hash });
    (id, outcome)
}
}

impl PullService for TestPullService {
Expand All @@ -86,13 +127,15 @@ impl PullService for TestPullService {
_recursive: bool,
_all: bool,
ingest_listener: Option<Arc<Mutex<dyn IngestMultiListener>>>,
_transform_listener: Option<Arc<Mutex<dyn TransformMultiListener>>>,
transform_listener: Option<Arc<Mutex<dyn TransformMultiListener>>>,
) -> Vec<(DatasetIDBuf, Result<PullResult, PullError>)> {
let in_l = ingest_listener.unwrap();
let handles: Vec<_> = [
let tr_l = transform_listener.unwrap();

let ingest_handles: Vec<_> = [
"org.geonames.cities",
"com.naturalearthdata.admin0",
"ca.statcan.census",
"gov.census.data",
]
.iter()
.map(|s| DatasetIDBuf::try_from(*s).unwrap())
Expand All @@ -102,7 +145,24 @@ impl PullService for TestPullService {
})
.collect();

handles.into_iter().map(|h| h.join().unwrap()).collect()
let ingest_results: Vec<_> = ingest_handles
.into_iter()
.map(|h| h.join().unwrap())
.collect();

let transform_handles: Vec<_> =
["com.acme.census.normalized", "com.acme.census.geolocated"]
.iter()
.map(|s| DatasetIDBuf::try_from(*s).unwrap())
.map(|id| {
let listener = tr_l.lock().unwrap().begin_transform(&id).unwrap();
std::thread::spawn(move || Self::transform(id, listener))
})
.collect();

let mut results = ingest_results;
results.extend(transform_handles.into_iter().map(|h| h.join().unwrap()));
results
}

fn set_watermark(
Expand Down

0 comments on commit f98f1bc

Please sign in to comment.