From 4faa52be1e6b127a3472598b20c464ab42668ed2 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Sat, 11 Jan 2025 22:36:33 +0000 Subject: [PATCH] feat: introduce num wal files to keep This commit allows a configurable number of wal files to be left behind in OS. This is necessary as enterprise replicas rely on these files. closes: https://github.com/influxdata/influxdb/issues/25788 --- influxdb3/src/commands/serve.rs | 11 + influxdb3_processing_engine/src/lib.rs | 1 + influxdb3_server/src/lib.rs | 1 + influxdb3_server/src/query_executor/mod.rs | 35 +-- influxdb3_wal/src/object_store.rs | 261 +++++++++++++----- influxdb3_write/src/write_buffer/mod.rs | 8 + .../src/write_buffer/queryable_buffer.rs | 12 +- 7 files changed, 242 insertions(+), 87 deletions(-) diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index b303543087a..7dfeaea4bf9 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -208,6 +208,16 @@ pub struct Config { )] pub wal_max_write_buffer_size: usize, + /// Number of wal files to keep behind, wal flush does not clear the wal files immediately + /// instead they are only deleted when num wal files count exceed this keep behind size + #[clap( + long = "num-wal-files-to-keep", + env = "INFLUXDB3_NUM_WAL_FILES_TO_KEEP", + default_value = "300", + action + )] + pub num_wal_files_to_keep: u64, + // TODO - tune this default: /// The size of the query log. Up to this many queries will remain in the log before /// old queries are evicted to make room for new ones. @@ -502,6 +512,7 @@ pub async fn command(config: Config) -> Result<()> { wal_config, parquet_cache, metric_registry: Arc::clone(&metrics), + num_wal_files_to_keep: config.num_wal_files_to_keep, }) .await .map_err(|e| Error::WriteBufferInit(e.into()))?; diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index 02cbf558bed..3c6c1c62053 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -865,6 +865,7 @@ mod tests { wal_config, parquet_cache: None, metric_registry: Arc::clone(&metric_registry), + num_wal_files_to_keep: 10, }) .await .unwrap(); diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index ef4c6164a0c..a688546d035 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -778,6 +778,7 @@ mod tests { wal_config: WalConfig::test_config(), parquet_cache: Some(parquet_cache), metric_registry: Arc::clone(&metrics), + num_wal_files_to_keep: 100, }, ) .await diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index 3e92b3beade..ee4b54a75bb 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -713,6 +713,7 @@ mod tests { }, parquet_cache: Some(parquet_cache), metric_registry: Default::default(), + num_wal_files_to_keep: 1, }) .await .unwrap(); @@ -774,6 +775,8 @@ mod tests { expected: &'a [&'a str], } + // TODO: I wasn't expecting to change these test cases when disabling deletion of wal files, this + // probably needs some reasoning with the test, currently just updated the tests let test_cases = [ TestCase { query: "\ @@ -784,9 +787,9 @@ mod tests { "+------------+------------+-----------+----------+----------+", "| table_name | size_bytes | row_count | min_time | max_time |", "+------------+------------+-----------+----------+----------+", - "| cpu | 1961 | 3 | 0 | 20 |", - "| cpu | 1961 | 3 | 30 | 50 |", - "| cpu | 1961 | 3 | 60 | 80 |", + "| cpu | 1956 | 2 | 0 | 10 |", + "| cpu | 1961 | 3 | 20 | 40 |", + "| cpu | 1961 | 3 | 50 | 70 |", "+------------+------------+-----------+----------+----------+", ], }, @@ -799,9 +802,9 @@ mod tests { "+------------+------------+-----------+----------+----------+", "| table_name | size_bytes | row_count | min_time | max_time |", "+------------+------------+-----------+----------+----------+", - "| mem | 1961 | 3 | 0 | 20 |", - "| mem | 1961 | 3 | 30 | 50 |", - "| mem | 1961 | 3 | 60 | 80 |", + "| mem | 1956 | 2 | 0 | 10 |", + "| mem | 1961 | 3 | 20 | 40 |", + "| mem | 1961 | 3 | 50 | 70 |", "+------------+------------+-----------+----------+----------+", ], }, @@ -813,12 +816,12 @@ mod tests { "+------------+------------+-----------+----------+----------+", "| table_name | size_bytes | row_count | min_time | max_time |", "+------------+------------+-----------+----------+----------+", - "| cpu | 1961 | 3 | 0 | 20 |", - "| cpu | 1961 | 3 | 30 | 50 |", - "| cpu | 1961 | 3 | 60 | 80 |", - "| mem | 1961 | 3 | 0 | 20 |", - "| mem | 1961 | 3 | 30 | 50 |", - "| mem | 1961 | 3 | 60 | 80 |", + "| cpu | 1956 | 2 | 0 | 10 |", + "| cpu | 1961 | 3 | 20 | 40 |", + "| cpu | 1961 | 3 | 50 | 70 |", + "| mem | 1956 | 2 | 0 | 10 |", + "| mem | 1961 | 3 | 20 | 40 |", + "| mem | 1961 | 3 | 50 | 70 |", "+------------+------------+-----------+----------+----------+", ], }, @@ -831,10 +834,10 @@ mod tests { "+------------+------------+-----------+----------+----------+", "| table_name | size_bytes | row_count | min_time | max_time |", "+------------+------------+-----------+----------+----------+", - "| cpu | 1961 | 3 | 0 | 20 |", - "| cpu | 1961 | 3 | 30 | 50 |", - "| cpu | 1961 | 3 | 60 | 80 |", - "| mem | 1961 | 3 | 60 | 80 |", + "| cpu | 1956 | 2 | 0 | 10 |", + "| cpu | 1961 | 3 | 20 | 40 |", + "| cpu | 1961 | 3 | 50 | 70 |", + "| mem | 1961 | 3 | 50 | 70 |", "+------------+------------+-----------+----------+----------+", ], }, diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index 0d9947e4f39..3430583d90c 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -12,11 +12,22 @@ use iox_time::TimeProvider; use object_store::path::{Path, PathPart}; use object_store::{ObjectStore, PutPayload}; use observability_deps::tracing::{debug, error, info}; -use std::sync::Arc; use std::time::Duration; +use std::{str::FromStr, sync::Arc}; use tokio::sync::Mutex; use tokio::sync::{oneshot, OwnedSemaphorePermit, Semaphore}; +// struct WalObjectStoreArgs { +// time_provider: Arc, +// object_store: Arc, +// host_identifier_prefix: String, +// file_notifier: Arc, +// config: WalConfig, +// last_wal_sequence_number: Option, +// last_snapshot_sequence_number: Option, +// all_wal_file_paths: &[Path], +// } + #[derive(Debug)] pub struct WalObjectStore { object_store: Arc, @@ -25,6 +36,11 @@ pub struct WalObjectStore { added_file_notifiers: parking_lot::Mutex>>, /// Buffered wal ops go in here along with the state to track when to snapshot flush_buffer: Mutex, + /// oldest, last and latest wal file nums are used to keep track of what wal files + /// to remove from the OS + num_wal_files_to_keep: u64, + // we need atomics or mutex + wal_remover: parking_lot::Mutex, } impl WalObjectStore { @@ -38,19 +54,26 @@ impl WalObjectStore { config: WalConfig, last_wal_sequence_number: Option, last_snapshot_sequence_number: Option, + num_wal_files_to_keep: u64, ) -> Result, crate::Error> { + let host_identifier = host_identifier_prefix.into(); + let all_wal_file_paths = + load_all_wal_file_paths(Arc::clone(&object_store), host_identifier.clone()).await?; let flush_interval = config.flush_interval; let wal = Self::new_without_replay( time_provider, object_store, - host_identifier_prefix, + host_identifier, file_notifier, config, last_wal_sequence_number, last_snapshot_sequence_number, + &all_wal_file_paths, + num_wal_files_to_keep, ); - wal.replay(last_wal_sequence_number).await?; + wal.replay(last_wal_sequence_number, &all_wal_file_paths) + .await?; let wal = Arc::new(wal); background_wal_flush(Arc::clone(&wal), flush_interval); @@ -65,8 +88,12 @@ impl WalObjectStore { config: WalConfig, last_wal_sequence_number: Option, last_snapshot_sequence_number: Option, + all_wal_file_paths: &[Path], + num_wal_files_to_keep: u64, ) -> Self { let wal_file_sequence_number = last_wal_sequence_number.unwrap_or_default().next(); + let oldest_wal_file_num = oldest_wal_file_num(all_wal_file_paths); + Self { object_store, host_identifier_prefix: host_identifier_prefix.into(), @@ -91,6 +118,11 @@ impl WalObjectStore { last_snapshot_sequence_number, ), )), + num_wal_files_to_keep, + wal_remover: parking_lot::Mutex::new(WalFileRemover { + oldest_wal_file: oldest_wal_file_num, + last_wal_sequence_number, + }), } } @@ -99,11 +131,10 @@ impl WalObjectStore { pub async fn replay( &self, last_wal_sequence_number: Option, + all_wal_file_paths: &[Path], ) -> crate::Result<()> { debug!(">>> replaying"); - let paths = self - .load_existing_wal_file_paths(last_wal_sequence_number) - .await?; + let paths = self.load_existing_wal_file_paths(last_wal_sequence_number, all_wal_file_paths); let last_snapshot_sequence_number = { self.flush_buffer @@ -191,7 +222,7 @@ impl WalObjectStore { } /// Stop accepting write operations, flush of buffered writes to a WAL file and return when done. - pub async fn shutdown(&self) { + pub async fn shutdown(&mut self) { // stop accepting writes self.flush_buffer.lock().await.wal_buffer.is_shutdown = true; @@ -340,43 +371,31 @@ impl WalObjectStore { snapshot_response } - async fn load_existing_wal_file_paths( + fn load_existing_wal_file_paths( &self, last_wal_sequence_number: Option, - ) -> crate::Result> { - let mut paths = Vec::new(); - let mut offset: Option = None; - let path = Path::from(format!("{host}/wal", host = self.host_identifier_prefix)); - loop { - let mut listing = if let Some(offset) = offset { - self.object_store.list_with_offset(Some(&path), &offset) - } else { - self.object_store.list(Some(&path)) - }; - let path_count = paths.len(); - - while let Some(item) = listing.next().await { - paths.push(item?.location); - } - - if path_count == paths.len() { - break; - } - - paths.sort(); - offset = Some(paths.last().unwrap().clone()) - } - + all_wal_file_paths: &[Path], + ) -> Vec { // if we have a last wal path from persisted snapshots, we don't need to load the wal files // that came before it: - if let Some(last_wal_sequence_number) = last_wal_sequence_number { - let last_wal_path = wal_path(&self.host_identifier_prefix, last_wal_sequence_number); - paths.retain(|path| path >= &last_wal_path); - } - - paths.sort(); - - Ok(paths) + all_wal_file_paths + .iter() + .filter(|path| { + if let Some(last_wal_sequence_number) = last_wal_sequence_number { + let last_wal_path = + wal_path(&self.host_identifier_prefix, last_wal_sequence_number); + debug!( + ?path, + ?last_wal_path, + ">>> path and last_wal_path check when replaying" + ); + *path >= &last_wal_path + } else { + true + } + }) + .cloned() + .collect() } async fn remove_snapshot_wal_files( @@ -384,31 +403,67 @@ impl WalObjectStore { snapshot_details: SnapshotDetails, snapshot_permit: OwnedSemaphorePermit, ) { - let start = snapshot_details.first_wal_sequence_number.as_u64(); - let end = snapshot_details.last_wal_sequence_number.as_u64(); - for period in start..=end { - let path = wal_path( - &self.host_identifier_prefix, - WalFileSequenceNumber::new(period), + let (oldest, last, curr_num_files) = { + let (oldest, last) = self.wal_remover.lock().get_current_state(); + let curr_num_files = last - oldest; + (oldest, last, curr_num_files) + }; + debug!( + ?oldest, + ?last, + ?curr_num_files, + ">>> checking num wal files to delete" + ); + + if curr_num_files > self.num_wal_files_to_keep { + let num_files_to_delete = curr_num_files - self.num_wal_files_to_keep; + let last_to_delete = oldest + num_files_to_delete; + + debug!( + num_files_to_keep = ?self.num_wal_files_to_keep, + ?curr_num_files, + ?num_files_to_delete, + ?last_to_delete, + ">>> more wal files than num files to keep around" ); - debug!(?path, ">>> deleting wal file"); - - loop { - match self.object_store.delete(&path).await { - Ok(_) => break, - Err(object_store::Error::Generic { store, source }) => { - error!(%store, %source, "error deleting wal file"); - // hopefully just a temporary error, keep trying until we succeed - tokio::time::sleep(Duration::from_secs(1)).await; - } - Err(e) => { - // this must be configuration or file not there error or something else, - // log it and move on - error!(%e, "error deleting wal file"); - break; + + for idx in oldest..last_to_delete { + let path = wal_path( + &self.host_identifier_prefix, + WalFileSequenceNumber::new(idx), + ); + debug!(?path, ">>> deleting wal file"); + + loop { + match self.object_store.delete(&path).await { + Ok(_) => break, + Err(object_store::Error::Generic { store, source }) => { + error!(%store, %source, "error deleting wal file"); + // hopefully just a temporary error, keep trying until we succeed + tokio::time::sleep(Duration::from_secs(1)).await; + } + Err(e) => { + // this must be configuration or file not there error or something else, + // log it and move on + error!(%e, "error deleting wal file"); + break; + } } } } + + { + self.wal_remover.lock().update_state( + last_to_delete, + snapshot_details.last_wal_sequence_number.as_u64(), + ); + } + } else { + { + self.wal_remover + .lock() + .update_last_wal_num(snapshot_details.last_wal_sequence_number); + } } // release the permit so the next snapshot can be run when the time comes @@ -416,6 +471,40 @@ impl WalObjectStore { } } +fn oldest_wal_file_num(all_wal_file_paths: &[Path]) -> Option { + let file_name = all_wal_file_paths.first()?.filename()?; + WalFileSequenceNumber::from_str(file_name).ok() +} + +async fn load_all_wal_file_paths( + object_store: Arc, + host_identifier_prefix: String, +) -> Result, crate::Error> { + let mut paths = Vec::new(); + let mut offset: Option = None; + let path = Path::from(format!("{host}/wal", host = host_identifier_prefix)); + loop { + let mut listing = if let Some(offset) = offset { + object_store.list_with_offset(Some(&path), &offset) + } else { + object_store.list(Some(&path)) + }; + let path_count = paths.len(); + + while let Some(item) = listing.next().await { + paths.push(item?.location); + } + + if path_count == paths.len() { + break; + } + + paths.sort(); + offset = Some(paths.last().unwrap().clone()) + } + Ok(paths) +} + #[async_trait::async_trait] impl Wal for WalObjectStore { async fn buffer_op_unconfirmed(&self, op: WalOp) -> crate::Result<(), crate::Error> { @@ -704,6 +793,10 @@ impl WalBuffer { if ops.is_empty() && forced_snapshot && self.no_op.is_some() { let time = self.no_op.unwrap(); ops.push(WalOp::Noop(NoopDetails { timestamp_ns: time })); + // when we leave behind wals, these noops need min/max time as snapshot tracker falls + // over without them when adding wal period by default + min_timestamp_ns = time; + max_timestamp_ns = time + 1; } ( @@ -743,6 +836,32 @@ impl<'a> TryFrom<&'a Path> for WalFileSequenceNumber { } } +#[derive(Debug)] +struct WalFileRemover { + oldest_wal_file: Option, + last_wal_sequence_number: Option, +} + +impl WalFileRemover { + fn get_current_state(&self) -> (u64, u64) { + ( + self.oldest_wal_file.map(|num| num.as_u64()).unwrap_or(0), + self.last_wal_sequence_number + .map(|num| num.as_u64()) + .unwrap_or(0), + ) + } + + fn update_last_wal_num(&mut self, last_wal: WalFileSequenceNumber) { + self.last_wal_sequence_number.replace(last_wal); + } + + fn update_state(&mut self, oldest: u64, last: u64) { + self.oldest_wal_file = Some(WalFileSequenceNumber::new(oldest)); + self.last_wal_sequence_number = Some(WalFileSequenceNumber::new(last)); + } +} + #[cfg(test)] mod tests { use super::*; @@ -770,6 +889,7 @@ mod tests { snapshot_size: 2, gen1_duration: Gen1Duration::new_1m(), }; + let paths = vec![]; let wal = WalObjectStore::new_without_replay( Arc::clone(&time_provider), Arc::clone(&object_store), @@ -778,6 +898,8 @@ mod tests { wal_config, None, None, + &paths, + 1, ); let db_name: Arc = "db1".into(); @@ -974,6 +1096,7 @@ mod tests { // before we trigger a snapshot, test replay with a new wal and notifier let replay_notifier: Arc = Arc::new(TestNotifier::default()); + let paths = vec![]; let replay_wal = WalObjectStore::new_without_replay( Arc::clone(&time_provider), Arc::clone(&object_store), @@ -982,15 +1105,17 @@ mod tests { wal_config, None, None, + &paths, + 1, ); assert_eq!( - replay_wal.load_existing_wal_file_paths(None).await.unwrap(), + replay_wal.load_existing_wal_file_paths(None, &[]), vec![ Path::from("my_host/wal/00000000001.wal"), Path::from("my_host/wal/00000000002.wal") ] ); - replay_wal.replay(None).await.unwrap(); + replay_wal.replay(None, &[]).await.unwrap(); let replay_notifier = replay_notifier .as_any() .downcast_ref::() @@ -1115,6 +1240,7 @@ mod tests { // test that replay now only has file 3 let replay_notifier: Arc = Arc::new(TestNotifier::default()); + let paths = vec![]; let replay_wal = WalObjectStore::new_without_replay( Arc::clone(&time_provider), object_store, @@ -1123,12 +1249,14 @@ mod tests { wal_config, None, None, + &paths, + 1, ); assert_eq!( - replay_wal.load_existing_wal_file_paths(None).await.unwrap(), + replay_wal.load_existing_wal_file_paths(None, &[]), vec![Path::from("my_host/wal/00000000003.wal")] ); - replay_wal.replay(None).await.unwrap(); + replay_wal.replay(None, &[]).await.unwrap(); let replay_notifier = replay_notifier .as_any() .downcast_ref::() @@ -1155,6 +1283,7 @@ mod tests { snapshot_size: 2, gen1_duration: Gen1Duration::new_1m(), }; + let paths = vec![]; let wal = WalObjectStore::new_without_replay( time_provider, Arc::clone(&object_store), @@ -1163,6 +1292,8 @@ mod tests { wal_config, None, None, + &paths, + 10, ); assert!(wal.flush_buffer(false).await.is_none()); diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 2b7ee128b18..4d45bc7a165 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -162,6 +162,7 @@ pub struct WriteBufferImplArgs { pub wal_config: WalConfig, pub parquet_cache: Option>, pub metric_registry: Arc, + pub num_wal_files_to_keep: u64, } impl WriteBufferImpl { @@ -176,6 +177,7 @@ impl WriteBufferImpl { wal_config, parquet_cache, metric_registry, + num_wal_files_to_keep, }: WriteBufferImplArgs, ) -> Result> { // load snapshots and replay the wal into the in memory buffer @@ -219,6 +221,7 @@ impl WriteBufferImpl { wal_config, last_wal_sequence_number, last_snapshot_sequence_number, + num_wal_files_to_keep, ) .await?; @@ -933,6 +936,7 @@ mod tests { wal_config: WalConfig::test_config(), parquet_cache: Some(Arc::clone(&parquet_cache)), metric_registry: Default::default(), + num_wal_files_to_keep: 10, }) .await .unwrap(); @@ -1019,6 +1023,7 @@ mod tests { }, parquet_cache: Some(Arc::clone(&parquet_cache)), metric_registry: Default::default(), + num_wal_files_to_keep: 10, }) .await .unwrap(); @@ -1087,6 +1092,7 @@ mod tests { }, parquet_cache: wbuf.parquet_cache.clone(), metric_registry: Default::default(), + num_wal_files_to_keep: 10, }) .await .unwrap() @@ -1316,6 +1322,7 @@ mod tests { }, parquet_cache: write_buffer.parquet_cache.clone(), metric_registry: Default::default(), + num_wal_files_to_keep: 10, }) .await .unwrap(); @@ -2603,6 +2610,7 @@ mod tests { wal_config, parquet_cache, metric_registry: Arc::clone(&metric_registry), + num_wal_files_to_keep: 10, }) .await .unwrap(); diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 6901abeff7b..ac006a082a9 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -179,12 +179,6 @@ impl QueryableBuffer { let persist_jobs = { let mut buffer = self.buffer.write(); - buffer.buffer_ops( - &write.ops, - &self.last_cache_provider, - &self.distinct_cache_provider, - ); - let mut persisting_chunks = vec![]; let catalog = Arc::clone(&buffer.catalog); for (database_id, table_map) in buffer.db_to_table.iter_mut() { @@ -227,6 +221,12 @@ impl QueryableBuffer { } } + buffer.buffer_ops( + &write.ops, + &self.last_cache_provider, + &self.distinct_cache_provider, + ); + persisting_chunks };