Fetch chunks using concurrent requests (#553)
Only applies if the chunk is large enough; same logic as with manifests.

This still doesn't apply to virtual chunks, only native ones, for no particular
reason; we can follow up with virtual chunks in a different PR.

This commit also fixes a bug in chunks with offset != 0 (which cannot be
constructed in IC today).
paraseba authored Jan 9, 2025
1 parent 0a17588 commit b850bdb
Showing 9 changed files with 110 additions and 132 deletions.
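For readers skimming the diff: the core of the change is to reuse for native chunk reads the same range-splitting strategy already used for manifests. When a requested byte range is large enough, it is cut into several sub-ranges that are fetched concurrently and reassembled in order. The snippet below is a standalone sketch condensed from the `split_in_multiple_requests` function added in `icechunk/src/storage/mod.rs` further down; the `main` function is illustrative only.

```rust
use std::cmp::{max, min};
use std::iter;
use std::ops::Range;

/// Split one byte range into at most `max_parts` sub-ranges, none smaller than
/// `min_part_size` (unless the whole range already is), with the last part at
/// least as large as the preceding, equally sized ones.
fn split_in_multiple_requests(
    range: &Range<u64>,
    min_part_size: u64,
    max_parts: u16,
) -> impl Iterator<Item = Range<u64>> {
    let size = range.end - range.start;
    let min_part_size = max(min_part_size, 1);
    let num_parts = max(1, min(size / min_part_size, max_parts as u64));
    let equal_parts = num_parts - 1;
    let equal_parts_size = size / num_parts;
    let last_part_size = size - equal_parts * equal_parts_size;
    let equal_requests = iter::successors(
        Some(range.start..range.start + equal_parts_size),
        move |r| Some(r.end..r.end + equal_parts_size),
    );
    let last_part_offset = range.start + equal_parts * equal_parts_size;
    let last_request = iter::once(last_part_offset..last_part_offset + last_part_size);
    equal_requests.take(equal_parts as usize).chain(last_request)
}

fn main() {
    // A 100-byte range with a 5-byte minimum part size and at most 3 parts
    // splits into 0..33, 33..66 and 66..100; each part becomes one ranged GET.
    let parts: Vec<_> = split_in_multiple_requests(&(0..100), 5, 3).collect();
    println!("{parts:?}");
}
```

Each sub-range is then issued as its own ranged GET against object storage, and the responses are concatenated in request order.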
7 changes: 7 additions & 0 deletions icechunk/proptest-regressions/storage/mod.txt
@@ -0,0 +1,7 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc 3a74daab07603c4d5c3124cf9ad1dca500ecc16a2f9409ef4acde9eea5901314 # shrinks to offset = 1, size = 16440244, min_part_size = 0, max_parts = 1
8 changes: 4 additions & 4 deletions icechunk/src/asset_manager.rs
@@ -1,15 +1,15 @@
use bytes::Bytes;
use quick_cache::sync::Cache;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::{ops::Range, sync::Arc};
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_util::io::SyncIoBridge;

use crate::{
config::CachingConfig,
format::{
format_constants, manifest::Manifest, snapshot::Snapshot,
transaction_log::TransactionLog, ByteRange, ChunkId, IcechunkFormatError,
transaction_log::TransactionLog, ChunkId, ChunkOffset, IcechunkFormatError,
ManifestId, SnapshotId,
},
private,
@@ -34,7 +34,7 @@ pub struct AssetManager {
#[serde(skip)]
transactions_cache: Cache<SnapshotId, Arc<TransactionLog>>,
#[serde(skip)]
chunk_cache: Cache<(ChunkId, ByteRange), Bytes>,
chunk_cache: Cache<(ChunkId, Range<ChunkOffset>), Bytes>,
}

impl private::Sealed for AssetManager {}
@@ -245,7 +245,7 @@ impl AssetManager {
pub async fn fetch_chunk(
&self,
chunk_id: &ChunkId,
range: &ByteRange,
range: &Range<ChunkOffset>,
) -> RepositoryResult<Bytes> {
let key = (chunk_id.clone(), range.clone());
match self.chunk_cache.get_value_or_guard_async(&key).await {
16 changes: 9 additions & 7 deletions icechunk/src/session.rs
@@ -3,6 +3,7 @@ use std::{
collections::HashSet,
future::{ready, Future},
iter,
ops::Range,
pin::Pin,
sync::Arc,
};
@@ -28,8 +29,8 @@ use crate::{
SnapshotProperties, UserAttributesSnapshot, ZarrArrayMetadata,
},
transaction_log::TransactionLog,
ByteRange, ChunkIndices, IcechunkFormatError, ManifestId, NodeId, ObjectId, Path,
SnapshotId,
ByteRange, ChunkIndices, ChunkOffset, IcechunkFormatError, ManifestId, NodeId,
ObjectId, Path, SnapshotId,
},
metadata::UserAttributes,
refs::{fetch_branch_tip, update_branch, RefError},
@@ -458,9 +459,10 @@ impl Session {
) -> SessionResult<Option<Pin<Box<dyn Future<Output = SessionResult<Bytes>> + Send>>>>
{
match self.get_chunk_ref(path, coords).await? {
Some(ChunkPayload::Ref(ChunkRef { id, .. })) => {
Some(ChunkPayload::Ref(ChunkRef { id, offset, length })) => {
let byte_range = byte_range.clone();
let asset_manager = Arc::clone(&self.asset_manager);
let byte_range = construct_valid_byte_range(&byte_range, offset, length);
Ok(Some(
async move {
// TODO: we don't have a way to distinguish if we want to pass a range or not
@@ -1105,24 +1107,24 @@ pub fn construct_valid_byte_range(
request: &ByteRange,
chunk_offset: u64,
chunk_length: u64,
) -> ByteRange {
) -> Range<ChunkOffset> {
// TODO: error for offset<0
// TODO: error if request.start > offset + length
match request {
ByteRange::Bounded(std::ops::Range { start: req_start, end: req_end }) => {
let new_start =
min(chunk_offset + req_start, chunk_offset + chunk_length - 1);
let new_end = min(chunk_offset + req_end, chunk_offset + chunk_length);
ByteRange::Bounded(new_start..new_end)
new_start..new_end
}
ByteRange::From(n) => {
let new_start = min(chunk_offset + n, chunk_offset + chunk_length - 1);
ByteRange::Bounded(new_start..chunk_offset + chunk_length)
new_start..chunk_offset + chunk_length
}
ByteRange::Last(n) => {
let new_end = chunk_offset + chunk_length;
let new_start = new_end - n;
ByteRange::Bounded(new_start..new_end)
new_start..new_end
}
}
}
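To make the clamping in the `construct_valid_byte_range` hunk above concrete, here is a self-contained sketch that mirrors its three arms with simplified types (the real function returns `Range<ChunkOffset>` and lives in `session.rs`; the `ByteRange` enum and the assertions below are only illustrative). The `offset != 0` case exercised here is the one the commit message flags as previously buggy.

```rust
use std::cmp::min;
use std::ops::Range;

/// A simplified stand-in for the crate's user-facing `ByteRange`.
enum ByteRange {
    Bounded(Range<u64>),
    From(u64),
    Last(u64),
}

/// Translate a chunk-relative request into an absolute range within the object,
/// clamped so it never runs past the chunk's last byte.
fn construct_valid_byte_range(
    request: &ByteRange,
    chunk_offset: u64,
    chunk_length: u64,
) -> Range<u64> {
    match request {
        ByteRange::Bounded(Range { start, end }) => {
            let new_start = min(chunk_offset + start, chunk_offset + chunk_length - 1);
            let new_end = min(chunk_offset + end, chunk_offset + chunk_length);
            new_start..new_end
        }
        ByteRange::From(n) => {
            let new_start = min(chunk_offset + n, chunk_offset + chunk_length - 1);
            new_start..chunk_offset + chunk_length
        }
        ByteRange::Last(n) => {
            let new_end = chunk_offset + chunk_length;
            (new_end - n)..new_end
        }
    }
}

fn main() {
    // A chunk stored at byte 100 of its object, 10 bytes long.
    assert_eq!(construct_valid_byte_range(&ByteRange::Bounded(2..6), 100, 10), 102..106);
    assert_eq!(construct_valid_byte_range(&ByteRange::From(4), 100, 10), 104..110);
    assert_eq!(construct_valid_byte_range(&ByteRange::Last(3), 100, 10), 107..110);
}
```

The request is interpreted relative to the chunk, shifted by `chunk_offset`, and clamped to the chunk's extent, so the resulting `Range` can be handed straight to the storage layer.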
9 changes: 6 additions & 3 deletions icechunk/src/storage/logging.rs
@@ -1,4 +1,7 @@
use std::sync::{Arc, Mutex};
use std::{
ops::Range,
sync::{Arc, Mutex},
};

use async_trait::async_trait;
use bytes::Bytes;
@@ -8,7 +11,7 @@ use tokio::io::AsyncRead;

use super::{ETag, ListInfo, Settings, Storage, StorageError, StorageResult};
use crate::{
format::{ByteRange, ChunkId, ManifestId, SnapshotId},
format::{ChunkId, ChunkOffset, ManifestId, SnapshotId},
private,
};

@@ -107,7 +110,7 @@ impl Storage for LoggingStorage {
&self,
settings: &Settings,
id: &ChunkId,
range: &ByteRange,
range: &Range<ChunkOffset>,
) -> Result<Bytes, StorageError> {
self.fetch_log
.lock()
65 changes: 36 additions & 29 deletions icechunk/src/storage/mod.rs
@@ -20,6 +20,7 @@ use std::{
ffi::OsString,
iter,
num::{NonZeroU16, NonZeroU64},
ops::Range,
path::Path,
sync::Arc,
};
@@ -39,7 +40,7 @@ pub use object_store::ObjectStorage;

use crate::{
config::{GcsCredentials, GcsStaticCredentials, S3Credentials, S3Options},
format::{ByteRange, ChunkId, ManifestId, SnapshotId},
format::{ChunkId, ChunkOffset, ManifestId, SnapshotId},
private,
};

Expand Down Expand Up @@ -167,7 +168,7 @@ pub trait Storage: fmt::Debug + private::Sealed + Sync + Send {
&self,
settings: &Settings,
id: &ChunkId,
range: &ByteRange,
range: &Range<ChunkOffset>,
) -> StorageResult<Bytes>; // FIXME: format flags
async fn fetch_transaction_log(
&self,
@@ -322,25 +323,27 @@

/// Split an object request into multiple byte range requests
///
/// Returns tuples of (offset, size) for each request. It tries to generate the maximum number of
/// Returns a Range for each request. It tries to generate the maximum number of
/// requests possible, not generating more than `max_parts` requests, and each request not being
/// smaller than `min_part_size`. Note that the size of the last request is >= the preceding one.
fn split_in_multiple_requests(
size: u64,
range: &Range<u64>,
min_part_size: u64,
max_parts: u16,
) -> impl Iterator<Item = (u64, u64)> {
) -> impl Iterator<Item = Range<u64>> {
let size = range.end - range.start;
let min_part_size = max(min_part_size, 1);
let num_parts = size / min_part_size;
let num_parts = max(1, min(num_parts, max_parts as u64));
let equal_parts = num_parts - 1;
let equal_parts_size = size / num_parts;
let last_part_size = size - equal_parts * equal_parts_size;
let equal_requests =
iter::successors(Some((0, equal_parts_size)), move |(off, _)| {
Some((off + equal_parts_size, equal_parts_size))
});
let last_request = iter::once((equal_parts * equal_parts_size, last_part_size));
let equal_requests = iter::successors(
Some(range.start..range.start + equal_parts_size),
move |range| Some(range.end..range.end + equal_parts_size),
);
let last_part_offset = range.start + equal_parts * equal_parts_size;
let last_request = iter::once(last_part_offset..last_part_offset + last_part_size);
equal_requests.take(equal_parts as usize).chain(last_request)
}

@@ -443,8 +446,8 @@ mod tests {
cases: 999, .. ProptestConfig::default()
})]
#[test]
fn test_split_requests(size in 1..3_000_000_000u64, min_part_size in 0..16_000_000u64, max_parts in 1..100u16 ) {
let res: Vec<_> = split_in_multiple_requests(size, min_part_size, max_parts).collect();
fn test_split_requests(offset in 0..1_000_000u64, size in 1..3_000_000_000u64, min_part_size in 1..16_000_000u64, max_parts in 1..100u16 ) {
let res: Vec<_> = split_in_multiple_requests(&(offset..offset+size), min_part_size, max_parts).collect();

// there is always at least 1 request
prop_assert!(!res.is_empty());
@@ -456,19 +459,19 @@
prop_assert!(res.len() <= max_parts as usize);

// the request sizes add up to total size
prop_assert_eq!(res.iter().map(|(_, size)| size).sum::<u64>(), size);
prop_assert_eq!(res.iter().map(|range| range.end - range.start).sum::<u64>(), size);

// no more than one request smaller than minimum size
prop_assert!(res.iter().filter(|(_, size)| *size < min_part_size).count() <= 1);
prop_assert!(res.iter().filter(|range| (range.end - range.start) < min_part_size).count() <= 1);

// if there is a request smaller than the minimum size it is because the total size is
// smaller than minimum request size
if res.iter().any(|(_, size)| *size < min_part_size) {
if res.iter().any(|range| (range.end - range.start) < min_part_size) {
prop_assert!(size < min_part_size)
}

// there are only two request sizes
let counts = res.iter().map(|(_, size)| size).counts();
let counts = res.iter().map(|range| (range.end - range.start)).counts();
prop_assert!(counts.len() <= 2); // only last element is smaller
if counts.len() > 1 {
// the smaller request size happens only once
@@ -478,36 +481,40 @@
// there are no holes in the requests, nor bytes that are requested more than once
let mut iter = res.iter();
iter.next();
prop_assert!(res.iter().zip(iter).all(|((off1,size),(off2,_))| off1 + size == *off2));
prop_assert!(res.iter().zip(iter).all(|(r1,r2)| r1.end == r2.start));
}

}

#[test]
fn test_split_examples() {
assert_eq!(
split_in_multiple_requests(4, 4, 100,).collect::<Vec<_>>(),
vec![(0, 4)]
split_in_multiple_requests(&(0..4), 4, 100,).collect::<Vec<_>>(),
vec![0..4]
);
assert_eq!(
split_in_multiple_requests(&(10..14), 4, 100,).collect::<Vec<_>>(),
vec![10..14]
);
assert_eq!(
split_in_multiple_requests(3, 1, 100,).collect::<Vec<_>>(),
vec![(0, 1), (1, 1), (2, 1),]
split_in_multiple_requests(&(20..23), 1, 100,).collect::<Vec<_>>(),
vec![(20..21), (21..22), (22..23),]
);
assert_eq!(
split_in_multiple_requests(6, 5, 100,).collect::<Vec<_>>(),
vec![(0, 6)]
split_in_multiple_requests(&(10..16), 5, 100,).collect::<Vec<_>>(),
vec![(10..16)]
);
assert_eq!(
split_in_multiple_requests(11, 5, 100,).collect::<Vec<_>>(),
vec![(0, 5), (5, 6)]
split_in_multiple_requests(&(10..21), 5, 100,).collect::<Vec<_>>(),
vec![(10..15), (15..21)]
);
assert_eq!(
split_in_multiple_requests(13, 5, 2,).collect::<Vec<_>>(),
vec![(0, 6), (6, 7)]
split_in_multiple_requests(&(0..13), 5, 2,).collect::<Vec<_>>(),
vec![(0..6), (6..13)]
);
assert_eq!(
split_in_multiple_requests(100, 5, 3,).collect::<Vec<_>>(),
vec![(0, 33), (33, 33), (66, 34)]
split_in_multiple_requests(&(0..100), 5, 3,).collect::<Vec<_>>(),
vec![(0..33), (33..66), (66..100)]
);
}
}
32 changes: 16 additions & 16 deletions icechunk/src/storage/object_store.rs
@@ -1,5 +1,7 @@
use crate::{
format::{ByteRange, ChunkId, FileTypeTag, ManifestId, ObjectId, SnapshotId},
format::{
ByteRange, ChunkId, ChunkOffset, FileTypeTag, ManifestId, ObjectId, SnapshotId,
},
private,
};
use async_trait::async_trait;
@@ -224,18 +226,17 @@ impl ObjectStorage {
&self,
settings: &Settings,
path: &ObjectPath,
size: u64,
range: &Range<u64>,
) -> StorageResult<impl AsyncRead> {
let mut results = split_in_multiple_requests(
size,
range,
settings.concurrency.min_concurrent_request_size.get(),
settings.concurrency.max_concurrent_requests_for_object.get(),
)
.map(|(req_offset, req_size)| async move {
.map(|range| async move {
let store = Arc::clone(&self.store);
let range = Some(GetRange::from(
req_offset as usize..req_offset as usize + req_size as usize,
));
let usize_range = range.start as usize..range.end as usize;
let range = Some(usize_range.into());
let opts = GetOptions { range, ..Default::default() };
let path = path.clone();
store.get_opts(&path, opts).await
@@ -394,7 +395,7 @@ impl Storage for ObjectStorage {
size: u64,
) -> StorageResult<Box<dyn AsyncRead + Unpin + Send>> {
let path = self.get_manifest_path(id);
Ok(Box::new(self.get_object_concurrently(settings, &path, size).await?))
Ok(Box::new(self.get_object_concurrently(settings, &path, &(0..size)).await?))
}

async fn fetch_manifest_single_request(
@@ -462,17 +463,16 @@

async fn fetch_chunk(
&self,
_settings: &Settings,
settings: &Settings,
id: &ChunkId,
range: &ByteRange,
range: &Range<ChunkOffset>,
) -> Result<Bytes, StorageError> {
let path = self.get_chunk_path(id);
// TODO: shall we split `range` into multiple ranges and use get_ranges?
// I can't tell that `get_range` does splitting
let options =
GetOptions { range: Option::<GetRange>::from(range), ..Default::default() };
let chunk = self.store.get_opts(&path, options).await?.bytes().await?;
Ok(chunk)
let mut read = self.get_object_concurrently(settings, &path, range).await?;
// add some extra space to the buffer to optimize conversion to bytes
let mut buffer = Vec::with_capacity((range.end - range.start + 16) as usize);
tokio::io::copy(&mut read, &mut buffer).await?;
Ok(buffer.into())
}

async fn write_chunk(
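The code that drives the per-part futures built in `get_object_concurrently` and stitches their bodies back into a single reader is collapsed in this view. The following is therefore only a hypothetical illustration (the helper name, stubbed GET, and error type are invented, not taken from the crate) of how such futures can be run with bounded concurrency while preserving byte order, using the `futures` crate's `buffered` combinator:

```rust
use bytes::Bytes;
use futures::{stream, StreamExt, TryStreamExt};
use std::ops::Range;

/// Hypothetical helper: fetch each sub-range (stubbed here) with at most
/// `max_concurrency` requests in flight, then concatenate the bodies in order.
async fn fetch_parts_concurrently(
    parts: Vec<Range<u64>>,
    max_concurrency: usize,
) -> std::io::Result<Bytes> {
    let bodies: Vec<Bytes> = stream::iter(parts)
        .map(|range| async move {
            // Stand-in for a ranged GET; the real code calls the object store here.
            Ok::<_, std::io::Error>(Bytes::from(vec![0u8; (range.end - range.start) as usize]))
        })
        .buffered(max_concurrency) // up to N requests in flight, results yielded in order
        .try_collect()
        .await?;
    let mut out = Vec::with_capacity(bodies.iter().map(|b| b.len()).sum());
    for body in &bodies {
        out.extend_from_slice(body);
    }
    Ok(Bytes::from(out))
}

fn main() {
    // The three sub-ranges a 100-byte object splits into with max_parts = 3.
    let parts = vec![0..33u64, 33..66, 66..100];
    let bytes = futures::executor::block_on(fetch_parts_concurrently(parts, 3)).unwrap();
    assert_eq!(bytes.len(), 100);
}
```

`buffered(n)` keeps up to `n` requests running at once but yields their results in the order the sub-ranges were produced, which is what allows the parts to be concatenated directly.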
