From 028506e91b723d4b6474b82a53dbace96105633b Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Tue, 12 Nov 2024 19:24:13 +0100 Subject: [PATCH] Fix memory leak due to Options containing block cache Fixes #51 --- examples/kvserver/src/main.rs | 1 - src/block.rs | 35 +++--------- src/db_impl.rs | 13 ++++- src/filter_block.rs | 11 +--- src/options.rs | 7 +-- src/skipmap.rs | 2 + src/table_cache.rs | 18 ++++-- src/table_reader.rs | 103 ++++++++++++++++++++++------------ src/version.rs | 3 +- src/version_set.rs | 33 +++++++++-- 10 files changed, 134 insertions(+), 92 deletions(-) diff --git a/examples/kvserver/src/main.rs b/examples/kvserver/src/main.rs index 1df686c..c691398 100644 --- a/examples/kvserver/src/main.rs +++ b/examples/kvserver/src/main.rs @@ -1,4 +1,3 @@ - struct KVService { db: rusty_leveldb::DB, } diff --git a/src/block.rs b/src/block.rs index 9f50fab..1f8d027 100644 --- a/src/block.rs +++ b/src/block.rs @@ -320,10 +320,7 @@ mod tests { fn get_data() -> Vec<(&'static [u8], &'static [u8])> { vec![ (b"key1", b"value1"), - ( - b"loooooooooooooooooooooooooooooooooongerkey1", - b"shrtvl1", - ), + (b"loooooooooooooooooooooooooooooooooongerkey1", b"shrtvl1"), ("medium length key 1".as_bytes(), "some value 2".as_bytes()), (b"prefix_key1", b"value"), (b"prefix_key2", b"value"), @@ -401,10 +398,7 @@ mod tests { let mut block = Block::new(o.clone(), block_contents).iter(); assert!(!block.valid()); - assert_eq!( - block.next(), - Some((b"key1".to_vec(), b"value1".to_vec())) - ); + assert_eq!(block.next(), Some((b"key1".to_vec(), b"value1".to_vec()))); assert!(block.valid()); block.next(); assert!(block.valid()); @@ -425,10 +419,7 @@ mod tests { assert!(block.valid()); assert_eq!( current_key_val(&block), - Some(( - b"prefix_key2".to_vec(), - b"value".to_vec() - )) + Some((b"prefix_key2".to_vec(), b"value".to_vec())) ); } @@ -452,20 +443,14 @@ mod tests { assert!(block.valid()); assert_eq!( current_key_val(&block), - Some(( - b"prefix_key2".to_vec(), - b"value".to_vec() - )) + Some((b"prefix_key2".to_vec(), b"value".to_vec())) ); block.seek(b"prefix_key0"); assert!(block.valid()); assert_eq!( current_key_val(&block), - Some(( - b"prefix_key1".to_vec(), - b"value".to_vec() - )) + Some((b"prefix_key1".to_vec(), b"value".to_vec())) ); block.seek(b"key1"); @@ -479,10 +464,7 @@ mod tests { assert!(block.valid()); assert_eq!( current_key_val(&block), - Some(( - b"prefix_key3".to_vec(), - b"value".to_vec() - )) + Some((b"prefix_key3".to_vec(), b"value".to_vec())) ); block.seek(b"prefix_key8"); @@ -513,10 +495,7 @@ mod tests { assert!(block.valid()); assert_eq!( current_key_val(&block), - Some(( - b"prefix_key3".to_vec(), - b"value".to_vec() - )) + Some((b"prefix_key3".to_vec(), b"value".to_vec())) ); } } diff --git a/src/db_impl.rs b/src/db_impl.rs index 4bf9809..46bdcf9 100644 --- a/src/db_impl.rs +++ b/src/db_impl.rs @@ -3,6 +3,8 @@ #![allow(unused_attributes)] +use crate::block::Block; +use crate::cache::Cache; use crate::db_iter::DBIterator; use crate::cmp::{Cmp, InternalKeyCmp}; @@ -74,7 +76,12 @@ impl DB { } let path = name.canonicalize().unwrap_or(name.to_owned()); - let cache = share(TableCache::new(name, opt.clone(), opt.max_open_files - 10)); + let cache = share(TableCache::new( + name, + opt.clone(), + share(Cache::new(opt.block_cache_capacity_bytes / opt.block_size)), + opt.max_open_files - 10, + )); let vset = VersionSet::new(name, opt.clone(), cache.clone()); DB { @@ -1478,7 +1485,7 @@ mod tests { { // Read table back in. - let mut tc = TableCache::new("db", opt.clone(), 100); + let mut tc = TableCache::new("db", opt.clone(), share(Cache::new(128)), 100); let tbl = tc.get_table(123).unwrap(); assert_eq!(mt.len(), LdbIteratorIter::wrap(&mut tbl.iter()).count()); } @@ -1498,7 +1505,7 @@ mod tests { .write_all(&buf) .unwrap(); - let mut tc = TableCache::new("db", opt.clone(), 100); + let mut tc = TableCache::new("db", opt.clone(), share(Cache::new(128)), 100); let tbl = tc.get_table(123).unwrap(); // The last two entries are skipped due to the corruption above. assert_eq!( diff --git a/src/filter_block.rs b/src/filter_block.rs index 6f7017a..2bcc2ff 100644 --- a/src/filter_block.rs +++ b/src/filter_block.rs @@ -175,12 +175,7 @@ mod tests { } fn get_keys() -> Vec<&'static [u8]> { - vec![ - b"abcd", - b"efgh", - b"ijkl", - b"mnopqrstuvwxyz", - ] + vec![b"abcd", b"efgh", b"ijkl", b"mnopqrstuvwxyz"] } fn produce_filter_block() -> Vec { @@ -229,9 +224,7 @@ mod tests { 17 ); // third block in third filter - let unknown_keys: [&[u8]; 3] = [b"xsb", - b"9sad", - b"assssaaaass"]; + let unknown_keys: [&[u8]; 3] = [b"xsb", b"9sad", b"assssaaaass"]; for block_offset in vec![0, 1024, 5000, 6025].into_iter() { for key in get_keys().iter() { diff --git a/src/options.rs b/src/options.rs index 7af2371..82442a5 100644 --- a/src/options.rs +++ b/src/options.rs @@ -1,5 +1,3 @@ -use crate::block::Block; -use crate::cache::Cache; use crate::cmp::{Cmp, DefaultCmp}; use crate::compressor::{self, Compressor, CompressorId}; use crate::env::Env; @@ -33,7 +31,7 @@ pub struct Options { pub write_buffer_size: usize, pub max_open_files: usize, pub max_file_size: usize, - pub block_cache: Shared>, + pub block_cache_capacity_bytes: usize, pub block_size: usize, pub block_restart_interval: usize, /// Compressor id in compressor list @@ -66,8 +64,7 @@ impl Default for Options { write_buffer_size: WRITE_BUFFER_SIZE, max_open_files: 1 << 10, max_file_size: 2 << 20, - // 2000 elements by default - block_cache: share(Cache::new(BLOCK_CACHE_CAPACITY / BLOCK_MAX_SIZE)), + block_cache_capacity_bytes: BLOCK_MAX_SIZE * 1024, block_size: BLOCK_MAX_SIZE, block_restart_interval: 16, reuse_logs: true, diff --git a/src/skipmap.rs b/src/skipmap.rs index 53bde43..6d458d9 100644 --- a/src/skipmap.rs +++ b/src/skipmap.rs @@ -82,9 +82,11 @@ impl SkipMap { pub fn is_empty(&self) -> bool { self.len() == 0 } + pub fn approx_memory(&self) -> usize { self.map.borrow().approx_mem } + pub fn contains(&self, key: &[u8]) -> bool { self.map.borrow().contains(key) } diff --git a/src/table_cache.rs b/src/table_cache.rs index 1ab1389..7c388a8 100644 --- a/src/table_cache.rs +++ b/src/table_cache.rs @@ -2,12 +2,13 @@ //! read-through cache, meaning that non-present tables are read from disk and cached before being //! returned. +use crate::block::Block; use crate::cache::{self, Cache}; use crate::error::{err, Result, StatusCode}; use crate::key_types::InternalKey; use crate::options::Options; use crate::table_reader::Table; -use crate::types::FileNum; +use crate::types::{FileNum, Shared}; use integer_encoding::FixedIntWriter; @@ -29,6 +30,7 @@ fn filenum_to_key(num: FileNum) -> cache::CacheKey { pub struct TableCache { dbname: PathBuf, cache: Cache, + block_cache: Shared>, opts: Options, } @@ -36,10 +38,16 @@ impl TableCache { /// Create a new TableCache for the database named `db`, caching up to `entries` tables. /// /// opt.cmp should be the user-supplied comparator. - pub fn new>(db: P, opt: Options, entries: usize) -> TableCache { + pub fn new>( + db: P, + opt: Options, + block_cache: Shared>, + entries: usize, + ) -> TableCache { TableCache { dbname: db.as_ref().to_owned(), cache: Cache::new(entries), + block_cache, opts: opt, } } @@ -72,7 +80,7 @@ impl TableCache { } let file = Rc::new(self.opts.env.open_random_access_file(path)?); // No SSTable file name compatibility. - let table = Table::new(self.opts.clone(), file, file_size)?; + let table = Table::new(self.opts.clone(), self.block_cache.clone(), file, file_size)?; self.cache.insert(&filenum_to_key(file_num), table.clone()); Ok(table) } @@ -94,6 +102,7 @@ mod tests { use crate::options; use crate::table_builder::TableBuilder; use crate::test_util::LdbIteratorIter; + use crate::types::share; #[test] fn test_table_file_name() { @@ -138,6 +147,7 @@ mod tests { // parsed/iterated by the table reader. let mut opt = options::for_test(); opt.env = Rc::new(Box::new(MemEnv::new())); + let bc = share(Cache::new(128)); let dbname = Path::new("testdb1"); let tablename = table_file_name(dbname, 123); let tblpath = Path::new(&tablename); @@ -146,7 +156,7 @@ mod tests { assert!(opt.env.exists(tblpath).unwrap()); assert!(opt.env.size_of(tblpath).unwrap() > 20); - let mut cache = TableCache::new(dbname, opt.clone(), 10); + let mut cache = TableCache::new(dbname, opt.clone(), bc, 10); assert!(cache.cache.get(&filenum_to_key(123)).is_none()); assert_eq!( LdbIteratorIter::wrap(&mut cache.get_table(123).unwrap().iter()).count(), diff --git a/src/table_reader.rs b/src/table_reader.rs index b021f28..a2278d2 100644 --- a/src/table_reader.rs +++ b/src/table_reader.rs @@ -1,6 +1,6 @@ use crate::block::{Block, BlockIter}; use crate::blockhandle::BlockHandle; -use crate::cache; +use crate::cache::{self, Cache}; use crate::cmp::InternalKeyCmp; use crate::env::RandomAccess; use crate::error::{self, err, Result}; @@ -10,7 +10,7 @@ use crate::key_types::InternalKey; use crate::options::Options; use crate::table_block; use crate::table_builder::{self, Footer}; -use crate::types::{current_key_val, LdbIterator}; +use crate::types::{current_key_val, LdbIterator, Shared}; use std::cmp::Ordering; use std::rc::Rc; @@ -37,6 +37,7 @@ pub struct Table { cache_id: cache::CacheID, opt: Options, + block_cache: Shared>, footer: Footer, indexblock: Block, @@ -45,7 +46,12 @@ pub struct Table { impl Table { /// Creates a new table reader operating on unformatted keys (i.e., UserKey). - fn new_raw(opt: Options, file: Rc>, size: usize) -> Result
{ + fn new_raw( + opt: Options, + block_cache: Shared>, + file: Rc>, + size: usize, + ) -> Result
{ let footer = read_footer(file.as_ref().as_ref(), size)?; let indexblock = table_block::read_table_block(opt.clone(), file.as_ref().as_ref(), &footer.index)?; @@ -54,13 +60,14 @@ impl Table { let filter_block_reader = Table::read_filter_block(&metaindexblock, file.as_ref().as_ref(), &opt)?; - let cache_id = opt.block_cache.borrow_mut().new_cache_id(); + let cache_id = block_cache.borrow_mut().new_cache_id(); Ok(Table { file, file_size: size, cache_id, opt, + block_cache, footer, filters: filter_block_reader, indexblock, @@ -105,12 +112,17 @@ impl Table { /// Creates a new table reader operating on internal keys (i.e., InternalKey). This means that /// a different comparator (internal_key_cmp) and a different filter policy /// (InternalFilterPolicy) are used. - pub fn new(mut opt: Options, file: Rc>, size: usize) -> Result
{ + pub fn new( + mut opt: Options, + block_cache: Shared>, + file: Rc>, + size: usize, + ) -> Result
{ opt.cmp = Rc::new(Box::new(InternalKeyCmp(opt.cmp.clone()))); opt.filter_policy = Rc::new(Box::new(filter::InternalFilterPolicy::new( opt.filter_policy, ))); - Table::new_raw(opt, file, size) + Table::new_raw(opt, block_cache, file, size) } /// block_cache_handle creates a CacheKey for a block with a given offset to be used in the @@ -130,7 +142,7 @@ impl Table { /// cache. fn read_block(&self, location: &BlockHandle) -> Result { let cachekey = self.block_cache_handle(location.offset()); - if let Some(block) = self.opt.block_cache.borrow_mut().get(&cachekey) { + if let Some(block) = self.block_cache.borrow_mut().get(&cachekey) { return Ok(block.clone()); } @@ -139,10 +151,7 @@ impl Table { table_block::read_table_block(self.opt.clone(), self.file.as_ref().as_ref(), location)?; // insert a cheap copy (Rc). - self.opt - .block_cache - .borrow_mut() - .insert(&cachekey, b.clone()); + self.block_cache.borrow_mut().insert(&cachekey, b.clone()); Ok(b) } @@ -374,13 +383,15 @@ impl LdbIterator for TableIterator { #[cfg(test)] mod tests { + use std::cell::RefCell; + use crate::compressor::CompressorId; use crate::filter::BloomPolicy; use crate::key_types::LookupKey; use crate::table_builder::TableBuilder; use crate::test_util::{test_iterator_properties, LdbIteratorIter}; - use crate::types::{current_key_val, LdbIterator}; - use crate::{compressor, options}; + use crate::types::{current_key_val, share, LdbIterator}; + use crate::{block, compressor, options}; use super::*; @@ -464,8 +475,9 @@ mod tests { fn test_table_approximate_offset() { let (src, size) = build_table(build_data()); let mut opt = options::for_test(); + let bc = share(Cache::new(128)); opt.block_size = 32; - let table = Table::new_raw(opt, wrap_buffer(src), size).unwrap(); + let table = Table::new_raw(opt, bc, wrap_buffer(src), size).unwrap(); let mut iter = table.iter(); let expected_offsets = [0, 0, 0, 44, 44, 44, 89]; @@ -480,30 +492,40 @@ mod tests { #[test] fn test_table_block_cache_use() { let (src, size) = build_table(build_data()); + let bc = share(Cache::new(128)); let mut opt = options::for_test(); opt.block_size = 32; - let table = Table::new_raw(opt.clone(), wrap_buffer(src), size).unwrap(); - let mut iter = table.iter(); + { + let table = Table::new_raw(opt.clone(), bc.clone(), wrap_buffer(src), size).unwrap(); + let mut iter = table.iter(); + + // index/metaindex blocks are not cached. That'd be a waste of memory. + assert_eq!(bc.borrow().count(), 0); + iter.next(); + assert_eq!(bc.borrow().count(), 1); + // This may fail if block parameters or data change. In that case, adapt it. + iter.next(); + iter.next(); + iter.next(); + iter.next(); + assert_eq!(bc.borrow().count(), 2); + } - // index/metaindex blocks are not cached. That'd be a waste of memory. - assert_eq!(opt.block_cache.borrow().count(), 0); - iter.next(); - assert_eq!(opt.block_cache.borrow().count(), 1); - // This may fail if block parameters or data change. In that case, adapt it. - iter.next(); - iter.next(); - iter.next(); - iter.next(); - assert_eq!(opt.block_cache.borrow().count(), 2); + println!( + "weak = {}, strong = {}", + std::rc::Rc::>>::weak_count(&bc), + std::rc::Rc::>>::strong_count(&bc) + ); } #[test] fn test_table_iterator_fwd_bwd() { let (src, size) = build_table(build_data()); let data = build_data(); + let bc = share(Cache::new(128)); - let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap(); + let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap(); let mut iter = table.iter(); let mut i = 0; @@ -552,8 +574,9 @@ mod tests { #[test] fn test_table_iterator_filter() { let (src, size) = build_table(build_data()); + let bc = share(Cache::new(128)); - let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap(); + let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap(); assert!(table.filters.is_some()); let filter_reader = table.filters.clone().unwrap(); let mut iter = table.iter(); @@ -566,8 +589,9 @@ mod tests { #[test] fn test_table_iterator_state_behavior() { let (src, size) = build_table(build_data()); + let bc = share(Cache::new(128)); - let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap(); + let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap(); let mut iter = table.iter(); // behavior test @@ -597,7 +621,8 @@ mod tests { let mut data = build_data(); data.truncate(4); let (src, size) = build_table(data); - let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap(); + let bc = share(Cache::new(128)); + let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap(); test_iterator_properties(table.iter()); } @@ -605,8 +630,9 @@ mod tests { fn test_table_iterator_values() { let (src, size) = build_table(build_data()); let data = build_data(); + let bc = share(Cache::new(128)); - let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap(); + let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap(); let mut iter = table.iter(); let mut i = 0; @@ -640,8 +666,9 @@ mod tests { #[test] fn test_table_iterator_seek() { let (src, size) = build_table(build_data()); + let bc = share(Cache::new(128)); - let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap(); + let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap(); let mut iter = table.iter(); iter.seek(b"bcd"); @@ -667,8 +694,10 @@ mod tests { #[test] fn test_table_get() { let (src, size) = build_table(build_data()); + let bc = share(Cache::new(128)); - let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap(); + let table = + Table::new_raw(options::for_test(), bc.clone(), wrap_buffer(src), size).unwrap(); let table2 = table.clone(); let mut _iter = table.iter(); @@ -678,7 +707,7 @@ mod tests { assert_eq!(Ok(Some((k, v))), r); } - assert_eq!(table.opt.block_cache.borrow().count(), 3); + assert_eq!(bc.borrow().count(), 3); // test that filters work and don't return anything at all. assert!(table.get(b"aaa").unwrap().is_none()); @@ -702,7 +731,8 @@ mod tests { let (src, size) = build_internal_table(); - let table = Table::new(options::for_test(), wrap_buffer(src), size).unwrap(); + let bc = share(Cache::new(128)); + let table = Table::new(options::for_test(), bc, wrap_buffer(src), size).unwrap(); let filter_reader = table.filters.clone().unwrap(); // Check that we're actually using internal keys @@ -730,11 +760,12 @@ mod tests { #[test] fn test_table_reader_checksum() { + let bc = share(Cache::new(128)); let (mut src, size) = build_table(build_data()); src[10] += 1; - let table = Table::new_raw(options::for_test(), wrap_buffer(src), size).unwrap(); + let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap(); assert!(table.filters.is_some()); assert_eq!(table.filters.as_ref().unwrap().num(), 1); diff --git a/src/version.rs b/src/version.rs index 4b81612..d306196 100644 --- a/src/version.rs +++ b/src/version.rs @@ -574,6 +574,7 @@ fn some_file_overlaps_range( #[cfg(test)] pub mod testutil { use super::*; + use crate::cache::Cache; use crate::cmp::DefaultCmp; use crate::env::Env; use crate::key_types::ValueType; @@ -708,7 +709,7 @@ pub mod testutil { ]; let t9 = write_table(env.as_ref().as_ref(), f9, 1, 9); - let cache = TableCache::new("db", opts.clone(), 100); + let cache = TableCache::new("db", opts.clone(), share(Cache::new(128)), 100); let mut v = Version::new(share(cache), Rc::new(Box::new(DefaultCmp))); v.files[0] = vec![t1, t2]; v.files[1] = vec![t3, t4, t5]; diff --git a/src/version_set.rs b/src/version_set.rs index cd59ccc..4a130bc 100644 --- a/src/version_set.rs +++ b/src/version_set.rs @@ -971,6 +971,7 @@ fn get_range<'a, C: Cmp, I: Iterator>( #[cfg(test)] mod tests { use super::*; + use crate::cache::Cache; use crate::cmp::DefaultCmp; use crate::key_types::LookupKey; use crate::test_util::LdbIteratorIter; @@ -1063,7 +1064,12 @@ mod tests { assert_eq!(1, b.added[1].len()); let mut v2 = Version::new( - share(TableCache::new("db", opt.clone(), 100)), + share(TableCache::new( + "db", + opt.clone(), + share(Cache::new(128)), + 100, + )), opt.cmp.clone(), ); b.save_to(&InternalKeyCmp(opt.cmp.clone()), &v, &mut v2); @@ -1080,7 +1086,12 @@ mod tests { let mut vs = VersionSet::new( "db", opt.clone(), - share(TableCache::new("db", opt.clone(), 100)), + share(TableCache::new( + "db", + opt.clone(), + share(Cache::new(128)), + 100, + )), ); assert_eq!(2, vs.new_file_number()); @@ -1149,7 +1160,11 @@ mod tests { #[test] fn test_version_set_utils() { let (v, opt) = make_version(); - let mut vs = VersionSet::new("db", opt.clone(), share(TableCache::new("db", opt, 100))); + let mut vs = VersionSet::new( + "db", + opt.clone(), + share(TableCache::new("db", opt, share(Cache::new(128)), 100)), + ); vs.add_version(v); // live_files() assert_eq!(9, vs.live_files().len()); @@ -1173,7 +1188,11 @@ mod tests { #[test] fn test_version_set_pick_compaction() { let (mut v, opt) = make_version(); - let mut vs = VersionSet::new("db", opt.clone(), share(TableCache::new("db", opt, 100))); + let mut vs = VersionSet::new( + "db", + opt.clone(), + share(TableCache::new("db", opt, share(Cache::new(128)), 100)), + ); v.compaction_score = Some(2.0); v.compaction_level = Some(0); @@ -1222,7 +1241,11 @@ mod tests { #[test] fn test_version_set_compaction() { let (v, opt) = make_version(); - let mut vs = VersionSet::new("db", opt.clone(), share(TableCache::new("db", opt, 100))); + let mut vs = VersionSet::new( + "db", + opt.clone(), + share(TableCache::new("db", opt, share(Cache::new(128)), 100)), + ); time_test!(); vs.add_version(v);