diff --git a/Cargo.toml b/Cargo.toml
index b147ba90..94b30af6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,6 +17,7 @@ crossbeam-channel = "0.5"
 enum_dispatch = "0.3"
 farmhash = "1.1"
 getset = "0.1.2"
+hashbrown = { version = "0.12", features = ["raw"] }
 lazy_static = "1.4.0"
 log = "0.4.17"
 memmap2 = "0.3"
@@ -38,11 +39,7 @@ tempfile = "3"
 tikv-jemallocator = "0.4.0"
 
 [workspace]
-members = [
-    "agate_bench",
-    "proto",
-    "skiplist"
-]
+members = ["agate_bench", "proto", "skiplist"]
 
 [[bench]]
 name = "bench_common"
diff --git a/src/cache/lru.rs b/src/cache/lru.rs
new file mode 100644
index 00000000..bf472203
--- /dev/null
+++ b/src/cache/lru.rs
@@ -0,0 +1,654 @@
+use std::{
+    collections::hash_map::RandomState,
+    hash::{BuildHasher, Hash},
+    mem::{ManuallyDrop, MaybeUninit},
+    ops::{Deref, DerefMut},
+    ptr::NonNull,
+    sync::Mutex,
+};
+
+use hashbrown::raw::RawTable;
+
+use super::{utils::hash_key, Cache, CacheHandle, CacheShard, Options};
+
+const FLAG_IN_CACHE: u8 = 1;
+const FLAG_DUMMY: u8 = 1 << 1;
+
+struct LRUNode<K, V> {
+    hash: u64,
+    charge: usize,
+    ref_count: u32,
+    flag: u8,
+    next: NodePtr<K, V>,
+    prev: NodePtr<K, V>,
+    key: ManuallyDrop<K>,
+    value: ManuallyDrop<V>,
+}
+
+impl<K, V> LRUNode<K, V> {
+    fn dummy() -> Self {
+        unsafe {
+            let mut node = MaybeUninit::assume_init(MaybeUninit::<LRUNode<K, V>>::uninit());
+            node.flag = FLAG_DUMMY;
+            node
+        }
+    }
+
+    fn new(key: K, value: V, charge: usize, hash: u64) -> Self {
+        Self {
+            hash,
+            charge,
+            ref_count: 1,
+            flag: 0,
+            next: NodePtr(NonNull::dangling()),
+            prev: NodePtr(NonNull::dangling()),
+            key: ManuallyDrop::new(key),
+            value: ManuallyDrop::new(value),
+        }
+    }
+
+    fn in_cache(&self) -> bool {
+        self.flag & FLAG_IN_CACHE != 0
+    }
+
+    fn set_in_cache(&mut self, in_cache: bool) {
+        if in_cache {
+            self.flag |= FLAG_IN_CACHE;
+        } else {
+            self.flag &= !FLAG_IN_CACHE;
+        }
+    }
+
+    fn incr_ref(&mut self) -> u32 {
+        self.ref_count += 1;
+        self.ref_count
+    }
+
+    fn decr_ref(&mut self) -> u32 {
+        self.ref_count -= 1;
+        self.ref_count
+    }
+}
+
+impl<K, V> Drop for LRUNode<K, V> {
+    fn drop(&mut self) {
+        if self.flag & FLAG_DUMMY == 0 {
+            unsafe {
+                ManuallyDrop::drop(&mut self.key);
+                ManuallyDrop::drop(&mut self.value);
+            }
+        }
+    }
+}
+
+impl<K, V> PartialEq for LRUNode<K, V>
+where
+    K: Eq,
+{
+    fn eq(&self, other: &Self) -> bool {
+        self.hash == other.hash && self.key.eq(&other.key)
+    }
+}
+
+struct NodePtr<K, V>(NonNull<LRUNode<K, V>>);
+
+impl<K, V> PartialEq for NodePtr<K, V> {
+    fn eq(&self, other: &Self) -> bool {
+        self.0 == other.0 // compare the addresses
+    }
+}
+
+impl<K, V> From<*mut LRUNode<K, V>> for NodePtr<K, V> {
+    fn from(n: *mut LRUNode<K, V>) -> Self {
+        unsafe { Self(NonNull::new_unchecked(n)) }
+    }
+}
+
+impl<K, V> Clone for NodePtr<K, V> {
+    fn clone(&self) -> Self {
+        Self(self.0)
+    }
+}
+
+impl<K, V> Deref for NodePtr<K, V> {
+    type Target = LRUNode<K, V>;
+
+    fn deref(&self) -> &Self::Target {
+        unsafe { self.0.as_ref() }
+    }
+}
+
+impl<K, V> DerefMut for NodePtr<K, V> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        unsafe { self.0.as_mut() }
+    }
+}
+
+pub struct LRUHandle<'a, K, V, S: Send + Sync> {
+    node: NodePtr<K, V>,
+    lru: &'a LRU<K, V, S>,
+}
+
+impl<K, V, S> CacheHandle for LRUHandle<'_, K, V, S>
+where
+    S: Send + Sync,
+{
+    type Value = V;
+
+    fn value(&self) -> &Self::Value {
+        &self.node.value
+    }
+}
+
+impl<K, V, S> Drop for LRUHandle<'_, K, V, S>
+where
+    S: Send + Sync,
+{
+    fn drop(&mut self) {
+        let mut lru = self.lru.inner.lock().unwrap();
+        lru.unref_node(self.node.clone());
+    }
+}
+
+pub struct LRU<K, V, S = RandomState> {
+    inner: Mutex<LRUInner<K, V>>,
+    hash_builder: S,
+}
+
+struct LRUInner<K, V> {
+    cap: usize,
+    usage: usize,
+    map: RawTable<NodePtr<K, V>>,
+    dummy: NodePtr<K, V>,
+    in_use_dummy: NodePtr<K, V>,
+}
+
+// FIXME: this might not be safe.
+unsafe impl<K, V> Send for LRUInner<K, V> {}
+
+impl<K, V> LRUInner<K, V> {
+    fn new(capacity: usize) -> Self {
+        let mut dummy = NodePtr::from(Box::into_raw(Box::new(LRUNode::dummy())));
+        dummy.next = dummy.clone();
+        dummy.prev = dummy.clone();
+        let mut in_use_dummy = NodePtr::from(Box::into_raw(Box::new(LRUNode::dummy())));
+        in_use_dummy.next = in_use_dummy.clone();
+        in_use_dummy.prev = in_use_dummy.clone();
+        Self {
+            cap: capacity,
+            usage: 0,
+            map: RawTable::new(),
+            dummy,
+            in_use_dummy,
+        }
+    }
+
+    fn list_remove_node(mut node: NodePtr<K, V>) {
+        node.next.prev = node.prev.clone();
+        node.prev.next = node.next.clone();
+    }
+
+    fn append(list: NodePtr<K, V>, mut node: NodePtr<K, V>) {
+        node.next = list.clone();
+        node.prev = list.prev.clone();
+        node.prev.next = node.clone();
+        node.next.prev = node.clone();
+    }
+
+    fn ref_node(&mut self, mut node: NodePtr<K, V>) {
+        if node.ref_count == 1 && node.in_cache() {
+            LRUInner::list_remove_node(node.clone());
+            LRUInner::append(self.in_use_dummy.clone(), node.clone());
+        }
+        node.incr_ref();
+    }
+
+    fn unref_node(&mut self, mut node: NodePtr<K, V>) {
+        node.decr_ref();
+        if node.ref_count == 0 {
+            debug_assert!(!node.in_cache());
+            unsafe { Box::from_raw(node.0.as_ptr()) };
+        } else if node.in_cache() && node.ref_count == 1 {
+            LRUInner::list_remove_node(node.clone());
+            LRUInner::append(self.dummy.clone(), node.clone());
+        }
+    }
+
+    fn finish_remove_entry(&mut self, mut node: NodePtr<K, V>) {
+        Self::list_remove_node(node.clone());
+        node.set_in_cache(false);
+        self.usage -= node.charge;
+        self.unref_node(node);
+    }
+}
+
+impl<K, V> Drop for LRUInner<K, V> {
+    fn drop(&mut self) {
+        debug_assert!(self.in_use_dummy == self.in_use_dummy.next);
+        let mut node = self.dummy.next.clone();
+        let mut next;
+        while node != self.dummy {
+            next = node.next.clone();
+            debug_assert!(node.in_cache());
+            node.set_in_cache(false);
+            debug_assert!(node.ref_count == 1);
+            self.unref_node(node);
+            node = next;
+        }
+        unsafe {
+            Box::from_raw(self.dummy.0.as_ptr());
+            Box::from_raw(self.in_use_dummy.0.as_ptr());
+        }
+    }
+}
+
+impl<K, V> LRU<K, V> {
+    pub fn new(capacity: usize) -> Self {
+        Self::with_hasher(capacity, RandomState::default())
+    }
+}
+
+impl<K, V, S> LRU<K, V, S> {
+    pub fn with_hasher(capacity: usize, hash_builder: S) -> Self {
+        Self {
+            inner: Mutex::new(LRUInner::new(capacity)),
+            hash_builder,
+        }
+    }
+}
+
+impl<K, V, S> CacheShard for LRU<K, V, S>
+where
+    S: Send + Sync,
+{
+    type Key = K;
+    type Value = V;
+    type HashBuilder = S;
+
+    fn shard_with_options(opts: &Options<Self::HashBuilder>) -> Self
+    where
+        Self::HashBuilder: Clone,
+    {
+        Self::with_hasher(opts.capacity, opts.hash_builder.clone())
+    }
+
+    fn shard_insert(
+        &self,
+        key: Self::Key,
+        value: Self::Value,
+        hash: u64,
+        charge: usize,
+    ) -> Box<dyn CacheHandle<Value = Self::Value> + '_>
+    where
+        Self::Key: Eq,
+    {
+        let mut node = unsafe {
+            NodePtr(NonNull::new_unchecked(Box::into_raw(Box::new(
+                LRUNode::new(key, value, charge, hash),
+            ))))
+        };
+
+        let mut lru = self.inner.lock().unwrap();
+
+        if lru.cap > 0 {
+            node.incr_ref(); // the cache holds one reference, the returned handle holds one
+            node.set_in_cache(true);
+            LRUInner::append(lru.in_use_dummy.clone(), node.clone());
+            lru.usage += charge;
+            // Remove an existing node with the same key, if any.
+            if let Some(rn) = lru
+                .map
+                .remove_entry(hash, |e| e.hash == hash && e.key.deref().eq(&node.key))
+            {
+                lru.finish_remove_entry(rn);
+            }
+            lru.map.insert(hash, node.clone(), |e| e.hash);
+        }
+
+        while lru.usage > lru.cap && lru.dummy != lru.dummy.next {
+            let old = lru.dummy.next.clone();
+            debug_assert_eq!(old.ref_count, 1);
+            if let Some(rn) = lru.map.remove_entry(old.hash, |e| {
+                e.hash == old.hash && e.key.deref().eq(&old.key)
+            }) {
+                lru.finish_remove_entry(rn);
+            }
+        }
+
+        Box::new(LRUHandle { node, lru: self })
+    }
+
+    fn shard_lookup(
+        &self,
+        key: &Self::Key,
+        hash: u64,
+    ) -> Option<Box<dyn CacheHandle<Value = Self::Value> + '_>>
+    where
+        Self::Key: Eq,
+    {
+        let mut lru = self.inner.lock().unwrap();
+        let node = lru
+            .map
+            .get(hash, |e| e.hash == hash && e.key.deref().eq(key))?
+            .clone();
+
+        lru.ref_node(node.clone());
+        Some(Box::new(LRUHandle { node, lru: self }))
+    }
+
+    fn shard_erase(&self, key: &Self::Key, hash: u64)
+    where
+        Self::Key: Eq,
+    {
+        let mut lru = self.inner.lock().unwrap();
+        if let Some(node) = lru
+            .map
+            .remove_entry(hash, |e| e.hash == hash && e.key.deref().eq(key))
+        {
+            lru.finish_remove_entry(node);
+        }
+    }
+
+    fn shard_prune(&self)
+    where
+        Self::Key: Eq,
+    {
+        let mut lru = self.inner.lock().unwrap();
+        while lru.dummy != lru.dummy.next {
+            let node_to_remove = lru.dummy.next.clone();
+            debug_assert_eq!(node_to_remove.ref_count, 1);
+            let rn = lru.map.remove_entry(node_to_remove.hash, |e| {
+                e.hash == node_to_remove.hash && e.key.deref().eq(&node_to_remove.key)
+            });
+            debug_assert!(rn.is_some());
+            if let Some(node) = rn {
+                lru.finish_remove_entry(node);
+            }
+        }
+    }
+}
+
+impl<K, V, S> Cache for LRU<K, V, S>
+where
+    S: Send + Sync,
+{
+    type Key = K;
+    type Value = V;
+    type HashBuilder = S;
+
+    fn with_options(opts: &Options<Self::HashBuilder>) -> Self
+    where
+        Self::HashBuilder: Clone,
+    {
+        Self::with_hasher(opts.capacity, opts.hash_builder.clone())
+    }
+
+    fn insert(
+        &self,
+        key: Self::Key,
+        value: Self::Value,
+        charge: usize,
+    ) -> Box<dyn CacheHandle<Value = Self::Value> + '_>
+    where
+        Self::Key: Hash + Eq,
+        Self::HashBuilder: BuildHasher,
+    {
+        let hash = hash_key(&self.hash_builder, &key);
+        self.shard_insert(key, value, hash, charge)
+    }
+
+    fn lookup(&self, key: &K) -> Option<Box<dyn CacheHandle<Value = Self::Value> + '_>>
+    where
+        Self::Key: Hash + Eq,
+        Self::HashBuilder: BuildHasher,
+    {
+        let hash = hash_key(&self.hash_builder, key);
+        self.shard_lookup(key, hash)
+    }
+
+    fn erase(&self, key: &K)
+    where
+        Self::Key: Hash + Eq,
+        Self::HashBuilder: BuildHasher,
+    {
+        let hash = hash_key(&self.hash_builder, key);
+        self.shard_erase(key, hash)
+    }
+
+    fn prune(&self)
+    where
+        Self::Key: Eq,
+    {
+        self.shard_prune()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::{Arc, Mutex};
+
+    use super::{Cache, LRU};
+
+    // Protect DROPPED_KEY and DROPPED_VALUE:
+    // `cargo test` runs tests in parallel by default.
+    static THREAD_LOCK: Mutex<()> = Mutex::new(());
+    static mut DROPPED_KEY: Vec<i32> = vec![];
+    static mut DROPPED_VALUE: Vec<i32> = vec![];
+
+    #[derive(Debug, Hash, PartialEq, Eq)]
+    struct TestKey(i32);
+    #[derive(Debug, PartialEq, Eq)]
+    struct TestValue(i32);
+
+    impl TestKey {
+        fn set(&mut self, v: i32) -> &Self {
+            self.0 = v;
+            self
+        }
+    }
+
+    impl Drop for TestKey {
+        fn drop(&mut self) {
+            unsafe { DROPPED_KEY.push(self.0) }
+        }
+    }
+
+    impl Drop for TestValue {
+        fn drop(&mut self) {
+            unsafe { DROPPED_VALUE.push(self.0) }
+        }
+    }
+
+    fn lookup(lru: &LRU<TestKey, TestValue>, key: i32) -> Option<i32> {
+        let key = TestKey(key);
+        let r = lru.lookup(&key).map(|h| h.value().0);
+        std::mem::forget(key);
+        r
+    }
+
+    fn insert(lru: &LRU<TestKey, TestValue>, key: i32, value: i32, charge: usize) {
+        lru.insert(TestKey(key), TestValue(value), charge);
+    }
+
+    fn erase(lru: &LRU<TestKey, TestValue>, key: i32) {
+        let key = TestKey(key);
+        lru.erase(&key);
+        std::mem::forget(key);
+    }
+
+    fn clear() {
+        unsafe {
+            DROPPED_KEY.clear();
+            DROPPED_VALUE.clear();
+        }
+    }
+
+    #[test]
+    fn simple() {
+        let _g = THREAD_LOCK.lock().unwrap();
+        clear();
+        let lru = LRU::<TestKey, TestValue>::new(100);
+        assert_eq!(lookup(&lru, 100), None);
+
+        insert(&lru, 100, 100, 1);
+        assert_eq!(lookup(&lru, 100), Some(100));
+        assert_eq!(lookup(&lru, 200), None);
+        assert_eq!(lookup(&lru, 300), None);
+
+        insert(&lru, 200, 200, 1);
+        assert_eq!(lookup(&lru, 100), Some(100));
+        assert_eq!(lookup(&lru, 200), Some(200));
+        assert_eq!(lookup(&lru, 300), None);
+
+        insert(&lru, 100, 101, 1);
+        assert_eq!(lookup(&lru, 100), Some(101));
+        assert_eq!(lookup(&lru, 200), Some(200));
+        assert_eq!(lookup(&lru, 300), None);
+
+        assert_eq!(unsafe { DROPPED_KEY.len() }, 1);
+        assert_eq!(unsafe { DROPPED_KEY[0] }, 100);
+        assert_eq!(unsafe { DROPPED_VALUE[0] }, 100);
+
+        drop(lru);
+        assert_eq!(unsafe { DROPPED_KEY.len() }, 3);
+    }
+
+    #[test]
+    fn remove() {
+        let _g = THREAD_LOCK.lock().unwrap();
+        clear();
+        let lru = LRU::<TestKey, TestValue>::new(100);
+        erase(&lru, 200);
+        assert_eq!(unsafe { DROPPED_KEY.len() }, 0);
+        insert(&lru, 100, 100, 1);
+        insert(&lru, 200, 200, 1);
+        erase(&lru, 100);
+        assert_eq!(lookup(&lru, 100), None);
+        assert_eq!(lookup(&lru, 200), Some(200));
+        assert_eq!(unsafe { DROPPED_KEY.len() }, 1);
+        assert_eq!(unsafe { DROPPED_KEY[0] }, 100);
+        assert_eq!(unsafe { DROPPED_VALUE[0] }, 100);
+        erase(&lru, 100);
+        assert_eq!(lookup(&lru, 100), None);
+        assert_eq!(lookup(&lru, 200), Some(200));
+        assert_eq!(unsafe { DROPPED_KEY.len() }, 1);
+
+        drop(lru);
+        assert_eq!(unsafe { DROPPED_KEY.len() }, 2);
+    }
+
+    #[test]
+    fn pinned() {
+        let _g = THREAD_LOCK.lock().unwrap();
+        clear();
+
+        let mut lookup_key = TestKey(0);
+
+        let lru = LRU::<TestKey, TestValue>::new(100);
+        insert(&lru, 100, 100, 1);
+        let h1 = lru.lookup(lookup_key.set(100));
+        assert_eq!(Some(100), h1.as_ref().map(|h| h.value().0));
+
+        insert(&lru, 100, 101, 1);
+        let h2 = lru.lookup(lookup_key.set(100));
+        assert_eq!(Some(101), h2.as_ref().map(|h| h.value().0));
+        assert_eq!(unsafe { DROPPED_KEY.len() }, 0);
+
+        drop(h1);
+        assert_eq!(unsafe { DROPPED_KEY.len() }, 1);
+        assert_eq!(unsafe { DROPPED_KEY[0] }, 100);
+        assert_eq!(unsafe { DROPPED_VALUE[0] }, 100);
+
+        erase(&lru, 100);
+        assert_eq!(lookup(&lru, 100), None);
+        assert_eq!(unsafe { DROPPED_KEY.len() }, 1);
+
+        drop(h2);
+        assert_eq!(unsafe { DROPPED_KEY.len() }, 2);
+        assert_eq!(unsafe { DROPPED_KEY[1] }, 100);
+        assert_eq!(unsafe { DROPPED_VALUE[1] }, 101);
+        std::mem::forget(lookup_key);
+    }
+
+    #[test]
+    fn evict() {
+        let _g = THREAD_LOCK.lock().unwrap();
+        clear();
+
+        let mut lookup_key = TestKey(0);
+
+        let lru = LRU::<TestKey, TestValue>::new(100);
+        insert(&lru, 100, 100, 1);
+        insert(&lru, 200, 200, 1);
+        insert(&lru, 300, 300, 1);
+        let h = lru.lookup(lookup_key.set(300));
+        assert!(h.is_some());
+
+        for i in 0..200 {
+            insert(&lru, 1000 + i, 1000 + i, 1);
+            assert_eq!(lookup(&lru, 1000 + i), Some(1000 + i));
+            assert_eq!(lookup(&lru, 100), Some(100));
+        }
+
+        assert_eq!(lookup(&lru, 100), Some(100));
+        assert_eq!(lookup(&lru, 300), Some(300));
+        assert_eq!(lookup(&lru, 200), None);
+        drop(h);
+        std::mem::forget(lookup_key);
+    }
+
+    #[test]
+    fn hang() {
+        let _g = THREAD_LOCK.lock().unwrap();
+        clear();
+        let lru = LRU::<TestKey, TestValue>::new(100);
+        let mut handles = vec![];
+        for i in 0..200 {
+            handles.push(lru.insert(TestKey(i), TestValue(i), 1));
+        }
+        for i in 0..200 {
+            assert_eq!(lookup(&lru, i), Some(i));
+        }
+        drop(handles);
+    }
+
+    #[test]
+    fn diff_charge() {
+        let _g = THREAD_LOCK.lock().unwrap();
+        clear();
+        let lru = LRU::<TestKey, TestValue>::new(100);
+        for i in 0..200 {
+            insert(&lru, i, i, if i & 1 != 0 { 1 } else { 2 });
+        }
+        let mut usage = 0;
+        for i in 0..200 {
+            if let Some(v) = lookup(&lru, i) {
+                assert_eq!(v, i);
+                usage += if i & 1 != 0 { 1 } else { 2 };
+            }
+        }
+        assert!(usage <= 100);
+    }
+
+    #[test]
+    fn multi_thread() {
+        let _g = THREAD_LOCK.lock().unwrap();
+        clear();
+        let lru = Arc::new(LRU::<TestKey, TestValue>::new(10000));
+        let mut thread_handles = vec![];
+        for i in 0..10 {
+            let l = lru.clone();
+            thread_handles.push(std::thread::spawn(move || {
+                for j in 0..1000 {
+                    insert(&l, i * 1000 + j, j, 1);
+                }
+            }));
+        }
+        thread_handles.into_iter().for_each(|h| {
+            h.join().unwrap();
+        });
+        for i in 0..10000 {
+            assert_eq!(lookup(&lru, i), Some(i % 1000));
+        }
+    }
+}
diff --git a/src/cache/mod.rs b/src/cache/mod.rs
new file mode 100644
index 00000000..5d618736
--- /dev/null
+++ b/src/cache/mod.rs
@@ -0,0 +1,111 @@
+mod lru;
+mod shard;
+mod utils;
+
+use std::{
+    collections::hash_map::RandomState,
+    fmt::Debug,
+    hash::{BuildHasher, Hash},
+};
+
+pub use lru::*;
+pub use shard::*;
+
+#[derive(Clone)]
+pub struct Options<S = RandomState> {
+    pub capacity: usize,
+    pub shard_bits: usize,
+    pub hash_builder: S,
+}
+
+pub trait CacheHandle {
+    type Value;
+    fn value(&self) -> &Self::Value;
+}
+
+pub trait Cache: Send + Sync {
+    // We could use the GAT feature for the returned handle,
+    // but that would make this trait not object safe.
+    // Users may want to use this trait as `Box<dyn Cache<...>>`.
+
+    type Key;
+    type Value;
+    type HashBuilder;
+
+    fn with_options(opts: &Options<Self::HashBuilder>) -> Self
+    where
+        Self::HashBuilder: Clone,
+        Self: Sized;
+
+    fn insert(
+        &self,
+        key: Self::Key,
+        value: Self::Value,
+        charge: usize,
+    ) -> Box<dyn CacheHandle<Value = Self::Value> + '_>
+    where
+        Self::Key: Hash + Eq,
+        Self::HashBuilder: BuildHasher;
+
+    fn lookup(&self, key: &Self::Key) -> Option<Box<dyn CacheHandle<Value = Self::Value> + '_>>
+    where
+        Self::Key: Hash + Eq,
+        Self::HashBuilder: BuildHasher;
+
+    fn erase(&self, key: &Self::Key)
+    where
+        Self::Key: Hash + Eq,
+        Self::HashBuilder: BuildHasher;
+
+    fn prune(&self)
+    where
+        Self::Key: Eq;
+}
+
+pub trait CacheShard: Send + Sync {
+    type Key;
+    type Value;
+    type HashBuilder;
+
+    fn shard_with_options(opts: &Options<Self::HashBuilder>) -> Self
+    where
+        Self::HashBuilder: Clone,
+        Self: Sized;
+
+    fn shard_insert(
+        &self,
+        key: Self::Key,
+        value: Self::Value,
+        hash: u64,
+        charge: usize,
+    ) -> Box<dyn CacheHandle<Value = Self::Value> + '_>
+    where
+        Self::Key: Eq;
+
+    fn shard_lookup(
+        &self,
+        key: &Self::Key,
+        hash: u64,
+    ) -> Option<Box<dyn CacheHandle<Value = Self::Value> + '_>>
+    where
+        Self::Key: Eq;
+
+    fn shard_erase(&self, key: &Self::Key, hash: u64)
+    where
+        Self::Key: Eq;
+
+    fn shard_prune(&self)
+    where
+        Self::Key: Eq;
+}
+
+// TODO: Many structs derive Debug, but Cache may not carry any debug info.
+// TODO: This impl is only here to avoid a build error.
+// TODO: Maybe require something like `trait Cache: Debug + Send + Sync`?
+impl<K, V, S> Debug for dyn Cache<Key = K, Value = V, HashBuilder = S> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "block_cache")
+    }
+}
+
+#[cfg(test)]
+mod tests {}
diff --git a/src/cache/shard.rs b/src/cache/shard.rs
new file mode 100644
index 00000000..bc1e9103
--- /dev/null
+++ b/src/cache/shard.rs
@@ -0,0 +1,125 @@
+use std::{
+    collections::hash_map::RandomState,
+    hash::{BuildHasher, Hash},
+};
+
+use super::{utils::hash_key, Cache, CacheHandle, CacheShard, Options};
+
+pub struct ShardCache<T, S = RandomState> {
+    shards: Vec<T>,
+    hash_builder: S,
+}
+
+impl<T> ShardCache<T>
+where
+    T: CacheShard<HashBuilder = RandomState>,
+{
+    pub fn new(capacity: usize, shard_bits: usize) -> Self {
+        Self::with_hasher(capacity, shard_bits, RandomState::default())
+    }
+}
+
+impl<T, S> ShardCache<T, S>
+where
+    T: CacheShard<HashBuilder = S>,
+    S: Clone,
+{
+    pub fn with_hasher(capacity: usize, shard_bits: usize, hash_builder: S) -> Self {
+        let num_shards = 1 << shard_bits;
+        let per_shard = (capacity + (num_shards - 1)) / num_shards;
+        Self {
+            shards: (0..num_shards)
+                .map(|_| {
+                    T::shard_with_options(&Options {
+                        capacity: per_shard,
+                        shard_bits: 0,
+                        hash_builder: hash_builder.clone(),
+                    })
+                })
+                .collect(),
+            hash_builder,
+        }
+    }
+}
+
+impl<T, S> Cache for ShardCache<T, S>
+where
+    T: CacheShard<HashBuilder = S>,
+    S: Send + Sync,
+{
+    type Key = T::Key;
+    type Value = T::Value;
+    type HashBuilder = S;
+
+    fn with_options(opts: &Options<Self::HashBuilder>) -> Self
+    where
+        Self::HashBuilder: Clone,
+        Self: Sized,
+    {
+        Self::with_hasher(opts.capacity, opts.shard_bits, opts.hash_builder.clone())
+    }
+
+    fn insert(
+        &self,
+        key: Self::Key,
+        value: Self::Value,
+        charge: usize,
+    ) -> Box<dyn CacheHandle<Value = Self::Value> + '_>
+    where
+        Self::Key: Hash + Eq,
+        Self::HashBuilder: BuildHasher,
+    {
+        let hash = hash_key(&self.hash_builder, &key);
+        self.shards[hash as usize & (self.shards.len() - 1)].shard_insert(key, value, hash, charge)
+    }
+
+    fn lookup(&self, key: &Self::Key) -> Option<Box<dyn CacheHandle<Value = Self::Value> + '_>>
+    where
+        Self::Key: Hash + Eq,
+        Self::HashBuilder: BuildHasher,
+    {
+        let hash = hash_key(&self.hash_builder, key);
+        self.shards[hash as usize & (self.shards.len() - 1)].shard_lookup(key, hash)
+    }
+
+    fn erase(&self, key: &Self::Key)
+    where
+        Self::Key: Hash + Eq,
+        Self::HashBuilder: BuildHasher,
+    {
+        let hash = hash_key(&self.hash_builder, key);
+        self.shards[hash as usize & (self.shards.len() - 1)].shard_erase(key, hash)
+    }
+
+    fn prune(&self)
+    where
+        Self::Key: Eq,
+    {
+        self.shards.iter().for_each(|s| s.shard_prune())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::cache::{Cache, ShardCache, LRU};
+
+    #[test]
+    fn simple() {
+        let cache = ShardCache::<LRU<i32, i32>>::new(4096, 4);
+        {
+            cache.insert(10, 10, 1);
+        }
+        {
+            let h1 = cache.lookup(&10);
+            assert!(h1.is_some());
+            assert_eq!(h1.as_ref().unwrap().value(), &10);
+            {
+                cache.insert(10, 11, 1);
+            }
+            assert_eq!(h1.unwrap().value(), &10);
+            let h2 = cache.lookup(&10);
+            assert!(h2.is_some());
+            assert_eq!(h2.unwrap().value(), &11);
+        }
+    }
+}
diff --git a/src/cache/utils.rs b/src/cache/utils.rs
new file mode 100644
index 00000000..ccecc372
--- /dev/null
+++ b/src/cache/utils.rs
@@ -0,0 +1,11 @@
+use std::hash::{BuildHasher, Hash, Hasher};
+
+pub(crate) fn hash_key<S, Q>(s: &S, k: &Q) -> u64
+where
+    S: BuildHasher,
+    Q: Hash + ?Sized,
+{
+    let mut hasher = s.build_hasher();
+    k.hash(&mut hasher);
+    hasher.finish()
+}
diff --git a/src/db/opt.rs b/src/db/opt.rs
index 2b5025dc..3a433eab 100644
--- a/src/db/opt.rs
+++ b/src/db/opt.rs
@@ -1,10 +1,15 @@
-use std::cmp;
+use std::{cmp, collections::hash_map::RandomState};
 
 use getset::Setters;
 use skiplist::MAX_NODE_SIZE;
 
 use super::*;
-use crate::{entry::Entry, opt};
+use crate::{
+    cache::Cache,
+    entry::Entry,
+    opt,
+    table::{Block, BlockCacheKey},
+};
 
 #[derive(Clone, Setters)]
 pub struct AgateOptions {
@@ -144,6 +149,13 @@ pub struct AgateOptions {
     ///
     /// The default value of `max_batch_size` is (15 * `mem_table_size`) / 100.
     pub max_batch_size: u64,
+
+    /* Dynamic trait options */
+    /// Block cache for SSTs.
+    ///
+    /// The default value is `None`, which means no block cache is used.
+    pub block_cache:
+        Option<Arc<dyn Cache<Key = BlockCacheKey, Value = Arc<Block>, HashBuilder = RandomState>>>,
 }
 
 impl Default for AgateOptions {
@@ -186,6 +198,8 @@ impl Default for AgateOptions {
 
             max_batch_count: 0,
             max_batch_size: 0,
+
+            block_cache: None,
         }
         // TODO: add other options
     }
diff --git a/src/lib.rs b/src/lib.rs
index 17bb41ec..349c9e5a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,6 +2,7 @@
 
 mod batch;
 mod bloom;
+mod cache;
 mod checksum;
 mod closer;
 mod db;
diff --git a/src/opt.rs b/src/opt.rs
index cc436a33..d488c2f9 100644
--- a/src/opt.rs
+++ b/src/opt.rs
@@ -1,4 +1,10 @@
+use std::{collections::hash_map::RandomState, sync::Arc};
+
 use super::TableOptions;
+use crate::{
+    cache::Cache,
+    table::{Block, BlockCacheKey},
+};
 
 #[derive(Debug, Clone, Default)]
 pub struct Options {
@@ -12,6 +18,10 @@ pub struct Options {
     pub bloom_false_positive: f64,
     /// checksum mode
     pub checksum_mode: ChecksumVerificationMode,
    /// DB block cache.
+    /// TODO: why not just use the db options for tables?
+    pub block_cache:
+        Option<Arc<dyn Cache<Key = BlockCacheKey, Value = Arc<Block>, HashBuilder = RandomState>>>,
 }
 
 #[derive(Debug, Clone, Copy)]
@@ -39,5 +49,6 @@ pub fn build_table_options(opt: &crate::AgateOptions) -> TableOptions {
         bloom_false_positive: opt.bloom_false_positive,
         checksum_mode: opt.checksum_mode,
         table_capacity: (opt.base_level_size as f64 * 0.95) as u64,
+        block_cache: opt.block_cache.clone(),
     }
 }
diff --git a/src/table.rs b/src/table.rs
index 67be25f9..e8b0c675 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -67,6 +67,12 @@ impl MmapFile {
     }
 }
 
+#[derive(PartialEq, Eq, Hash)]
+pub struct BlockCacheKey {
+    pub file_id: u64,
+    pub block_offset: u32, // u32 is the type defined in the proto
+}
+
 /// TableInner stores data of an SST.
 /// It is immutable once created and initialized.
 pub struct TableInner {
@@ -278,7 +284,7 @@ impl TableInner {
         self.fetch_index().offsets.get(idx)
     }
 
-    fn block(&self, idx: usize, _use_cache: bool) -> Result<Arc<Block>> {
+    fn block(&self, idx: usize, use_cache: bool) -> Result<Arc<Block>> {
         use ChecksumVerificationMode::*;
 
         // TODO: support cache
@@ -289,6 +295,23 @@ impl TableInner {
             .offsets(idx)
             .ok_or_else(|| Error::TableRead(format!("failed to get offset block {}", idx)))?;
 
+        if use_cache {
+            let blk = self
+                .opts
+                .block_cache
+                .as_ref()
+                .and_then(|bc| {
+                    bc.lookup(&BlockCacheKey {
+                        file_id: self.id(),
+                        block_offset: block_offset.offset,
+                    })
+                })
+                .map(|handle| handle.value().clone());
+            if let Some(blk) = blk {
+                return Ok(blk);
+            }
+        }
+
         let offset = block_offset.offset as usize;
         let data = self.read(offset, block_offset.len as usize)?;
 
@@ -333,6 +356,19 @@ impl TableInner {
             blk.verify_checksum()?;
         }
 
+        if use_cache {
+            if let Some(block_cache) = self.opts.block_cache.as_ref() {
+                block_cache.insert(
+                    BlockCacheKey {
+                        file_id: self.id(),
+                        block_offset: blk.offset as u32,
+                    },
+                    blk.clone(),
+                    blk.size() as usize,
+                );
+            }
+        }
+
         Ok(blk)
     }
 
diff --git a/src/table/builder.rs b/src/table/builder.rs
index 5c4dbec2..6d6fb0b4 100644
--- a/src/table/builder.rs
+++ b/src/table/builder.rs
@@ -224,6 +224,7 @@ mod tests {
             bloom_false_positive: 0.01,
             table_size: 30 << 20,
             checksum_mode: crate::opt::ChecksumVerificationMode::OnTableAndBlockRead,
+            block_cache: None,
         };
 
         let mut builder = Builder::new(opts.clone());
@@ -267,6 +268,7 @@ mod tests {
             table_size: 0,
             table_capacity: 0,
             checksum_mode: crate::opt::ChecksumVerificationMode::OnTableAndBlockRead,
+            block_cache: None,
         };
 
         let table = build_test_table(key_prefix, key_count, opts);
@@ -299,6 +301,7 @@ mod tests {
             table_size: 0,
             table_capacity: 0,
             checksum_mode: crate::opt::ChecksumVerificationMode::NoVerification,
+            block_cache: None,
         };
 
         let b = Builder::new(opt);
diff --git a/src/table/tests.rs b/src/table/tests.rs
index ea86b7f5..78e47e6c 100644
--- a/src/table/tests.rs
+++ b/src/table/tests.rs
@@ -31,6 +31,7 @@ pub(crate) fn get_test_table_options() -> Options {
         bloom_false_positive: 0.01,
         table_capacity: 0,
         checksum_mode: ChecksumVerificationMode::OnTableRead,
+        block_cache: None,
     }
 }
 
@@ -362,6 +363,7 @@ fn test_table_big_values() {
         table_size: (n as u64) * (1 << 20),
         table_capacity: 0,
         checksum_mode: ChecksumVerificationMode::OnTableRead,
+        block_cache: None,
     };
 
     let mut builder = Builder::new(opts.clone());