Commit d355770
Dr-Emann committed Feb 19, 2024
1 parent 712a333 commit d355770
Showing 1 changed file with 17 additions and 18 deletions.
crates/applesauce/src/block_stream.rs
@@ -14,7 +14,7 @@ use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};
 
 use applesauce_core::{compressor, num_blocks, BLOCK_SIZE};
 use tokio::task::spawn_blocking;
-use tracing::{info_span, Span};
+use tracing::{info_span, Instrument, Span};
 
 struct ReadAtReader<'a> {
     file: &'a std::fs::File,
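Note: the added `Instrument` import is the tracing extension trait the later hunks use to attach named spans to futures via `.instrument(...)`. A minimal sketch of the pattern, assuming the `tokio`, `tracing`, and `tracing-subscriber` crates (the names here are illustrative, not from this commit):

```rust
use tracing::{info_span, Instrument};

async fn do_work() {
    // Any .await inside this future is recorded inside the attached span.
    tokio::task::yield_now().await;
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    // `.instrument(..)` wraps the future so the span is entered on every
    // poll, not just once when the future is created.
    do_work().instrument(info_span!("do_work")).await;
}
```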
@@ -150,7 +150,7 @@ impl StreamCompressor {
             .thread_name(|i| format!("stream-compressor-worker-{i}"))
             .build()
             .unwrap();
-        let target_blocks = dbg!(pool.current_num_threads()) * 4;
+        let target_blocks = dbg!(pool.current_num_threads()) * 2;
         let sem = Arc::new(Semaphore::new(target_blocks));
         Self {
             pool,
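Note: dropping the multiplier from 4 to 2 tightens backpressure: the semaphore now admits at most twice as many in-flight blocks as worker threads before readers must wait. A rough, self-contained sketch of that pattern (the counts and loop body are illustrative):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let workers = 8; // stand-in for pool.current_num_threads()
    let sem = Arc::new(Semaphore::new(workers * 2));

    let mut tasks = Vec::new();
    for i in 0..32 {
        // Each in-flight block holds one permit, so at most `workers * 2`
        // blocks are buffered ahead of the compressors.
        let permit = sem.clone().acquire_owned().await.unwrap();
        tasks.push(tokio::spawn(async move {
            // ... read and compress block `i` here ...
            let _ = i;
            drop(permit); // finished: let another block in
        }));
    }
    for t in tasks {
        t.await.unwrap();
    }
}
```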
@@ -176,7 +176,7 @@ impl StreamCompressor {
         async move {
             let permit = self.get_permit().await?;
             spawn_blocking(move || -> io::Result<InputBlock> {
-                let _span = info_span!(parent: parent_span, "reading block", i).entered();
+                let _span = info_span!(parent: &parent_span, "reading block", i).entered();
                 use std::io::prelude::*;
 
                 let mut data = Vec::with_capacity(BLOCK_SIZE);
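Note: `parent: &parent_span` borrows the captured `Span` instead of consuming it, which matters whenever the same span must parent more than one child. A condensed sketch of the idea, using a plain thread in place of `spawn_blocking`:

```rust
use tracing::{info_span, Span};

fn main() {
    tracing_subscriber::fmt().init();
    let _outer = info_span!("outer").entered();

    let parent_span = Span::current();
    std::thread::spawn(move || {
        for i in 0..3 {
            // Borrowing with `parent: &parent_span` leaves the captured
            // Span usable again on the next iteration; passing it by
            // value would move it the first time through.
            let _span = info_span!(parent: &parent_span, "reading block", i).entered();
            // ... blocking read for block `i` happens inside the span ...
        }
    })
    .join()
    .unwrap();
}
```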
@@ -207,21 +207,20 @@ impl StreamCompressor {
             .await
     }
 
-    #[tracing::instrument(skip(self, path, outstanding_blocks))]
+    #[tracing::instrument(skip(self, metadata, outstanding_blocks))]
     async fn _compress_file(
         &self,
         path: Arc<Path>,
         metadata: Metadata,
-        outstanding_blocks: OutstandingBlocks,
+        mut outstanding_blocks: OutstandingBlocks,
     ) -> io::Result<()> {
         let file = File::open(&path).await?;
-        let (tx, rx) = tokio::sync::mpsc::channel(1);
+        let (tx, rx) = tokio::sync::mpsc::channel(64);
 
         let write_handle = async {
             tokio::spawn(write_file(
                 tokio_stream::wrappers::ReceiverStream::new(rx),
                 path,
-                outstanding_blocks,
             ))
             .await
             .expect("write_file task panicked")
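Note: growing the mpsc channel from 1 slot to 64 lets compressed blocks queue ahead of the writer instead of stalling the pipeline on every block. A stand-alone sketch of the producer/writer handoff, assuming the `tokio-stream` crate (block contents here are dummies):

```rust
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    // A 64-slot buffer lets producers run well ahead of a slow writer;
    // with capacity 1, every send waited for the writer to catch up.
    let (tx, rx) = tokio::sync::mpsc::channel::<Vec<u8>>(64);

    let writer = tokio::spawn(async move {
        let mut stream = ReceiverStream::new(rx);
        while let Some(block) = stream.next().await {
            // ... write `block` to the temp file here ...
            let _ = block;
        }
    });

    for i in 0..128u8 {
        if tx.send(vec![i]).await.is_err() {
            break; // writer hung up
        }
    }
    drop(tx); // close the channel so the writer's stream ends
    writer.await.unwrap();
}
```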
@@ -231,14 +230,17 @@
         let stream = self.compress_stream(stream);
         let forward_task = async {
             let mut stream = pin!(stream);
-            while let Some(item) = stream.try_next().await? {
-                if tx.send(item).await.is_err() {
+            while let Some(item) = stream.try_next().instrument(info_span!("try_next")).await? {
+                if tx.send(item).instrument(info_span!("send")).await.is_err() {
                     return Err(io::ErrorKind::BrokenPipe.into());
                 }
+                outstanding_blocks.return_block();
             }
             drop(tx);
             Ok(())
         };
+        let forward_task = forward_task.instrument(info_span!("forward_task"));
+        let write_handle = write_handle.instrument(info_span!("write_handle"));
 
         ((), ()) = tokio::try_join!(forward_task, write_handle)?;
 
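Note: wrapping both halves of the pipeline in named spans makes the `try_join!` legible in traces: each poll is recorded under `forward_task` or `write_handle`, so a trace shows which side is waiting. A minimal sketch of the same shape:

```rust
use tracing::{info_span, Instrument};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    tracing_subscriber::fmt().init();

    // Trivial stand-ins for the real forwarding and writing futures.
    let forward_task = async { Ok::<(), std::io::Error>(()) }.instrument(info_span!("forward_task"));
    let write_handle = async { Ok::<(), std::io::Error>(()) }.instrument(info_span!("write_handle"));

    // try_join! polls both futures concurrently; every poll of each one
    // is recorded under its own span.
    ((), ()) = tokio::try_join!(forward_task, write_handle)?;
    Ok(())
}
```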
@@ -260,9 +262,11 @@ impl StreamCompressor {
         let (tx, rx) = tokio::sync::oneshot::channel();
         let parent_span = Span::current();
         self.pool.spawn_fifo(move || {
-            let _span = tracing::info_span!(parent: parent_span, "compress_block", i = block.index)
-                .entered();
+            let block = block;
+            // Move into thread
+            let _span =
+                tracing::info_span!(parent: &parent_span, "compress_block", i = block.index)
+                    .entered();
             let mut result = Vec::with_capacity(block.data.len() + 1024);
             let res = with_compressor(compressor_kind, |compressor| {
                 compressor.compress(
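Note: the compression itself runs on the rayon pool, with a oneshot channel carrying the result back to async code; `let block = block;` just makes the move into the pool thread explicit. A stripped-down sketch of that handoff, with dummy data in place of real compression:

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(2)
        .build()
        .unwrap();
    let (tx, rx) = oneshot::channel();

    let block = vec![0u8; 16];
    pool.spawn_fifo(move || {
        let block = block; // re-bind to make the move explicit, as in the commit
        let compressed = block; // ... the real code compresses here ...
        let _ = tx.send(compressed); // receiver may already be gone
    });

    let result = rx.await.expect("compressor dropped the sender");
    assert_eq!(result.len(), 16);
}
```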
Expand Down Expand Up @@ -295,18 +299,13 @@ impl StreamCompressor {
}
}

#[tracing::instrument(skip(stream, path, outstanding_blocks))]
async fn write_file(
stream: impl Stream<Item = Vec<u8>>,
path: Arc<Path>,
mut outstanding_blocks: OutstandingBlocks,
) -> io::Result<()> {
#[tracing::instrument(skip(stream, path))]
async fn write_file(stream: impl Stream<Item = Vec<u8>>, path: Arc<Path>) -> io::Result<()> {
let mut tmp_file = tmp_file_for(Arc::clone(&path)).await?;
let mut stream = pin!(stream);

while let Some(block) = stream.next().await {
tmp_file.as_file_mut().write_all(&block).await?;
outstanding_blocks.return_block();
}

spawn_blocking(move || tmp_file.persist(path.with_extension("cmp")))
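Note: buffer accounting leaves `write_file` entirely: the forward task now returns a block's slot as soon as the block is queued for writing, rather than only after it reaches disk. `OutstandingBlocks` itself isn't shown in this diff; a guessed, illustrative shape that would support `return_block` (not the crate's actual code):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

/// Hypothetical stand-in for the crate's OutstandingBlocks: each block
/// admitted to the pipeline consumes a permit, and returning the block
/// releases it so another block may enter.
struct OutstandingBlocks {
    sem: Arc<Semaphore>,
}

impl OutstandingBlocks {
    fn return_block(&mut self) {
        // After this commit, the forward task calls this as soon as a
        // block is queued for writing, not after it reaches disk.
        self.sem.add_permits(1);
    }
}

fn main() {
    let mut blocks = OutstandingBlocks { sem: Arc::new(Semaphore::new(0)) };
    blocks.return_block();
    assert_eq!(blocks.sem.available_permits(), 1);
}
```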
