Skip to content

Commit

Permalink
Some changes improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Dr-Emann committed Feb 19, 2024
1 parent 78db09d commit 712a333
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 43 deletions.
66 changes: 28 additions & 38 deletions crates/applesauce/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};

use applesauce_core::{compressor, num_blocks};
use applesauce_core::{compressor, num_blocks, BLOCK_SIZE};
use tokio::task::spawn_blocking;
use tracing::{info_span, Span};

Expand All @@ -31,6 +31,7 @@ impl<'a> io::Read for ReadAtReader<'a> {
}
}

#[derive(Debug)]
struct BlockLimiter {
max_blocks: u64,
current_blocks: AtomicU64,
Expand Down Expand Up @@ -100,6 +101,7 @@ impl BlockLimiter {
}
}

#[derive(Debug)]
struct OutstandingBlocks {
block_limiter: Arc<BlockLimiter>,
count: u64,
Expand All @@ -115,6 +117,7 @@ impl OutstandingBlocks {
}
}

#[derive(Debug)]
struct InputBlock {
index: u64,
data: Vec<u8>,
Expand All @@ -128,19 +131,26 @@ impl Drop for OutstandingBlocks {
}
}

struct StreamCompressor {
/// Shared state used to compress files block by block.
///
/// Built by `StreamCompressor::new`, which creates the thread pool and sizes
/// the semaphore from the pool's thread count.
#[derive(Debug)]
pub struct StreamCompressor {
/// Rayon thread pool that per-block compression jobs are spawned onto.
pool: rayon::ThreadPool,
/// Limiter from which `compress_file` acquires a file's block count before
/// the file is processed.
block_limit: Arc<BlockLimiter>,
/// Semaphore created in `new` with a permit count derived from the pool's
/// thread count. NOTE(review): presumably what `get_permit` draws from
/// before each block is read -- confirm against `get_permit`.
sem: Arc<Semaphore>,
}

/// `Default` simply forwards to [`StreamCompressor::new`].
impl Default for StreamCompressor {
/// Builds a compressor exactly as `new` does.
///
/// # Panics
///
/// Panics if `new` panics; `new` unwraps the result of building its rayon
/// thread pool.
fn default() -> Self {
Self::new()
}
}

impl StreamCompressor {
fn new() -> Self {
pub fn new() -> Self {
let pool = rayon::ThreadPoolBuilder::new()
.thread_name(|i| format!("stream-compressor-worker-{i}"))
.build()
.unwrap();
let target_blocks = pool.current_num_threads() * 2;
let target_blocks = dbg!(pool.current_num_threads()) * 4;
let sem = Arc::new(Semaphore::new(target_blocks));
Self {
pool,
Expand All @@ -149,30 +159,32 @@ impl StreamCompressor {
}
}

pub fn chunked_stream(
fn chunked_stream(
&self,
file: std::fs::File,
chunk_size: u64,
metadata: Metadata,
) -> impl Stream<Item = io::Result<InputBlock>> + '_ {
// Read chunks with read_at
let file = Arc::new(file);
stream::iter(0..)
let block_count = num_blocks(metadata.len());
// TODO: Error if we get a different block count (or len?)
stream::iter(0..block_count + 1)
.map(move |i| {
let file = Arc::clone(&file);
let offset = i * chunk_size;
let offset = i * BLOCK_SIZE as u64;
let parent_span = Span::current();
async move {
let permit = self.get_permit().await?;
spawn_blocking(move || -> io::Result<InputBlock> {
let _span = info_span!(parent: parent_span, "reading block", i).entered();
use std::io::prelude::*;

let mut data = Vec::with_capacity(chunk_size.try_into().unwrap());
let mut data = Vec::with_capacity(BLOCK_SIZE);
let reader = ReadAtReader {
file: &file,
offset,
};
reader.take(chunk_size).read_to_end(&mut data)?;
reader.take(BLOCK_SIZE as u64).read_to_end(&mut data)?;
Ok(InputBlock {
index: i,
data,
Expand All @@ -187,25 +199,21 @@ impl StreamCompressor {
.try_take_while(|block| future::ready(Ok(!block.data.is_empty())))
}

async fn compress_file(
&self,
path: PathBuf,
metadata: Metadata,
) -> impl Future<Output = io::Result<()>> + '_ {
pub async fn compress_file(&self, path: Arc<Path>, metadata: Metadata) -> io::Result<()> {
let blocks = num_blocks(metadata.len());
let outstanding_blocks = self.block_limit.acquire(blocks).await;

self._compress_file(path, metadata, outstanding_blocks)
.await
}

#[tracing::instrument(skip(self, path, outstanding_blocks))]
async fn _compress_file(
&self,
path: PathBuf,
path: Arc<Path>,
metadata: Metadata,
outstanding_blocks: OutstandingBlocks,
) -> io::Result<()> {
let path: Arc<Path> = Arc::from(path);
let file = File::open(&path).await?;
let (tx, rx) = tokio::sync::mpsc::channel(1);

Expand All @@ -219,7 +227,7 @@ impl StreamCompressor {
.expect("write_file task panicked")
};

let stream = self.chunked_stream(file.into_std().await, applesauce_core::BLOCK_SIZE as u64);
let stream = self.chunked_stream(file.into_std().await, metadata);
let stream = self.compress_stream(stream);
let forward_task = async {
let mut stream = pin!(stream);
Expand Down Expand Up @@ -251,7 +259,7 @@ impl StreamCompressor {
) -> io::Result<Vec<u8>> {
let (tx, rx) = tokio::sync::oneshot::channel();
let parent_span = Span::current();
self.pool.spawn(move || {
self.pool.spawn_fifo(move || {
// Move into thread
let _span = tracing::info_span!(parent: parent_span, "compress_block", i = block.index)
.entered();
Expand All @@ -278,7 +286,7 @@ impl StreamCompressor {
rx.await.expect("compressor thread panicked")
}

pub fn compress_stream<'a>(
fn compress_stream<'a>(
&'a self,
s: impl Stream<Item = io::Result<InputBlock>> + 'a,
) -> impl Stream<Item = io::Result<Vec<u8>>> + 'a {
Expand Down Expand Up @@ -328,24 +336,6 @@ where
})
}

/// Compresses the single file at `path` using a freshly built `StreamCompressor`.
///
/// Before any compression starts, a closure is broadcast to the compressor's
/// thread pool that makes sure the `COMPRESSORS` slot for LZFSE holds a
/// compressor. This setup runs inside the "setting up pool" tracing span.
///
/// # Errors
///
/// Returns whatever `io::Error` the underlying `StreamCompressor::compress_file`
/// call produces.
///
/// # Panics
///
/// Panics if `compressor::Kind::Lzfse.compressor()` fails to yield a
/// compressor (its result is unwrapped), or if building the
/// `StreamCompressor` panics.
pub async fn compress_file(path: PathBuf, metadata: Metadata) -> io::Result<()> {
let compressor = StreamCompressor::new();
info_span!("setting up pool").in_scope(|| {
// Run the initialisation closure on the pool's threads.
compressor.pool.broadcast(|_| {
// NOTE(review): `COMPRESSORS` looks like a thread-local `RefCell` holding a
// list of optional compressors indexed by `compressor::Kind` -- confirm at
// its definition.
COMPRESSORS.with(|compressors| {
let mut compressors = compressors.borrow_mut();
let idx = compressor::Kind::Lzfse as usize;
// Grow the list with empty slots so that `idx` is a valid index.
if idx >= compressors.len() {
compressors.resize_with(idx + 1, || None);
}
// Only create the LZFSE compressor if the slot is still empty; the
// returned reference is not needed here.
let _ = compressors[idx]
.get_or_insert_with(|| compressor::Kind::Lzfse.compressor().unwrap());
});
});
});
// The method is an `async fn` that itself returns a future, so the nested
// future is flattened before awaiting the final `io::Result`.
compressor.compress_file(path, metadata).flatten().await
}

#[tracing::instrument(level = "debug")]
async fn tmp_file_for(path: Arc<Path>) -> io::Result<NamedTempFile<File>> {
let mut builder = tempfile::Builder::new();
Expand Down
23 changes: 18 additions & 5 deletions crates/applesauce/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
use applesauce::block_stream;
use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::fs::File;
use std::io;
use std::io::BufWriter;
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::Arc;
use std::time::Instant;
use tokio::select;
use tracing_chrome::ChromeLayerBuilder;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
Expand Down Expand Up @@ -41,10 +48,16 @@ async fn main() {
.init();

let start = Instant::now();
let path = PathBuf::from("/tmp/dir/zeros");
let metadata = tokio::fs::metadata(&path).await.unwrap();
applesauce::block_stream::compress_file(path, metadata)
.await
.unwrap();
let compressor = Arc::new(block_stream::StreamCompressor::new());
let mut handles = Vec::new();
for path in &["/tmp/dir/zeros", "/tmp/dir/zeros2"] {
let path = Arc::from(Path::new(path));
let metadata = tokio::fs::metadata(&path).await.unwrap();
let compressor = Arc::clone(&compressor);
handles.push(tokio::spawn(async move {
compressor.compress_file(path, metadata).await
}));
}
let results = try_join_all(handles).await.unwrap();
dbg!(start.elapsed());
}

0 comments on commit 712a333

Please sign in to comment.