Skip to content

Commit

Permalink
Fix memory leak due to Options containing block cache
Browse files Browse the repository at this point in the history
Fixes #51
  • Loading branch information
dermesser committed Nov 12, 2024
1 parent 2b2e683 commit 028506e
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 92 deletions.
1 change: 0 additions & 1 deletion examples/kvserver/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

struct KVService {
db: rusty_leveldb::DB,
}
Expand Down
35 changes: 7 additions & 28 deletions src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,7 @@ mod tests {
fn get_data() -> Vec<(&'static [u8], &'static [u8])> {
vec![
(b"key1", b"value1"),
(
b"loooooooooooooooooooooooooooooooooongerkey1",
b"shrtvl1",
),
(b"loooooooooooooooooooooooooooooooooongerkey1", b"shrtvl1"),
("medium length key 1".as_bytes(), "some value 2".as_bytes()),
(b"prefix_key1", b"value"),
(b"prefix_key2", b"value"),
Expand Down Expand Up @@ -401,10 +398,7 @@ mod tests {
let mut block = Block::new(o.clone(), block_contents).iter();

assert!(!block.valid());
assert_eq!(
block.next(),
Some((b"key1".to_vec(), b"value1".to_vec()))
);
assert_eq!(block.next(), Some((b"key1".to_vec(), b"value1".to_vec())));
assert!(block.valid());
block.next();
assert!(block.valid());
Expand All @@ -425,10 +419,7 @@ mod tests {
assert!(block.valid());
assert_eq!(
current_key_val(&block),
Some((
b"prefix_key2".to_vec(),
b"value".to_vec()
))
Some((b"prefix_key2".to_vec(), b"value".to_vec()))
);
}

Expand All @@ -452,20 +443,14 @@ mod tests {
assert!(block.valid());
assert_eq!(
current_key_val(&block),
Some((
b"prefix_key2".to_vec(),
b"value".to_vec()
))
Some((b"prefix_key2".to_vec(), b"value".to_vec()))
);

block.seek(b"prefix_key0");
assert!(block.valid());
assert_eq!(
current_key_val(&block),
Some((
b"prefix_key1".to_vec(),
b"value".to_vec()
))
Some((b"prefix_key1".to_vec(), b"value".to_vec()))
);

block.seek(b"key1");
Expand All @@ -479,10 +464,7 @@ mod tests {
assert!(block.valid());
assert_eq!(
current_key_val(&block),
Some((
b"prefix_key3".to_vec(),
b"value".to_vec()
))
Some((b"prefix_key3".to_vec(), b"value".to_vec()))
);

block.seek(b"prefix_key8");
Expand Down Expand Up @@ -513,10 +495,7 @@ mod tests {
assert!(block.valid());
assert_eq!(
current_key_val(&block),
Some((
b"prefix_key3".to_vec(),
b"value".to_vec()
))
Some((b"prefix_key3".to_vec(), b"value".to_vec()))
);
}
}
Expand Down
13 changes: 10 additions & 3 deletions src/db_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#![allow(unused_attributes)]

use crate::block::Block;

Check warning on line 6 in src/db_impl.rs

View workflow job for this annotation

GitHub Actions / leveldb-rs (macos-latest)

unused import: `crate::block::Block`

Check warning on line 6 in src/db_impl.rs

View workflow job for this annotation

GitHub Actions / leveldb-rs (ubuntu-latest)

unused import: `crate::block::Block`

Check warning on line 6 in src/db_impl.rs

View workflow job for this annotation

GitHub Actions / leveldb-rs (async, macos-latest)

unused import: `crate::block::Block`

Check warning on line 6 in src/db_impl.rs

View workflow job for this annotation

GitHub Actions / leveldb-rs (async, ubuntu-latest)

unused import: `crate::block::Block`

Check warning on line 6 in src/db_impl.rs

View workflow job for this annotation

GitHub Actions / leveldb-rs (windows-latest)

unused import: `crate::block::Block`

Check warning on line 6 in src/db_impl.rs

View workflow job for this annotation

GitHub Actions / leveldb-rs (async, windows-latest)

unused import: `crate::block::Block`
use crate::cache::Cache;
use crate::db_iter::DBIterator;

use crate::cmp::{Cmp, InternalKeyCmp};
Expand Down Expand Up @@ -74,7 +76,12 @@ impl DB {
}
let path = name.canonicalize().unwrap_or(name.to_owned());

let cache = share(TableCache::new(name, opt.clone(), opt.max_open_files - 10));
let cache = share(TableCache::new(
name,
opt.clone(),
share(Cache::new(opt.block_cache_capacity_bytes / opt.block_size)),
opt.max_open_files - 10,
));
let vset = VersionSet::new(name, opt.clone(), cache.clone());

DB {
Expand Down Expand Up @@ -1478,7 +1485,7 @@ mod tests {

{
// Read table back in.
let mut tc = TableCache::new("db", opt.clone(), 100);
let mut tc = TableCache::new("db", opt.clone(), share(Cache::new(128)), 100);
let tbl = tc.get_table(123).unwrap();
assert_eq!(mt.len(), LdbIteratorIter::wrap(&mut tbl.iter()).count());
}
Expand All @@ -1498,7 +1505,7 @@ mod tests {
.write_all(&buf)
.unwrap();

let mut tc = TableCache::new("db", opt.clone(), 100);
let mut tc = TableCache::new("db", opt.clone(), share(Cache::new(128)), 100);
let tbl = tc.get_table(123).unwrap();
// The last two entries are skipped due to the corruption above.
assert_eq!(
Expand Down
11 changes: 2 additions & 9 deletions src/filter_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,7 @@ mod tests {
}

fn get_keys() -> Vec<&'static [u8]> {
vec![
b"abcd",
b"efgh",
b"ijkl",
b"mnopqrstuvwxyz",
]
vec![b"abcd", b"efgh", b"ijkl", b"mnopqrstuvwxyz"]
}

fn produce_filter_block() -> Vec<u8> {
Expand Down Expand Up @@ -229,9 +224,7 @@ mod tests {
17
); // third block in third filter

let unknown_keys: [&[u8]; 3] = [b"xsb",
b"9sad",
b"assssaaaass"];
let unknown_keys: [&[u8]; 3] = [b"xsb", b"9sad", b"assssaaaass"];

for block_offset in vec![0, 1024, 5000, 6025].into_iter() {
for key in get_keys().iter() {
Expand Down
7 changes: 2 additions & 5 deletions src/options.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use crate::block::Block;
use crate::cache::Cache;
use crate::cmp::{Cmp, DefaultCmp};
use crate::compressor::{self, Compressor, CompressorId};
use crate::env::Env;
Expand Down Expand Up @@ -33,7 +31,7 @@ pub struct Options {
pub write_buffer_size: usize,
pub max_open_files: usize,
pub max_file_size: usize,
pub block_cache: Shared<Cache<Block>>,
pub block_cache_capacity_bytes: usize,
pub block_size: usize,
pub block_restart_interval: usize,
/// Compressor id in compressor list
Expand Down Expand Up @@ -66,8 +64,7 @@ impl Default for Options {
write_buffer_size: WRITE_BUFFER_SIZE,
max_open_files: 1 << 10,
max_file_size: 2 << 20,
// 2000 elements by default
block_cache: share(Cache::new(BLOCK_CACHE_CAPACITY / BLOCK_MAX_SIZE)),
block_cache_capacity_bytes: BLOCK_MAX_SIZE * 1024,
block_size: BLOCK_MAX_SIZE,
block_restart_interval: 16,
reuse_logs: true,
Expand Down
2 changes: 2 additions & 0 deletions src/skipmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ impl SkipMap {
pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub fn approx_memory(&self) -> usize {
self.map.borrow().approx_mem
}

pub fn contains(&self, key: &[u8]) -> bool {
self.map.borrow().contains(key)
}
Expand Down
18 changes: 14 additions & 4 deletions src/table_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
//! read-through cache, meaning that non-present tables are read from disk and cached before being
//! returned.
use crate::block::Block;
use crate::cache::{self, Cache};
use crate::error::{err, Result, StatusCode};
use crate::key_types::InternalKey;
use crate::options::Options;
use crate::table_reader::Table;
use crate::types::FileNum;
use crate::types::{FileNum, Shared};

use integer_encoding::FixedIntWriter;

Expand All @@ -29,17 +30,24 @@ fn filenum_to_key(num: FileNum) -> cache::CacheKey {
pub struct TableCache {
dbname: PathBuf,
cache: Cache<Table>,
block_cache: Shared<Cache<Block>>,
opts: Options,
}

impl TableCache {
/// Create a new TableCache for the database named `db`, caching up to `entries` tables.
///
/// opt.cmp should be the user-supplied comparator.
pub fn new<P: AsRef<Path>>(db: P, opt: Options, entries: usize) -> TableCache {
pub fn new<P: AsRef<Path>>(
db: P,
opt: Options,
block_cache: Shared<Cache<Block>>,
entries: usize,
) -> TableCache {
TableCache {
dbname: db.as_ref().to_owned(),
cache: Cache::new(entries),
block_cache,
opts: opt,
}
}
Expand Down Expand Up @@ -72,7 +80,7 @@ impl TableCache {
}
let file = Rc::new(self.opts.env.open_random_access_file(path)?);
// No SSTable file name compatibility.
let table = Table::new(self.opts.clone(), file, file_size)?;
let table = Table::new(self.opts.clone(), self.block_cache.clone(), file, file_size)?;
self.cache.insert(&filenum_to_key(file_num), table.clone());
Ok(table)
}
Expand All @@ -94,6 +102,7 @@ mod tests {
use crate::options;
use crate::table_builder::TableBuilder;
use crate::test_util::LdbIteratorIter;
use crate::types::share;

#[test]
fn test_table_file_name() {
Expand Down Expand Up @@ -138,6 +147,7 @@ mod tests {
// parsed/iterated by the table reader.
let mut opt = options::for_test();
opt.env = Rc::new(Box::new(MemEnv::new()));
let bc = share(Cache::new(128));
let dbname = Path::new("testdb1");
let tablename = table_file_name(dbname, 123);
let tblpath = Path::new(&tablename);
Expand All @@ -146,7 +156,7 @@ mod tests {
assert!(opt.env.exists(tblpath).unwrap());
assert!(opt.env.size_of(tblpath).unwrap() > 20);

let mut cache = TableCache::new(dbname, opt.clone(), 10);
let mut cache = TableCache::new(dbname, opt.clone(), bc, 10);
assert!(cache.cache.get(&filenum_to_key(123)).is_none());
assert_eq!(
LdbIteratorIter::wrap(&mut cache.get_table(123).unwrap().iter()).count(),
Expand Down
Loading

0 comments on commit 028506e

Please sign in to comment.