diff --git a/CHANGELOG.md b/CHANGELOG.md index cbdbea8..6cafd59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,7 +44,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 This change also adds `&self` to the `index()` function's signature, allowing the embedded indexer to have a configuration/state. - +- `CompareSwap`'s function now takes an additional parameter: + `Option<&Root::Index>`. This parameter allows inspecting the currently stored + index during the swap function. +- `modify()` now returns a `Vec>`, which + contains a list of keys that were modified and their new indexes (if the key + still exists after the modification). Note that for a Versioned tree, + modifications will always produce new indexes as removed keys are still + stored. +- `set()` now returns `Root::Index`, which is the newly stored index for the + key. +- `replace()` now returns a `(Option, Root::Index)>`, which is the + previously stored value and the new index for this key. +- `remove()` now returns both the key and index. + ### Fixed - When using `Roots::delete_tree()` on a tree that had previously been opened, @@ -72,6 +85,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `PersistenceMode::Flush` will only ensure all application-level caches are flushed before confirming the write is successful. +### Added + +- `TreeFile`, `Tree`, and `TransactionTree` now have additional methods that + allow fetching indexes or values and indexes: + + - `get_index()` - single-key index retrieval + - `get_with_index()` - single-key value + index retrieval + - `get_multiple_indexes()` multi-key index retrieval + - `get_multiple_with_indexes()` multi-key value + index retrieval + - `get_range_indexes()` ranged index retrieval + - `get_range_with_indexes()` ranged value + index retrieval + ## v0.5.3 ### Fixed diff --git a/fuzz/fuzz_targets/compare_swap.rs b/fuzz/fuzz_targets/compare_swap.rs index fbcab46..1ddd19b 100644 --- a/fuzz/fuzz_targets/compare_swap.rs +++ b/fuzz/fuzz_targets/compare_swap.rs @@ -31,13 +31,15 @@ fuzz_target!(|batches: Vec>| { .iter() .map(|key| ArcBytes::from(key.to_be_bytes())) .collect(), - operation: Operation::CompareSwap(CompareSwap::new(&mut |key, current_value| { - if current_value.is_some() { - KeyOperation::Remove - } else { - KeyOperation::Set(key.to_owned()) - } - })), + operation: Operation::CompareSwap(CompareSwap::new( + &mut |key, _index, current_value| { + if current_value.is_some() { + KeyOperation::Remove + } else { + KeyOperation::Set(key.to_owned()) + } + }, + )), }) .unwrap(); for key in batch { diff --git a/nebari/src/roots.rs b/nebari/src/roots.rs index 25c88c5..2459c3b 100644 --- a/nebari/src/roots.rs +++ b/nebari/src/roots.rs @@ -26,8 +26,9 @@ use crate::{ self, root::{AnyReducer, AnyTreeRoot}, state::AnyTreeState, - EmbeddedIndex, KeySequence, Modification, Operation, PersistenceMode, ScanEvaluation, - SequenceId, State, TransactableCompaction, TreeFile, TreeRoot, VersionedTreeRoot, + EmbeddedIndex, KeySequence, Modification, ModificationResult, Operation, PersistenceMode, + ScanEvaluation, SequenceId, State, TransactableCompaction, TreeFile, TreeRoot, + VersionedTreeRoot, }, vault::AnyVault, ArcBytes, ChunkCache, ErrorKind, @@ -430,21 +431,25 @@ where } impl TransactionTree { - /// Sets `key` to `value`. + /// Sets `key` to `value`. Returns the newly created index for this key. pub fn set( &mut self, key: impl Into>, value: impl Into>, - ) -> Result<(), Error> { - self.modify(vec![key.into()], Operation::Set(value.into())) + ) -> Result { + self.tree.set( + PersistenceMode::Transactional(self.transaction_id), + key, + value, + ) } - /// Executes a modification. + /// Executes a modification. Returns a list of all changed keys. pub fn modify<'a>( &mut self, keys: Vec>, - operation: Operation<'a, ArcBytes<'static>>, - ) -> Result<(), Error> { + operation: Operation<'a, ArcBytes<'static>, Root::Index>, + ) -> Result>, Error> { self.tree.modify(Modification { keys, persistence_mode: PersistenceMode::Transactional(self.transaction_id), @@ -452,12 +457,15 @@ impl TransactionTree { }) } - /// Sets `key` to `value`. If a value already exists, it will be returned. + /// Sets `key` to `value`. Returns a tuple containing two elements: + /// + /// - The previously stored value, if a value was already present. + /// - The new/updated index for this key. pub fn replace( &mut self, key: impl Into>, value: impl Into>, - ) -> Result>, Error> { + ) -> Result<(Option>, Root::Index), Error> { self.tree.replace(key, value, self.transaction_id) } @@ -467,8 +475,26 @@ impl TransactionTree { self.tree.get(key, true) } - /// Removes `key` and returns the existing value, if present. - pub fn remove(&mut self, key: &[u8]) -> Result>, Error> { + /// Returns the current index of `key`. This will return updated information + /// if it has been previously updated within this transaction. + pub fn get_index(&mut self, key: &[u8]) -> Result, Error> { + self.tree.get_index(key, true) + } + + /// Returns the current value and index of `key`. This will return updated + /// information if it has been previously updated within this transaction. + pub fn get_with_index( + &mut self, + key: &[u8], + ) -> Result, Root::Index)>, Error> { + self.tree.get_with_index(key, true) + } + + /// Removes `key` and returns the existing value amd index, if present. + pub fn remove( + &mut self, + key: &[u8], + ) -> Result, Root::Index)>, Error> { self.tree.remove(key, self.transaction_id) } @@ -498,6 +524,33 @@ impl TransactionTree { self.tree.get_multiple(keys, true) } + /// Retrieves the indexes of `keys`. If any keys are not found, they will be + /// omitted from the results. Keys are required to be pre-sorted. + pub fn get_multiple_indexes<'keys, KeysIntoIter, KeysIter>( + &mut self, + keys: KeysIntoIter, + ) -> Result, Root::Index)>, Error> + where + KeysIntoIter: IntoIterator, + KeysIter: Iterator + ExactSizeIterator, + { + self.tree.get_multiple_indexes(keys, true) + } + + /// Retrieves the values and indexes of `keys`. If any keys are not found, + /// they will be omitted from the results. Keys are required to be + /// pre-sorted. + pub fn get_multiple_with_indexes<'keys, KeysIntoIter, KeysIter>( + &mut self, + keys: KeysIntoIter, + ) -> Result, ArcBytes<'static>, Root::Index)>, Error> + where + KeysIntoIter: IntoIterator, + KeysIter: Iterator + ExactSizeIterator, + { + self.tree.get_multiple_with_indexes(keys, true) + } + /// Retrieves all of the values of keys within `range`. pub fn get_range<'keys, KeyRangeBounds>( &mut self, @@ -509,6 +562,28 @@ impl TransactionTree { self.tree.get_range(range, true) } + /// Retrieves all of the indexes of keys within `range`. + pub fn get_range_indexes<'keys, KeyRangeBounds>( + &mut self, + range: &'keys KeyRangeBounds, + ) -> Result, Root::Index)>, Error> + where + KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, + { + self.tree.get_range_indexes(range, true) + } + + /// Retrieves all of the values and indexes of keys within `range`. + pub fn get_range_with_indexes<'keys, KeyRangeBounds>( + &mut self, + range: &'keys KeyRangeBounds, + ) -> Result, ArcBytes<'static>, Root::Index)>, Error> + where + KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, + { + self.tree.get_range_with_indexes(range, true) + } + /// Scans the tree across all nodes that might contain nodes within `range`. /// /// If `forwards` is true, the tree is scanned in ascending order. @@ -814,38 +889,73 @@ impl Tree { }) } - /// Sets `key` to `value`. If a value already exists, it will be returned. + /// Retrieves the current index of `key`, if present. Does not reflect any + /// changes in pending transactions. + pub fn get_index(&self, key: &[u8]) -> Result, Error> { + catch_compaction_and_retry(|| { + let mut tree = match self.open_for_read() { + Ok(tree) => tree, + Err(err) if err.kind.is_file_not_found() => return Ok(None), + Err(err) => return Err(err), + }; + + tree.get_index(key, false) + }) + } + + /// Retrieves the current value and index of `key`, if present. Does not reflect any + /// changes in pending transactions. + pub fn get_with_index( + &self, + key: &[u8], + ) -> Result, Root::Index)>, Error> { + catch_compaction_and_retry(|| { + let mut tree = match self.open_for_read() { + Ok(tree) => tree, + Err(err) if err.kind.is_file_not_found() => return Ok(None), + Err(err) => return Err(err), + }; + + tree.get_with_index(key, false) + }) + } + + /// Sets `key` to `value`. Returns a tuple containing two elements: + /// + /// - The previously stored value, if a value was already present. + /// - The new/updated index for this key. #[allow(clippy::missing_panics_doc)] pub fn replace( &mut self, key: impl Into>, value: impl Into>, - ) -> Result>, Error> { + ) -> Result<(Option>, Root::Index), Error> { let transaction = self.begin_transaction()?; let existing_value = transaction.tree::(0).unwrap().replace(key, value)?; transaction.commit()?; Ok(existing_value) } - /// Executes a modification. + /// Executes a modification. Returns a list of all changed keys. #[allow(clippy::missing_panics_doc)] pub fn modify<'a>( &mut self, keys: Vec>, - operation: Operation<'a, ArcBytes<'static>>, - ) -> Result<(), Error> { + operation: Operation<'a, ArcBytes<'static>, Root::Index>, + ) -> Result>, Error> { let transaction = self.begin_transaction()?; - transaction + let results = transaction .tree::(0) .unwrap() .modify(keys, operation)?; transaction.commit()?; - Ok(()) + Ok(results) } - /// Removes `key` and returns the existing value, if present. This is executed within its own transaction. + /// Removes `key` and returns the existing value and index, if present. This + /// is executed within its own transaction. #[allow(clippy::missing_panics_doc)] - pub fn remove(&self, key: &[u8]) -> Result>, Error> { + pub fn remove(&self, key: &[u8]) -> Result, Root::Index)>, Error> { let transaction = self.begin_transaction()?; let existing_value = transaction.tree::(0).unwrap().remove(key)?; transaction.commit()?; @@ -892,6 +1002,51 @@ impl Tree { }) } + /// Retrieves the indexes of `keys`. If any keys are not found, they will be + /// omitted from the results. Keys are required to be pre-sorted. + #[allow(clippy::needless_pass_by_value)] + pub fn get_multiple_indexes<'keys, KeysIntoIter, KeysIter>( + &self, + keys: KeysIntoIter, + ) -> Result, Root::Index)>, Error> + where + KeysIntoIter: IntoIterator + Clone, + KeysIter: Iterator + ExactSizeIterator, + { + catch_compaction_and_retry(|| { + let mut tree = match self.open_for_read() { + Ok(tree) => tree, + Err(err) if err.kind.is_file_not_found() => return Ok(Vec::new()), + Err(err) => return Err(err), + }; + + tree.get_multiple_indexes(keys.clone(), false) + }) + } + + /// Retrieves the values and indexes of `keys`. If any keys are not found, + /// they will be omitted from the results. Keys are required to be + /// pre-sorted. + #[allow(clippy::needless_pass_by_value)] + pub fn get_multiple_with_indexes<'keys, KeysIntoIter, KeysIter>( + &self, + keys: KeysIntoIter, + ) -> Result, ArcBytes<'static>, Root::Index)>, Error> + where + KeysIntoIter: IntoIterator + Clone, + KeysIter: Iterator + ExactSizeIterator, + { + catch_compaction_and_retry(|| { + let mut tree = match self.open_for_read() { + Ok(tree) => tree, + Err(err) if err.kind.is_file_not_found() => return Ok(Vec::new()), + Err(err) => return Err(err), + }; + + tree.get_multiple_with_indexes(keys.clone(), false) + }) + } + /// Retrieves all of the values of keys within `range`. pub fn get_range<'keys, KeyRangeBounds>( &self, @@ -911,6 +1066,44 @@ impl Tree { }) } + /// Retrieves all of the indexes of keys within `range`. + pub fn get_range_indexes<'keys, KeyRangeBounds>( + &self, + range: &'keys KeyRangeBounds, + ) -> Result, Root::Index)>, Error> + where + KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, + { + catch_compaction_and_retry(|| { + let mut tree = match self.open_for_read() { + Ok(tree) => tree, + Err(err) if err.kind.is_file_not_found() => return Ok(Vec::new()), + Err(err) => return Err(err), + }; + + tree.get_range_indexes(range, false) + }) + } + + /// Retrieves all of the values and indexes of keys within `range`. + pub fn get_range_with_indexes<'keys, KeyRangeBounds>( + &self, + range: &'keys KeyRangeBounds, + ) -> Result, ArcBytes<'static>, Root::Index)>, Error> + where + KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, + { + catch_compaction_and_retry(|| { + let mut tree = match self.open_for_read() { + Ok(tree) => tree, + Err(err) if err.kind.is_file_not_found() => return Ok(Vec::new()), + Err(err) => return Err(err), + }; + + tree.get_range_with_indexes(range, false) + }) + } + /// Scans the tree across all nodes that might contain nodes within `range`. /// /// If `forwards` is true, the tree is scanned in ascending order. @@ -1527,7 +1720,7 @@ mod tests { let absolute_id = (worker * OPERATION_COUNT + relative_id) as u64; tree.set(absolute_id.to_be_bytes(), absolute_id.to_be_bytes()) .unwrap(); - let value = tree + let (value, _) = tree .remove(&absolute_id.to_be_bytes()) .unwrap() .ok_or_else(|| panic!("value not found: {:?}", absolute_id)) diff --git a/nebari/src/tree/btree_entry.rs b/nebari/src/tree/btree_entry.rs index 2a74f77..90f7e6b 100644 --- a/nebari/src/tree/btree_entry.rs +++ b/nebari/src/tree/btree_entry.rs @@ -123,7 +123,7 @@ pub struct ModificationContext< Loader, IndexReducer, > where - Indexer: Fn( + Indexer: FnMut( &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, @@ -131,7 +131,7 @@ pub struct ModificationContext< &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, - Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result, Error>, + Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result, Error>, { pub current_order: usize, pub minimum_children: usize, @@ -164,8 +164,8 @@ where { pub(crate) fn modify( &mut self, - modification: &mut Modification<'_, IndexedType>, - context: &ModificationContext< + modification: &mut Modification<'_, IndexedType, Index>, + context: &mut ModificationContext< IndexedType, Index, ReducedIndex, @@ -179,7 +179,7 @@ where writer: &mut PagedWriter<'_>, ) -> Result where - Indexer: Fn( + Indexer: FnMut( &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, @@ -187,7 +187,7 @@ where &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, - Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result, Error>, + Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result, Error>, { match &mut self.node { BTreeNode::Leaf(children) => { @@ -267,8 +267,8 @@ where #[allow(clippy::too_many_lines)] // TODO refactor, too many lines fn modify_leaf( children: &mut Vec>, - modification: &mut Modification<'_, IndexedType>, - context: &ModificationContext< + modification: &mut Modification<'_, IndexedType, Index>, + context: &mut ModificationContext< IndexedType, Index, ReducedIndex, @@ -282,7 +282,7 @@ where writer: &mut PagedWriter<'_>, ) -> Result where - Indexer: Fn( + Indexer: FnMut( &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, @@ -290,7 +290,7 @@ where &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, - Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result, Error>, + Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result, Error>, { let mut last_index = 0; let mut any_changes = false; @@ -333,7 +333,7 @@ where Operation::CompareSwap(callback) => { let current_index = &children[last_index].index; let existing_value = (context.loader)(current_index, writer)?; - match callback(&key, existing_value) { + match callback(&key, Some(current_index), existing_value) { KeyOperation::Skip => KeyOperation::Skip, KeyOperation::Set(new_value) => (context.indexer)( &key, @@ -388,7 +388,7 @@ where // The key doesn't exist, so a remove is a no-op. KeyOperation::Remove } - Operation::CompareSwap(callback) => match callback(&key, None) { + Operation::CompareSwap(callback) => match callback(&key, None, None) { KeyOperation::Skip => KeyOperation::Skip, KeyOperation::Set(new_value) => { (context.indexer)(&key, Some(&new_value), None, changes, writer)? @@ -426,8 +426,8 @@ where fn modify_interior( children: &mut Vec>, - modification: &mut Modification<'_, IndexedType>, - context: &ModificationContext< + modification: &mut Modification<'_, IndexedType, Index>, + context: &mut ModificationContext< IndexedType, Index, ReducedIndex, @@ -441,7 +441,7 @@ where writer: &mut PagedWriter<'_>, ) -> Result where - Indexer: Fn( + Indexer: FnMut( &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, @@ -449,7 +449,7 @@ where &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, - Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result, Error>, + Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result, Error>, { let mut last_index = 0; let mut any_changes = false; @@ -545,7 +545,7 @@ where writer: &mut PagedWriter<'_>, ) -> Result<(ChangeResult, bool), Error> where - Indexer: Fn( + Indexer: FnMut( &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, @@ -553,7 +553,7 @@ where &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, - Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result, Error>, + Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result, Error>, { let can_absorb = children.len() > 1; match (result, can_absorb) { @@ -594,7 +594,7 @@ where writer: &mut PagedWriter<'_>, ) -> Result<(ChangeResult, bool), Error> where - Indexer: Fn( + Indexer: FnMut( &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, @@ -602,7 +602,7 @@ where &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, - Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result, Error>, + Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result, Error>, { let (insert_on_top, sponge_index) = if child_index > 0 { (true, child_index - 1) @@ -668,7 +668,7 @@ where writer: &mut PagedWriter<'_>, ) -> Result<(ChangeResult, bool), Error> where - Indexer: Fn( + Indexer: FnMut( &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, @@ -676,7 +676,7 @@ where &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, - Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result, Error>, + Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result, Error>, { let mut should_backup = false; // Before adding a new node, we want to first try to use neighboring @@ -747,7 +747,7 @@ where writer: &mut PagedWriter<'_>, ) -> Result<(ChangeResult, bool), Error> where - Indexer: Fn( + Indexer: FnMut( &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, @@ -755,7 +755,7 @@ where &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, - Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result, Error>, + Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result, Error>, { let mut should_backup = false; // Check the previous child to see if it can accept any of this child. @@ -849,7 +849,7 @@ where writer: &mut PagedWriter<'_>, ) -> Result where - Indexer: Fn( + Indexer: FnMut( &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, @@ -857,7 +857,7 @@ where &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, - Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result, Error>, + Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result, Error>, { // Check the previous child to see if it can accept any of this child. children[child_index + 1].position.load( diff --git a/nebari/src/tree/by_id.rs b/nebari/src/tree/by_id.rs index c21bf78..9f2cfb8 100644 --- a/nebari/src/tree/by_id.rs +++ b/nebari/src/tree/by_id.rs @@ -154,6 +154,8 @@ where } } +/// Indexes and Reduces [`VersionedByIdIndex`] and [`UnversionedByIdIndex`]. +/// Contains an [`EmbeddedIndex`][super::EmbeddedIndex]. #[derive(Clone, Default, Debug)] pub struct ByIdIndexer(pub EmbeddedIndexer); diff --git a/nebari/src/tree/mod.rs b/nebari/src/tree/mod.rs index 8e76209..4606cf8 100644 --- a/nebari/src/tree/mod.rs +++ b/nebari/src/tree/mod.rs @@ -91,7 +91,7 @@ pub(crate) const DEFAULT_MAX_ORDER: usize = 1000; pub use self::{ btree_entry::{BTreeEntry, BTreeNode, Indexer, KeyOperation, Reducer}, - by_id::{ByIdStats, UnversionedByIdIndex, VersionedByIdIndex}, + by_id::{ByIdIndexer, ByIdStats, UnversionedByIdIndex, VersionedByIdIndex}, by_sequence::{BySequenceIndex, BySequenceStats, SequenceId}, interior::{Interior, Pointer}, key_entry::{KeyEntry, ValueIndex}, @@ -312,30 +312,40 @@ impl TreeFile { /// Sets a key/value pair. Replaces any previous value if set. If you wish /// to retrieve the previously stored value, use /// [`replace()`](Self::replace) instead. + /// + /// Returns the new/updated index for this key. pub fn set( &mut self, persistence_mode: impl Into, key: impl Into>, value: impl Into>, - ) -> Result<(), Error> { - self.file.execute(TreeModifier { - state: &self.state, - vault: self.vault.as_deref(), - cache: self.cache.as_ref(), - modification: Some(Modification { - persistence_mode: persistence_mode.into(), - keys: vec![key.into()], - operation: Operation::Set(value.into()), - }), - scratch: &mut self.scratch, - }) - } - - /// Executes a modification. + ) -> Result { + Ok(self + .file + .execute(TreeModifier { + state: &self.state, + vault: self.vault.as_deref(), + cache: self.cache.as_ref(), + modification: Some(Modification { + persistence_mode: persistence_mode.into(), + keys: vec![key.into()], + operation: Operation::Set(value.into()), + }), + scratch: &mut self.scratch, + })? + .into_iter() + .next() + .expect("always produces a single result") + .index + .expect("modification always produces a new index")) + } + + /// Executes a modification. Returns a list of modified keys and their + /// updated indexes, if the keys are still present. pub fn modify( &mut self, - modification: Modification<'_, ArcBytes<'static>>, - ) -> Result<(), Error> { + modification: Modification<'_, ArcBytes<'static>, Root::Index>, + ) -> Result>, Error> { self.file.execute(TreeModifier { state: &self.state, vault: self.vault.as_deref(), @@ -360,6 +370,7 @@ impl TreeFile { persistence_mode: persistence_mode.into(), keys: vec![ArcBytes::from(key)], operation: Operation::CompareSwap(CompareSwap::new(&mut |_key, + _index, value: Option< ArcBytes<'_>, >| { @@ -379,51 +390,66 @@ impl TreeFile { result } - /// Removes `key` and returns the existing value, if present. + /// Removes `key` and returns the existing value and index, if present. pub fn remove( &mut self, key: &[u8], persistence_mode: impl Into, - ) -> Result>, Error> { + ) -> Result, Root::Index)>, Error> { let mut existing_value = None; self.modify(Modification { persistence_mode: persistence_mode.into(), keys: vec![ArcBytes::from(key)], - operation: Operation::CompareSwap(CompareSwap::new(&mut |_key, value| { - existing_value = value; - KeyOperation::Remove - })), + operation: Operation::CompareSwap(CompareSwap::new( + &mut |_key, index: Option<&Root::Index>, value| { + existing_value = if let (Some(index), Some(value)) = (index, value) { + Some((value, index.clone())) + } else { + None + }; + KeyOperation::Remove + }, + )), })?; Ok(existing_value) } - /// Sets `key` to `value`. If a value already exists, it will be returned. + /// Sets `key` to `value`. Returns a tuple containing two elements: + /// + /// - The previously stored value, if a value was already present. + /// - The new/updated index for this key. #[allow(clippy::missing_panics_doc)] pub fn replace( &mut self, key: impl Into>, value: impl Into>, persistence_mode: impl Into, - ) -> Result>, Error> { + ) -> Result<(Option>, Root::Index), Error> { let mut existing_value = None; let mut value = Some(value.into()); - self.modify(Modification { - persistence_mode: persistence_mode.into(), - keys: vec![key.into()], - operation: Operation::CompareSwap(CompareSwap::new(&mut |_, stored_value| { - existing_value = stored_value; - KeyOperation::Set(value.take().unwrap()) - })), - })?; + let result = self + .modify(Modification { + persistence_mode: persistence_mode.into(), + keys: vec![key.into()], + operation: Operation::CompareSwap(CompareSwap::new( + &mut |_, _index, stored_value| { + existing_value = stored_value; + KeyOperation::Set(value.take().unwrap()) + }, + )), + })? + .into_iter() + .next() + .unwrap(); - Ok(existing_value) + Ok((existing_value, result.index.unwrap())) } /// Gets the value stored for `key`. #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))] - pub fn get<'k>( + pub fn get( &mut self, - key: &'k [u8], + key: &[u8], in_transaction: bool, ) -> Result>, Error> { let mut buffer = None; @@ -433,15 +459,67 @@ impl TreeFile { vault: self.vault.as_deref(), cache: self.cache.as_ref(), keys: KeyRange::new(std::iter::once(key)), - key_reader: |_key, value| { + key_reader: |_key, value, _index| { buffer = Some(value); Ok(()) }, - key_evaluator: |_| ScanEvaluation::ReadData, + key_evaluator: |_, _| ScanEvaluation::ReadData, })?; Ok(buffer) } + /// Gets the index stored for `key`. + #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))] + pub fn get_index( + &mut self, + key: &[u8], + in_transaction: bool, + ) -> Result, Error> { + let mut found_index = None; + self.file.execute(TreeGetter { + from_transaction: in_transaction, + state: &self.state, + vault: self.vault.as_deref(), + cache: self.cache.as_ref(), + keys: KeyRange::new(std::iter::once(key)), + key_reader: |_, _, _| unreachable!(), + key_evaluator: |_key, index| { + found_index = Some(index.clone()); + ScanEvaluation::Skip + }, + })?; + Ok(found_index) + } + + /// Gets the value and index stored for `key`. + #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))] + pub fn get_with_index( + &mut self, + key: &[u8], + in_transaction: bool, + ) -> Result, Root::Index)>, Error> { + let mut buffer = None; + let mut found_index = None; + self.file.execute(TreeGetter { + from_transaction: in_transaction, + state: &self.state, + vault: self.vault.as_deref(), + cache: self.cache.as_ref(), + keys: KeyRange::new(std::iter::once(key)), + key_reader: |_key, value, index| { + buffer = Some(value); + found_index = Some(index); + Ok(()) + }, + key_evaluator: |_, _| ScanEvaluation::ReadData, + })?; + if let (Some(buffer), Some(index)) = (buffer, found_index) { + Ok(Some((buffer, index))) + } else { + Ok(None) + } + } + /// Gets the values stored in `keys`. Does not error if a key is missing. /// Returns key/value pairs in an unspecified order. Keys are required to be /// pre-sorted. @@ -463,16 +541,76 @@ impl TreeFile { vault: self.vault.as_deref(), cache: self.cache.as_ref(), keys: KeyRange::new(keys), - key_reader: |key, value| { + key_reader: |key, value, _| { buffers.push((key, value)); Ok(()) }, - key_evaluator: |_| ScanEvaluation::ReadData, + key_evaluator: |_, _| ScanEvaluation::ReadData, })?; Ok(buffers) } - /// Retrieves all keys and values with keys that are contained by `range`. + /// Gets the indexes stored in `keys`. Does not error if a key is missing. + /// Returns key/value pairs in an unspecified order. Keys are required to be + /// pre-sorted. + #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, keys)))] + pub fn get_multiple_indexes<'keys, KeysIntoIter, KeysIter>( + &mut self, + keys: KeysIntoIter, + in_transaction: bool, + ) -> Result, Root::Index)>, Error> + where + KeysIntoIter: IntoIterator, + KeysIter: Iterator + ExactSizeIterator, + { + let keys = keys.into_iter(); + let mut buffers = Vec::with_capacity(keys.len()); + self.file.execute(TreeGetter { + from_transaction: in_transaction, + state: &self.state, + vault: self.vault.as_deref(), + cache: self.cache.as_ref(), + keys: KeyRange::new(keys), + key_reader: |key, _value, index| { + buffers.push((key, index)); + Ok(()) + }, + key_evaluator: |_, _| ScanEvaluation::ReadData, + })?; + Ok(buffers) + } + + /// Gets the values and indexes stored in `keys`. Does not error if a key is + /// missing. Returns key/value pairs in an unspecified order. Keys are + /// required to be pre-sorted. + #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, keys)))] + pub fn get_multiple_with_indexes<'keys, KeysIntoIter, KeysIter>( + &mut self, + keys: KeysIntoIter, + in_transaction: bool, + ) -> Result, ArcBytes<'static>, Root::Index)>, Error> + where + KeysIntoIter: IntoIterator, + KeysIter: Iterator + ExactSizeIterator, + { + let keys = keys.into_iter(); + let mut buffers = Vec::with_capacity(keys.len()); + self.file.execute(TreeGetter { + from_transaction: in_transaction, + state: &self.state, + vault: self.vault.as_deref(), + cache: self.cache.as_ref(), + keys: KeyRange::new(keys), + key_reader: |key, value, index| { + buffers.push((key, value, index)); + Ok(()) + }, + key_evaluator: |_, _| ScanEvaluation::ReadData, + })?; + Ok(buffers) + } + + /// Retrieves all keys and values for keys that are contained by `range`. #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))] pub fn get_range<'keys, KeyRangeBounds>( &mut self, @@ -497,6 +635,57 @@ impl TreeFile { Ok(results) } + /// Retrieves all keys and indexes for keys that are contained by `range`. + #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))] + pub fn get_range_indexes<'keys, KeyRangeBounds>( + &mut self, + range: &'keys KeyRangeBounds, + in_transaction: bool, + ) -> Result, Root::Index)>, Error> + where + KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, + { + let mut results = Vec::new(); + self.scan( + range, + true, + in_transaction, + &mut |_, _, _| ScanEvaluation::ReadData, + &mut |key, index| { + results.push((key.clone(), index.clone())); + ScanEvaluation::Skip + }, + &mut |_key, _index, _value| unreachable!(), + )?; + Ok(results) + } + + /// Retrieves all keys and values and indexes for keys that are contained by + /// `range`. + #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))] + pub fn get_range_with_indexes<'keys, KeyRangeBounds>( + &mut self, + range: &'keys KeyRangeBounds, + in_transaction: bool, + ) -> Result, ArcBytes<'static>, Root::Index)>, Error> + where + KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, + { + let mut results = Vec::new(); + self.scan( + range, + true, + in_transaction, + &mut |_, _, _| ScanEvaluation::ReadData, + &mut |_, _| ScanEvaluation::ReadData, + &mut |key, index, value| { + results.push((key, value, index.clone())); + Ok(()) + }, + )?; + Ok(results) + } + /// Scans the tree across all nodes that might contain nodes within `range`. /// /// If `forwards` is true, the tree is scanned in ascending order. @@ -1039,15 +1228,19 @@ struct TreeModifier<'a, 'm, Root: root::Root> { state: &'a State, vault: Option<&'a dyn AnyVault>, cache: Option<&'a ChunkCache>, - modification: Option>>, + modification: Option, Root::Index>>, scratch: &'a mut Vec, } -impl<'a, 'm, Root> FileOp> for TreeModifier<'a, 'm, Root> +impl<'a, 'm, Root> FileOp>, Error>> + for TreeModifier<'a, 'm, Root> where Root: root::Root, { - fn execute(mut self, file: &mut dyn File) -> Result<(), Error> { + fn execute( + mut self, + file: &mut dyn File, + ) -> Result>, Error> { let mut active_state = self.state.lock(); if active_state.file_id != file.id() { return Err(Error::from(ErrorKind::TreeCompacted)); @@ -1067,7 +1260,7 @@ where let max_order = active_state.max_order; // Execute the modification - active_state + let results = active_state .root .modify(modification, &mut data_block, max_order)?; @@ -1089,7 +1282,7 @@ where active_state.publish(self.state); } - Ok(()) + Ok(results) } } @@ -1171,8 +1364,8 @@ struct TreeGetter< 'a, 'keys, Root: root::Root, - KeyEvaluator: FnMut(&ArcBytes<'static>) -> ScanEvaluation, - KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>) -> Result<(), Error>, + KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation, + KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>, Root::Index) -> Result<(), Error>, Keys: Iterator, > { from_transaction: bool, @@ -1187,8 +1380,8 @@ struct TreeGetter< impl<'a, 'keys, KeyEvaluator, KeyReader, Root, Keys> FileOp> for TreeGetter<'a, 'keys, Root, KeyEvaluator, KeyReader, Keys> where - KeyEvaluator: FnMut(&ArcBytes<'static>) -> ScanEvaluation, - KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>) -> Result<(), Error>, + KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation, + KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>, Root::Index) -> Result<(), Error>, Keys: Iterator, Root: root::Root, { @@ -1835,6 +2028,14 @@ impl Serializable for () { } } +/// A single key's modification result. +pub struct ModificationResult { + /// The key that was changed. + pub key: ArcBytes<'static>, + /// The updated index, if the key is still present. + pub index: Option, +} + #[cfg(test)] mod tests { use std::{ @@ -2477,7 +2678,7 @@ mod tests { ) .unwrap(); - let stored = tree.replace(b"test", b"third", None).unwrap().unwrap(); + let stored = tree.replace(b"test", b"third", None).unwrap().0.unwrap(); assert_eq!(stored, b"second"); tree.compare_and_swap(b"test", Some(b"third"), None, None) @@ -2704,19 +2905,21 @@ mod tests { tree.modify(Modification { persistence_mode: PersistenceMode::Sync, keys: key_operations.keys().cloned().collect(), - operation: Operation::CompareSwap(CompareSwap::new(&mut |key, existing_value| { - let should_remove = *key_operations.get(key).unwrap(); - if should_remove { - assert!( - existing_value.is_some(), - "key {key:?} had no existing value" - ); - KeyOperation::Remove - } else { - assert!(existing_value.is_none(), "key {key:?} already had a value"); - KeyOperation::Set(key.to_owned()) - } - })), + operation: Operation::CompareSwap(CompareSwap::new( + &mut |key, _index, existing_value| { + let should_remove = *key_operations.get(key).unwrap(); + if should_remove { + assert!( + existing_value.is_some(), + "key {key:?} had no existing value" + ); + KeyOperation::Remove + } else { + assert!(existing_value.is_none(), "key {key:?} already had a value"); + KeyOperation::Set(key.to_owned()) + } + }, + )), }) .unwrap(); } diff --git a/nebari/src/tree/modify.rs b/nebari/src/tree/modify.rs index 99b61e8..91c8264 100644 --- a/nebari/src/tree/modify.rs +++ b/nebari/src/tree/modify.rs @@ -8,16 +8,16 @@ use crate::{error::Error, transaction::TransactionId, ArcBytes, ErrorKind}; /// A tree modification. #[derive(Debug)] -pub struct Modification<'a, T> { +pub struct Modification<'a, T, Index> { /// The transaction ID to store with this change. pub persistence_mode: PersistenceMode, /// The keys to operate upon. pub keys: Vec>, /// The operation to perform on the keys. - pub operation: Operation<'a, T>, + pub operation: Operation<'a, T, Index>, } -impl<'a, T> Modification<'a, T> { +impl<'a, T, Index> Modification<'a, T, Index> { pub(crate) fn reverse(&mut self) -> Result<(), Error> { if self.keys.windows(2).all(|w| w[0] < w[1]) { self.keys.reverse(); @@ -84,7 +84,7 @@ impl From> for PersistenceMode { } /// An operation that is performed on a set of keys. -pub enum Operation<'a, T> { +pub enum Operation<'a, T, Index> { /// Sets all keys to the value. Set(T), /// Sets each key to the corresponding entry in this value. The number of @@ -94,10 +94,10 @@ pub enum Operation<'a, T> { Remove, /// Executes the `CompareSwap`. The original value (or `None` if not /// present) is the only argument. - CompareSwap(CompareSwap<'a, T>), + CompareSwap(CompareSwap<'a, T, Index>), } -impl<'a, T: Debug> Debug for Operation<'a, T> { +impl<'a, T: Debug, Index> Debug for Operation<'a, T, Index> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Set(arg0) => f.debug_tuple("Set").field(arg0).finish(), @@ -111,35 +111,36 @@ impl<'a, T: Debug> Debug for Operation<'a, T> { /// A function that is allowed to check the current value of a key and determine /// how to operate on it. The first parameter is the key, and the second /// parameter is the current value, if present. -pub type CompareSwapFn<'a, T> = dyn FnMut(&ArcBytes<'a>, Option) -> KeyOperation + 'a; +pub type CompareSwapFn<'a, T, Index> = + dyn FnMut(&ArcBytes<'a>, Option<&Index>, Option) -> KeyOperation + 'a; /// A wrapper for a [`CompareSwapFn`]. -pub struct CompareSwap<'a, T>(&'a mut CompareSwapFn<'a, T>); +pub struct CompareSwap<'a, T, Index>(&'a mut CompareSwapFn<'a, T, Index>); -impl<'a, T> CompareSwap<'a, T> { +impl<'a, T, Index> CompareSwap<'a, T, Index> { /// Returns a new wrapped callback. - pub fn new, Option) -> KeyOperation + 'a>( + pub fn new, Option<&Index>, Option) -> KeyOperation + 'a>( callback: &'a mut F, ) -> Self { Self(callback) } } -impl<'a, T> Debug for CompareSwap<'a, T> { +impl<'a, T, Index> Debug for CompareSwap<'a, T, Index> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str("CompareSwap(dyn FnMut)") } } -impl<'a, T> Deref for CompareSwap<'a, T> { - type Target = CompareSwapFn<'a, T>; +impl<'a, T, Index> Deref for CompareSwap<'a, T, Index> { + type Target = CompareSwapFn<'a, T, Index>; fn deref(&self) -> &Self::Target { self.0 } } -impl<'a, T> DerefMut for CompareSwap<'a, T> { +impl<'a, T, Index> DerefMut for CompareSwap<'a, T, Index> { fn deref_mut(&mut self) -> &mut Self::Target { self.0 } diff --git a/nebari/src/tree/root.rs b/nebari/src/tree/root.rs index 14564bd..8c3188b 100644 --- a/nebari/src/tree/root.rs +++ b/nebari/src/tree/root.rs @@ -15,8 +15,8 @@ use crate::{ roots::AnyTransactionTree, transaction::{TransactionId, TransactionManager}, tree::{ - btree_entry::ScanArgs, state::AnyTreeState, Modification, PageHeader, PagedWriter, Reducer, - ScanEvaluation, State, TreeFile, + btree_entry::ScanArgs, state::AnyTreeState, Modification, ModificationResult, PageHeader, + PagedWriter, Reducer, ScanEvaluation, State, TreeFile, }, vault::AnyVault, AbortError, ArcBytes, ChunkCache, Context, TransactionTree, Vault, @@ -94,13 +94,14 @@ pub trait Root: Debug + Send + Sync + Clone + 'static { /// Returns the current transaction id. fn transaction_id(&self) -> TransactionId; - /// Modifies the tree. + /// Modifies the tree. Returns a list of modified keys and their updated + /// indexes, if the keys are still present. fn modify<'a, 'w>( &'a mut self, - modification: Modification<'_, ArcBytes<'static>>, + modification: Modification<'_, ArcBytes<'static>, Self::Index>, writer: &'a mut PagedWriter<'w>, max_order: Option, - ) -> Result<(), Error>; + ) -> Result>, Error>; /// Iterates over the tree looking for `keys`. `keys` must be sorted. /// `key_evaluator` is invoked for each key as it is found, allowing for @@ -117,8 +118,8 @@ pub trait Root: Debug + Send + Sync + Clone + 'static { cache: Option<&ChunkCache>, ) -> Result<(), Error> where - KeyEvaluator: FnMut(&ArcBytes<'static>) -> ScanEvaluation, - KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>) -> Result<(), Error>, + KeyEvaluator: FnMut(&ArcBytes<'static>, &Self::Index) -> ScanEvaluation, + KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>, Self::Index) -> Result<(), Error>, Keys: Iterator; /// Scans the tree over `range`. `args.key_evaluator` is invoked for each key as diff --git a/nebari/src/tree/unversioned.rs b/nebari/src/tree/unversioned.rs index 689cfb5..ba05442 100644 --- a/nebari/src/tree/unversioned.rs +++ b/nebari/src/tree/unversioned.rs @@ -26,7 +26,7 @@ use crate::{ by_id::ByIdIndexer, copy_chunk, dynamic_order, versioned::ChangeResult, - BTreeNode, PageHeader, Root, + BTreeNode, ModificationResult, PageHeader, Root, }, vault::AnyVault, ArcBytes, ChunkCache, ErrorKind, @@ -72,10 +72,10 @@ where { fn modify_id_root<'a, 'w>( &'a mut self, - mut modification: Modification<'_, ArcBytes<'static>>, + mut modification: Modification<'_, ArcBytes<'static>, UnversionedByIdIndex>, writer: &'a mut PagedWriter<'w>, max_order: Option, - ) -> Result<(), Error> { + ) -> Result>>, Error> { modification.reverse()?; let total_keys = @@ -87,10 +87,12 @@ where let reducer = self.reducer.clone(); + let mut results = Vec::with_capacity(modification.keys.len()); + while !modification.keys.is_empty() { match self.by_id_root.modify( &mut modification, - &ModificationContext { + &mut ModificationContext { current_order: by_id_order, minimum_children, indexer: |key: &ArcBytes<'_>, @@ -103,12 +105,21 @@ where // write_chunk errors if it can't fit within a u32 #[allow(clippy::cast_possible_truncation)] let value_length = value.len() as u32; - Ok(KeyOperation::Set(UnversionedByIdIndex { + let new_index = UnversionedByIdIndex { value_length, position, embedded: reducer.0.index(key, Some(value)), - })) + }; + results.push(ModificationResult { + key: key.to_owned(), + index: Some(new_index.clone()), + }); + Ok(KeyOperation::Set(new_index)) } else { + results.push(ModificationResult { + key: key.to_owned(), + index: None, + }); Ok(KeyOperation::Remove) } }, @@ -134,7 +145,7 @@ where } } - Ok(()) + Ok(results) } } @@ -225,20 +236,20 @@ where fn modify( &mut self, - modification: Modification<'_, ArcBytes<'static>>, + modification: Modification<'_, ArcBytes<'static>, Self::Index>, writer: &mut PagedWriter<'_>, max_order: Option, - ) -> Result<(), Error> { + ) -> Result>, Error> { let transaction_id = modification.persistence_mode.transaction_id(); - self.modify_id_root(modification, writer, max_order)?; + let results = self.modify_id_root(modification, writer, max_order)?; // Only update the transaction id if a new one was specified. if let Some(transaction_id) = transaction_id { self.transaction_id = Some(transaction_id); } - Ok(()) + Ok(results) } fn get_multiple<'keys, KeyEvaluator, KeyReader, Keys>( @@ -251,18 +262,18 @@ where cache: Option<&ChunkCache>, ) -> Result<(), Error> where - KeyEvaluator: FnMut(&ArcBytes<'static>) -> ScanEvaluation, - KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>) -> Result<(), Error>, + KeyEvaluator: FnMut(&ArcBytes<'static>, &Self::Index) -> ScanEvaluation, + KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>, Self::Index) -> Result<(), Error>, Keys: Iterator, { let mut positions_to_read = Vec::new(); self.by_id_root.get( &mut KeyRange::new(keys), - &mut |key, _index| key_evaluator(key), + &mut |key, index| key_evaluator(key, index), &mut |key, index| { // Deleted keys are stored with a 0 position. if index.position > 0 { - positions_to_read.push((key, index.position)); + positions_to_read.push((key, index.clone())); } Ok(()) }, @@ -272,16 +283,18 @@ where )?; // Sort by position on disk - positions_to_read.sort_by(|a, b| a.1.cmp(&b.1)); + positions_to_read.sort_by(|a, b| a.1.position.cmp(&b.1.position)); - for (key, position) in positions_to_read { - if position > 0 { - match read_chunk(position, false, file, vault, cache)? { + for (key, index) in positions_to_read { + if index.position > 0 { + match read_chunk(index.position, false, file, vault, cache)? { CacheEntry::ArcBytes(contents) => { - key_reader(key, contents)?; + key_reader(key, contents, index)?; } CacheEntry::Decoded(_) => unreachable!(), }; + } else { + key_reader(key, ArcBytes::default(), index)?; } } Ok(()) diff --git a/nebari/src/tree/versioned.rs b/nebari/src/tree/versioned.rs index 30d111f..eae9a9a 100644 --- a/nebari/src/tree/versioned.rs +++ b/nebari/src/tree/versioned.rs @@ -29,7 +29,7 @@ use crate::{ copy_chunk, dynamic_order, key_entry::KeyEntry, modify::Operation, - BTreeNode, Interior, PageHeader, PersistenceMode, Reducer, Root, + BTreeNode, Interior, ModificationResult, PageHeader, PersistenceMode, Reducer, Root, }, vault::AnyVault, ArcBytes, ChunkCache, ErrorKind, @@ -97,7 +97,7 @@ where { fn modify_sequence_root( &mut self, - mut modification: Modification<'_, BySequenceIndex>, + mut modification: Modification<'_, BySequenceIndex, BySequenceIndex>, writer: &mut PagedWriter<'_>, max_order: Option, ) -> Result<(), Error> { @@ -118,16 +118,17 @@ where while !modification.keys.is_empty() { match self.by_sequence_root.modify( &mut modification, - &ModificationContext { + &mut ModificationContext { current_order: by_sequence_order, minimum_children: by_sequence_minimum_children, - indexer: |_key: &ArcBytes<'_>, + indexer: + &mut |_key: &ArcBytes<'_>, value: Option<&BySequenceIndex>, _existing_index: Option<&BySequenceIndex>, _changes: &mut EntryChanges, _writer: &mut PagedWriter<'_>| { - Ok(KeyOperation::Set(value.unwrap().clone())) - }, + Ok(KeyOperation::Set(value.unwrap().clone())) + }, loader: |_index: &BySequenceIndex, _writer: &mut PagedWriter<'_>| Ok(None), reducer: BySequenceReducer, _phantom: PhantomData, @@ -150,11 +151,11 @@ where fn modify_id_root( &mut self, - mut modification: Modification<'_, ArcBytes<'static>>, + mut modification: Modification<'_, ArcBytes<'static>, VersionedByIdIndex>, changes: &mut EntryChanges, writer: &mut PagedWriter<'_>, max_order: Option, - ) -> Result<(), Error> { + ) -> Result>>, Error> { modification.reverse()?; let total_id_records = @@ -165,19 +166,20 @@ where let by_id_minimum_children = by_id_minimum_children.min(usize::try_from(total_id_records).unwrap_or(usize::MAX)); - let reducer = self.reducer.clone(); + let mut results = Vec::new(); while !modification.keys.is_empty() { + let reducer = self.reducer.clone(); match self.by_id_root.modify( &mut modification, - &ModificationContext { + &mut ModificationContext { current_order: by_id_order, minimum_children: by_id_minimum_children, - indexer: |key: &ArcBytes<'_>, - value: Option<&ArcBytes<'static>>, - existing_index: Option<&VersionedByIdIndex>, - changes: &mut EntryChanges, - writer: &mut PagedWriter<'_>| { + indexer: &mut |key: &ArcBytes<'_>, + value: Option<&ArcBytes<'static>>, + existing_index: Option<&VersionedByIdIndex>, + changes: &mut EntryChanges, + writer: &mut PagedWriter<'_>| { let (position, value_size) = if let Some(value) = value { let new_position = writer.write_chunk(value)?; // write_chunk errors if it can't fit within a u32 @@ -202,12 +204,17 @@ where value_position: position, value_size, }); - Ok(KeyOperation::Set(VersionedByIdIndex { + let new_index = VersionedByIdIndex { sequence_id: changes.current_sequence, position, value_length: value_size, embedded, - })) + }; + results.push(ModificationResult { + key, + index: Some(new_index.clone()), + }); + Ok(KeyOperation::Set(new_index)) }, loader: |index, writer| { if index.position > 0 { @@ -240,7 +247,7 @@ where self.sequence = changes.current_sequence; - Ok(()) + Ok(results) } } @@ -345,10 +352,10 @@ where fn modify( &mut self, - modification: Modification<'_, ArcBytes<'static>>, + modification: Modification<'_, ArcBytes<'static>, Self::Index>, writer: &mut PagedWriter<'_>, max_order: Option, - ) -> Result<(), Error> { + ) -> Result>, Error> { let persistence_mode = modification.persistence_mode; // Insert into both trees @@ -356,7 +363,7 @@ where current_sequence: self.sequence, changes: Vec::with_capacity(modification.keys.len()), }; - self.modify_id_root(modification, &mut changes, writer, max_order)?; + let results = self.modify_id_root(modification, &mut changes, writer, max_order)?; // Convert the changes into a modification request for the id root. let mut values = Vec::with_capacity(changes.changes.len()); @@ -386,7 +393,7 @@ where self.transaction_id = transaction_id; } - Ok(()) + Ok(results) } fn get_multiple<'keys, KeyEvaluator, KeyReader, Keys>( @@ -399,18 +406,18 @@ where cache: Option<&ChunkCache>, ) -> Result<(), Error> where - KeyEvaluator: FnMut(&ArcBytes<'static>) -> ScanEvaluation, - KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>) -> Result<(), Error>, + KeyEvaluator: FnMut(&ArcBytes<'static>, &Self::Index) -> ScanEvaluation, + KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>, Self::Index) -> Result<(), Error>, Keys: Iterator, { let mut positions_to_read = Vec::new(); self.by_id_root.get( &mut KeyRange::new(keys), - &mut |key, _index| key_evaluator(key), + &mut |key, index| key_evaluator(key, index), &mut |key, index| { // Deleted keys are stored with a 0 position. if index.position > 0 { - positions_to_read.push((key, index.position)); + positions_to_read.push((key, index.clone())); } Ok(()) }, @@ -420,16 +427,18 @@ where )?; // Sort by position on disk - positions_to_read.sort_by(|a, b| a.1.cmp(&b.1)); + positions_to_read.sort_by(|a, b| a.1.position.cmp(&b.1.position)); - for (key, position) in positions_to_read { - if position > 0 { - match read_chunk(position, false, file, vault, cache)? { + for (key, index) in positions_to_read { + if index.position > 0 { + match read_chunk(index.position, false, file, vault, cache)? { CacheEntry::ArcBytes(contents) => { - key_reader(key, contents)?; + key_reader(key, contents, index)?; } CacheEntry::Decoded(_) => unreachable!(), }; + } else { + key_reader(key, ArcBytes::default(), index)?; } } Ok(()) @@ -542,14 +551,14 @@ where // This modification copies the `sequence_indexes` into the sequence root. self.by_sequence_root.modify( &mut modification, - &ModificationContext { + &mut ModificationContext { current_order: by_sequence_order, minimum_children, - indexer: |_key: &ArcBytes<'_>, - value: Option<&BySequenceIndex>, - _existing_index: Option<&BySequenceIndex>, - _changes: &mut EntryChanges, - _writer: &mut PagedWriter<'_>| { + indexer: &mut |_key: &ArcBytes<'_>, + value: Option<&BySequenceIndex>, + _existing_index: Option<&BySequenceIndex>, + _changes: &mut EntryChanges, + _writer: &mut PagedWriter<'_>| { Ok(KeyOperation::Set(value.unwrap().clone())) }, loader: |_index: &BySequenceIndex, _writer: &mut PagedWriter<'_>| unreachable!(),