From 80a61d53fa8d707a4606de727bff504265896cb5 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Sat, 11 Jan 2025 23:46:37 +0000 Subject: [PATCH] refactor: address PR feedback --- influxdb3/src/commands/serve.rs | 11 +++--- influxdb3_processing_engine/src/lib.rs | 2 +- influxdb3_server/src/lib.rs | 2 +- influxdb3_server/src/query_executor/mod.rs | 2 +- influxdb3_wal/src/object_store.rs | 41 ++++++++-------------- influxdb3_write/src/write_buffer/mod.rs | 16 ++++----- 6 files changed, 32 insertions(+), 42 deletions(-) diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 7dfeaea4bf9..abfdc8f7979 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -208,15 +208,16 @@ pub struct Config { )] pub wal_max_write_buffer_size: usize, - /// Number of wal files to keep behind, wal flush does not clear the wal files immediately - /// instead they are only deleted when num wal files count exceed this keep behind size + /// Number of snapshotted wal files to retain in object store, wal flush does not clear + /// the wal files immediately instead they are only deleted when snapshotted and num wal files + /// count exceeds this size #[clap( - long = "num-wal-files-to-keep", + long = "snapshotted-wal-files-to-keep", env = "INFLUXDB3_NUM_WAL_FILES_TO_KEEP", default_value = "300", action )] - pub num_wal_files_to_keep: u64, + pub snapshotted_wal_files_to_keep: u64, // TODO - tune this default: /// The size of the query log. Up to this many queries will remain in the log before @@ -512,7 +513,7 @@ pub async fn command(config: Config) -> Result<()> { wal_config, parquet_cache, metric_registry: Arc::clone(&metrics), - num_wal_files_to_keep: config.num_wal_files_to_keep, + snapshotted_wal_files_to_keep: config.snapshotted_wal_files_to_keep, }) .await .map_err(|e| Error::WriteBufferInit(e.into()))?; diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index 3c6c1c62053..6ba048a2779 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -865,7 +865,7 @@ mod tests { wal_config, parquet_cache: None, metric_registry: Arc::clone(&metric_registry), - num_wal_files_to_keep: 10, + snapshotted_wal_files_to_keep: 10, }) .await .unwrap(); diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index a688546d035..2727bd2c531 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -778,7 +778,7 @@ mod tests { wal_config: WalConfig::test_config(), parquet_cache: Some(parquet_cache), metric_registry: Arc::clone(&metrics), - num_wal_files_to_keep: 100, + snapshotted_wal_files_to_keep: 100, }, ) .await diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index ee4b54a75bb..a248e79805a 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -713,7 +713,7 @@ mod tests { }, parquet_cache: Some(parquet_cache), metric_registry: Default::default(), - num_wal_files_to_keep: 1, + snapshotted_wal_files_to_keep: 1, }) .await .unwrap(); diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index 3430583d90c..5769733ba39 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -17,17 +17,6 @@ use std::{str::FromStr, sync::Arc}; use tokio::sync::Mutex; use tokio::sync::{oneshot, OwnedSemaphorePermit, Semaphore}; -// struct WalObjectStoreArgs { -// time_provider: Arc, -// object_store: Arc, -// host_identifier_prefix: String, -// file_notifier: Arc, -// config: WalConfig, -// last_wal_sequence_number: Option, -// last_snapshot_sequence_number: Option, -// all_wal_file_paths: &[Path], -// } - #[derive(Debug)] pub struct WalObjectStore { object_store: Arc, @@ -36,16 +25,15 @@ pub struct WalObjectStore { added_file_notifiers: parking_lot::Mutex>>, /// Buffered wal ops go in here along with the state to track when to snapshot flush_buffer: Mutex, - /// oldest, last and latest wal file nums are used to keep track of what wal files - /// to remove from the OS - num_wal_files_to_keep: u64, - // we need atomics or mutex + /// number of snapshotted wal files to retain in object store + snapshotted_wal_files_to_keep: u64, wal_remover: parking_lot::Mutex, } impl WalObjectStore { /// Creates a new WAL. This will replay files into the notifier and trigger any snapshots that /// exist in the WAL files that haven't been cleaned up yet. + #[allow(clippy::too_many_arguments)] pub async fn new( time_provider: Arc, object_store: Arc, @@ -54,7 +42,7 @@ impl WalObjectStore { config: WalConfig, last_wal_sequence_number: Option, last_snapshot_sequence_number: Option, - num_wal_files_to_keep: u64, + snapshotted_wal_files_to_keep: u64, ) -> Result, crate::Error> { let host_identifier = host_identifier_prefix.into(); let all_wal_file_paths = @@ -69,7 +57,7 @@ impl WalObjectStore { last_wal_sequence_number, last_snapshot_sequence_number, &all_wal_file_paths, - num_wal_files_to_keep, + snapshotted_wal_files_to_keep, ); wal.replay(last_wal_sequence_number, &all_wal_file_paths) @@ -80,6 +68,7 @@ impl WalObjectStore { Ok(wal) } + #[allow(clippy::too_many_arguments)] fn new_without_replay( time_provider: Arc, object_store: Arc, @@ -118,10 +107,10 @@ impl WalObjectStore { last_snapshot_sequence_number, ), )), - num_wal_files_to_keep, + snapshotted_wal_files_to_keep: num_wal_files_to_keep, wal_remover: parking_lot::Mutex::new(WalFileRemover { oldest_wal_file: oldest_wal_file_num, - last_wal_sequence_number, + last_snapshotted_wal_sequence_number: last_wal_sequence_number, }), } } @@ -415,12 +404,12 @@ impl WalObjectStore { ">>> checking num wal files to delete" ); - if curr_num_files > self.num_wal_files_to_keep { - let num_files_to_delete = curr_num_files - self.num_wal_files_to_keep; + if curr_num_files > self.snapshotted_wal_files_to_keep { + let num_files_to_delete = curr_num_files - self.snapshotted_wal_files_to_keep; let last_to_delete = oldest + num_files_to_delete; debug!( - num_files_to_keep = ?self.num_wal_files_to_keep, + num_files_to_keep = ?self.snapshotted_wal_files_to_keep, ?curr_num_files, ?num_files_to_delete, ?last_to_delete, @@ -839,26 +828,26 @@ impl<'a> TryFrom<&'a Path> for WalFileSequenceNumber { #[derive(Debug)] struct WalFileRemover { oldest_wal_file: Option, - last_wal_sequence_number: Option, + last_snapshotted_wal_sequence_number: Option, } impl WalFileRemover { fn get_current_state(&self) -> (u64, u64) { ( self.oldest_wal_file.map(|num| num.as_u64()).unwrap_or(0), - self.last_wal_sequence_number + self.last_snapshotted_wal_sequence_number .map(|num| num.as_u64()) .unwrap_or(0), ) } fn update_last_wal_num(&mut self, last_wal: WalFileSequenceNumber) { - self.last_wal_sequence_number.replace(last_wal); + self.last_snapshotted_wal_sequence_number.replace(last_wal); } fn update_state(&mut self, oldest: u64, last: u64) { self.oldest_wal_file = Some(WalFileSequenceNumber::new(oldest)); - self.last_wal_sequence_number = Some(WalFileSequenceNumber::new(last)); + self.last_snapshotted_wal_sequence_number = Some(WalFileSequenceNumber::new(last)); } } diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 4d45bc7a165..7e7cb007392 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -162,7 +162,7 @@ pub struct WriteBufferImplArgs { pub wal_config: WalConfig, pub parquet_cache: Option>, pub metric_registry: Arc, - pub num_wal_files_to_keep: u64, + pub snapshotted_wal_files_to_keep: u64, } impl WriteBufferImpl { @@ -177,7 +177,7 @@ impl WriteBufferImpl { wal_config, parquet_cache, metric_registry, - num_wal_files_to_keep, + snapshotted_wal_files_to_keep, }: WriteBufferImplArgs, ) -> Result> { // load snapshots and replay the wal into the in memory buffer @@ -221,7 +221,7 @@ impl WriteBufferImpl { wal_config, last_wal_sequence_number, last_snapshot_sequence_number, - num_wal_files_to_keep, + snapshotted_wal_files_to_keep, ) .await?; @@ -936,7 +936,7 @@ mod tests { wal_config: WalConfig::test_config(), parquet_cache: Some(Arc::clone(&parquet_cache)), metric_registry: Default::default(), - num_wal_files_to_keep: 10, + snapshotted_wal_files_to_keep: 10, }) .await .unwrap(); @@ -1023,7 +1023,7 @@ mod tests { }, parquet_cache: Some(Arc::clone(&parquet_cache)), metric_registry: Default::default(), - num_wal_files_to_keep: 10, + snapshotted_wal_files_to_keep: 10, }) .await .unwrap(); @@ -1092,7 +1092,7 @@ mod tests { }, parquet_cache: wbuf.parquet_cache.clone(), metric_registry: Default::default(), - num_wal_files_to_keep: 10, + snapshotted_wal_files_to_keep: 10, }) .await .unwrap() @@ -1322,7 +1322,7 @@ mod tests { }, parquet_cache: write_buffer.parquet_cache.clone(), metric_registry: Default::default(), - num_wal_files_to_keep: 10, + snapshotted_wal_files_to_keep: 10, }) .await .unwrap(); @@ -2610,7 +2610,7 @@ mod tests { wal_config, parquet_cache, metric_registry: Arc::clone(&metric_registry), - num_wal_files_to_keep: 10, + snapshotted_wal_files_to_keep: 10, }) .await .unwrap();