From d355770949d7efc2e15dc48892c8275f79e437d5 Mon Sep 17 00:00:00 2001 From: Zachary Dremann Date: Sun, 18 Feb 2024 21:49:32 -0500 Subject: [PATCH] More --- crates/applesauce/src/block_stream.rs | 35 +++++++++++++-------------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/crates/applesauce/src/block_stream.rs b/crates/applesauce/src/block_stream.rs index 34a786e..7f19d8c 100644 --- a/crates/applesauce/src/block_stream.rs +++ b/crates/applesauce/src/block_stream.rs @@ -14,7 +14,7 @@ use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore}; use applesauce_core::{compressor, num_blocks, BLOCK_SIZE}; use tokio::task::spawn_blocking; -use tracing::{info_span, Span}; +use tracing::{info_span, Instrument, Span}; struct ReadAtReader<'a> { file: &'a std::fs::File, @@ -150,7 +150,7 @@ impl StreamCompressor { .thread_name(|i| format!("stream-compressor-worker-{i}")) .build() .unwrap(); - let target_blocks = dbg!(pool.current_num_threads()) * 4; + let target_blocks = dbg!(pool.current_num_threads()) * 2; let sem = Arc::new(Semaphore::new(target_blocks)); Self { pool, @@ -176,7 +176,7 @@ impl StreamCompressor { async move { let permit = self.get_permit().await?; spawn_blocking(move || -> io::Result { - let _span = info_span!(parent: parent_span, "reading block", i).entered(); + let _span = info_span!(parent: &parent_span, "reading block", i).entered(); use std::io::prelude::*; let mut data = Vec::with_capacity(BLOCK_SIZE); @@ -207,21 +207,20 @@ impl StreamCompressor { .await } - #[tracing::instrument(skip(self, path, outstanding_blocks))] + #[tracing::instrument(skip(self, metadata, outstanding_blocks))] async fn _compress_file( &self, path: Arc, metadata: Metadata, - outstanding_blocks: OutstandingBlocks, + mut outstanding_blocks: OutstandingBlocks, ) -> io::Result<()> { let file = File::open(&path).await?; - let (tx, rx) = tokio::sync::mpsc::channel(1); + let (tx, rx) = tokio::sync::mpsc::channel(64); let write_handle = async { tokio::spawn(write_file( tokio_stream::wrappers::ReceiverStream::new(rx), path, - outstanding_blocks, )) .await .expect("write_file task panicked") @@ -231,14 +230,17 @@ impl StreamCompressor { let stream = self.compress_stream(stream); let forward_task = async { let mut stream = pin!(stream); - while let Some(item) = stream.try_next().await? { - if tx.send(item).await.is_err() { + while let Some(item) = stream.try_next().instrument(info_span!("try_next")).await? { + if tx.send(item).instrument(info_span!("send")).await.is_err() { return Err(io::ErrorKind::BrokenPipe.into()); } + outstanding_blocks.return_block(); } drop(tx); Ok(()) }; + let forward_task = forward_task.instrument(info_span!("forward_task")); + let write_handle = write_handle.instrument(info_span!("write_handle")); ((), ()) = tokio::try_join!(forward_task, write_handle)?; @@ -260,9 +262,11 @@ impl StreamCompressor { let (tx, rx) = tokio::sync::oneshot::channel(); let parent_span = Span::current(); self.pool.spawn_fifo(move || { + let block = block; // Move into thread - let _span = tracing::info_span!(parent: parent_span, "compress_block", i = block.index) - .entered(); + let _span = + tracing::info_span!(parent: &parent_span, "compress_block", i = block.index) + .entered(); let mut result = Vec::with_capacity(block.data.len() + 1024); let res = with_compressor(compressor_kind, |compressor| { compressor.compress( @@ -295,18 +299,13 @@ impl StreamCompressor { } } -#[tracing::instrument(skip(stream, path, outstanding_blocks))] -async fn write_file( - stream: impl Stream>, - path: Arc, - mut outstanding_blocks: OutstandingBlocks, -) -> io::Result<()> { +#[tracing::instrument(skip(stream, path))] +async fn write_file(stream: impl Stream>, path: Arc) -> io::Result<()> { let mut tmp_file = tmp_file_for(Arc::clone(&path)).await?; let mut stream = pin!(stream); while let Some(block) = stream.next().await { tmp_file.as_file_mut().write_all(&block).await?; - outstanding_blocks.return_block(); } spawn_blocking(move || tmp_file.persist(path.with_extension("cmp")))