diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs
index b303543087a..abfdc8f7979 100644
--- a/influxdb3/src/commands/serve.rs
+++ b/influxdb3/src/commands/serve.rs
@@ -208,6 +208,17 @@ pub struct Config {
     )]
     pub wal_max_write_buffer_size: usize,
 
+    /// Number of snapshotted WAL files to retain in object store. A WAL flush does not clear
+    /// the WAL files immediately; they are deleted only once they have been snapshotted and
+    /// the number of WAL files exceeds this count.
+    #[clap(
+        long = "snapshotted-wal-files-to-keep",
+        env = "INFLUXDB3_NUM_WAL_FILES_TO_KEEP",
+        default_value = "300",
+        action
+    )]
+    pub snapshotted_wal_files_to_keep: u64,
+
     // TODO - tune this default:
     /// The size of the query log. Up to this many queries will remain in the log before
     /// old queries are evicted to make room for new ones.
@@ -502,6 +513,7 @@ pub async fn command(config: Config) -> Result<()> {
         wal_config,
         parquet_cache,
         metric_registry: Arc::clone(&metrics),
+        snapshotted_wal_files_to_keep: config.snapshotted_wal_files_to_keep,
     })
     .await
     .map_err(|e| Error::WriteBufferInit(e.into()))?;
diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs
index 02cbf558bed..6ba048a2779 100644
--- a/influxdb3_processing_engine/src/lib.rs
+++ b/influxdb3_processing_engine/src/lib.rs
@@ -865,6 +865,7 @@ mod tests {
             wal_config,
             parquet_cache: None,
             metric_registry: Arc::clone(&metric_registry),
+            snapshotted_wal_files_to_keep: 10,
         })
         .await
         .unwrap();
diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs
index ef4c6164a0c..2727bd2c531 100644
--- a/influxdb3_server/src/lib.rs
+++ b/influxdb3_server/src/lib.rs
@@ -778,6 +778,7 @@ mod tests {
                 wal_config: WalConfig::test_config(),
                 parquet_cache: Some(parquet_cache),
                 metric_registry: Arc::clone(&metrics),
+                snapshotted_wal_files_to_keep: 100,
             },
         )
         .await
diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs
index 3e92b3beade..18613e9189c 100644
--- a/influxdb3_server/src/query_executor/mod.rs
+++ b/influxdb3_server/src/query_executor/mod.rs
@@ -713,6 +713,7 @@ mod tests {
             },
             parquet_cache: Some(parquet_cache),
             metric_registry: Default::default(),
+            snapshotted_wal_files_to_keep: 1,
         })
         .await
         .unwrap();
diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs
index 0d9947e4f39..c15ec78f6ed 100644
--- a/influxdb3_wal/src/object_store.rs
+++ b/influxdb3_wal/src/object_store.rs
@@ -12,8 +12,8 @@ use iox_time::TimeProvider;
 use object_store::path::{Path, PathPart};
 use object_store::{ObjectStore, PutPayload};
 use observability_deps::tracing::{debug, error, info};
-use std::sync::Arc;
 use std::time::Duration;
+use std::{str::FromStr, sync::Arc};
 use tokio::sync::Mutex;
 use tokio::sync::{oneshot, OwnedSemaphorePermit, Semaphore};
@@ -25,11 +25,15 @@ pub struct WalObjectStore {
     added_file_notifiers: parking_lot::Mutex<Vec<Arc<dyn WalFileNotifier>>>,
     /// Buffered wal ops go in here along with the state to track when to snapshot
     flush_buffer: Mutex<FlushBuffer>,
+    /// Number of snapshotted wal files to retain in object store
+    snapshotted_wal_files_to_keep: u64,
+    wal_remover: parking_lot::Mutex<WalFileRemover>,
 }
 
 impl WalObjectStore {
     /// Creates a new WAL. This will replay files into the notifier and trigger any snapshots that
     /// exist in the WAL files that haven't been cleaned up yet.
+    #[allow(clippy::too_many_arguments)]
     pub async fn new(
         time_provider: Arc<dyn TimeProvider>,
         object_store: Arc<dyn ObjectStore>,
@@ -38,25 +42,33 @@ impl WalObjectStore {
         config: WalConfig,
         last_wal_sequence_number: Option<WalFileSequenceNumber>,
         last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
+        snapshotted_wal_files_to_keep: u64,
     ) -> Result<Arc<Self>, crate::Error> {
+        let host_identifier = host_identifier_prefix.into();
+        let all_wal_file_paths =
+            load_all_wal_file_paths(Arc::clone(&object_store), host_identifier.clone()).await?;
         let flush_interval = config.flush_interval;
         let wal = Self::new_without_replay(
             time_provider,
             object_store,
-            host_identifier_prefix,
+            host_identifier,
             file_notifier,
             config,
             last_wal_sequence_number,
             last_snapshot_sequence_number,
+            &all_wal_file_paths,
+            snapshotted_wal_files_to_keep,
         );
 
-        wal.replay(last_wal_sequence_number).await?;
+        wal.replay(last_wal_sequence_number, &all_wal_file_paths)
+            .await?;
         let wal = Arc::new(wal);
         background_wal_flush(Arc::clone(&wal), flush_interval);
 
         Ok(wal)
     }
 
+    #[allow(clippy::too_many_arguments)]
     fn new_without_replay(
         time_provider: Arc<dyn TimeProvider>,
         object_store: Arc<dyn ObjectStore>,
@@ -65,8 +77,12 @@ impl WalObjectStore {
         config: WalConfig,
         last_wal_sequence_number: Option<WalFileSequenceNumber>,
         last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
+        all_wal_file_paths: &[Path],
+        num_wal_files_to_keep: u64,
     ) -> Self {
         let wal_file_sequence_number = last_wal_sequence_number.unwrap_or_default().next();
+        let oldest_wal_file_num = oldest_wal_file_num(all_wal_file_paths);
+
         Self {
             object_store,
             host_identifier_prefix: host_identifier_prefix.into(),
@@ -91,6 +107,11 @@ impl WalObjectStore {
                     last_snapshot_sequence_number,
                 ),
             )),
+            snapshotted_wal_files_to_keep: num_wal_files_to_keep,
+            wal_remover: parking_lot::Mutex::new(WalFileRemover {
+                oldest_wal_file: oldest_wal_file_num,
+                last_snapshotted_wal_sequence_number: last_wal_sequence_number,
+            }),
         }
     }
 
@@ -99,11 +120,10 @@ impl WalObjectStore {
     pub async fn replay(
         &self,
         last_wal_sequence_number: Option<WalFileSequenceNumber>,
+        all_wal_file_paths: &[Path],
     ) -> crate::Result<()> {
         debug!(">>> replaying");
-        let paths = self
-            .load_existing_wal_file_paths(last_wal_sequence_number)
-            .await?;
+        let paths = self.load_existing_wal_file_paths(last_wal_sequence_number, all_wal_file_paths);
 
         let last_snapshot_sequence_number = {
             self.flush_buffer
@@ -191,7 +211,7 @@ impl WalObjectStore {
     }
 
     /// Stop accepting write operations, flush of buffered writes to a WAL file and return when done.
-    pub async fn shutdown(&self) {
+    pub async fn shutdown(&mut self) {
         // stop accepting writes
         self.flush_buffer.lock().await.wal_buffer.is_shutdown = true;
 
@@ -340,43 +360,34 @@ impl WalObjectStore {
         snapshot_response
     }
 
-    async fn load_existing_wal_file_paths(
+    fn load_existing_wal_file_paths(
         &self,
         last_wal_sequence_number: Option<WalFileSequenceNumber>,
-    ) -> crate::Result<Vec<Path>> {
-        let mut paths = Vec::new();
-        let mut offset: Option<Path> = None;
-        let path = Path::from(format!("{host}/wal", host = self.host_identifier_prefix));
-        loop {
-            let mut listing = if let Some(offset) = offset {
-                self.object_store.list_with_offset(Some(&path), &offset)
-            } else {
-                self.object_store.list(Some(&path))
-            };
-            let path_count = paths.len();
-
-            while let Some(item) = listing.next().await {
-                paths.push(item?.location);
-            }
-
-            if path_count == paths.len() {
-                break;
-            }
-
-            paths.sort();
-            offset = Some(paths.last().unwrap().clone())
-        }
-
-        // if we have a last wal path from persisted snapshots, we don't need to load the wal files
-        // that came before it:
-        if let Some(last_wal_sequence_number) = last_wal_sequence_number {
-            let last_wal_path = wal_path(&self.host_identifier_prefix, last_wal_sequence_number);
-            paths.retain(|path| path >= &last_wal_path);
-        }
-
-        paths.sort();
-
-        Ok(paths)
+        all_wal_file_paths: &[Path],
+    ) -> Vec<Path> {
+        all_wal_file_paths
+            .iter()
+            .filter(|path| {
+                // if we have a last wal path from persisted snapshots, we don't need to load the wal files
+                // that came before it:
+                if let Some(last_wal_sequence_number) = last_wal_sequence_number {
+                    let last_wal_path =
+                        wal_path(&self.host_identifier_prefix, last_wal_sequence_number);
+                    debug!(
+                        ?path,
+                        ?last_wal_path,
+                        ">>> path and last_wal_path check when replaying"
+                    );
+                    // the last_wal_sequence_number that comes from the persisted snapshot
+                    // holds the last wal number (inclusive) that has been snapshotted,
+                    // so anything greater than that needs to be loaded
+                    *path > &last_wal_path
+                } else {
+                    true
+                }
+            })
+            .cloned()
+            .collect()
     }
 
     async fn remove_snapshot_wal_files(
@@ -384,31 +395,67 @@ impl WalObjectStore {
         snapshot_details: SnapshotDetails,
         snapshot_permit: OwnedSemaphorePermit,
     ) {
-        let start = snapshot_details.first_wal_sequence_number.as_u64();
-        let end = snapshot_details.last_wal_sequence_number.as_u64();
-        for period in start..=end {
-            let path = wal_path(
-                &self.host_identifier_prefix,
-                WalFileSequenceNumber::new(period),
+        let (oldest, last, curr_num_files) = {
+            let (oldest, last) = self.wal_remover.lock().get_current_state();
+            let curr_num_files = last - oldest;
+            (oldest, last, curr_num_files)
+        };
+        debug!(
+            ?oldest,
+            ?last,
+            ?curr_num_files,
+            ">>> checking num wal files to delete"
+        );
+
+        if curr_num_files > self.snapshotted_wal_files_to_keep {
+            let num_files_to_delete = curr_num_files - self.snapshotted_wal_files_to_keep;
+            let last_to_delete = oldest + num_files_to_delete;
+
+            debug!(
+                num_files_to_keep = ?self.snapshotted_wal_files_to_keep,
+                ?curr_num_files,
+                ?num_files_to_delete,
+                ?last_to_delete,
+                ">>> more wal files than num files to keep around"
             );
-            debug!(?path, ">>> deleting wal file");
-
-            loop {
-                match self.object_store.delete(&path).await {
-                    Ok(_) => break,
-                    Err(object_store::Error::Generic { store, source }) => {
-                        error!(%store, %source, "error deleting wal file");
-                        // hopefully just a temporary error, keep trying until we succeed
-                        tokio::time::sleep(Duration::from_secs(1)).await;
-                    }
-                    Err(e) => {
-                        // this must be a configuration or file-not-found error, or something else;
-                        // log it and move on
-                        error!(%e, "error deleting wal file");
-                        break;
+
+            for idx in oldest..last_to_delete {
+                let path = wal_path(
+                    &self.host_identifier_prefix,
+                    WalFileSequenceNumber::new(idx),
+                );
+                debug!(?path, ">>> deleting wal file");
+
+                loop {
+                    match self.object_store.delete(&path).await {
+                        Ok(_) => break,
+                        Err(object_store::Error::Generic { store, source }) => {
+                            error!(%store, %source, "error deleting wal file");
+                            // hopefully just a temporary error, keep trying until we succeed
+                            tokio::time::sleep(Duration::from_secs(1)).await;
+                        }
+                        Err(e) => {
+                            // this must be a configuration or file-not-found error, or something else;
+                            // log it and move on
+                            error!(%e, "error deleting wal file");
+                            break;
+                        }
                     }
                 }
             }
+
+            {
+                self.wal_remover.lock().update_state(
+                    last_to_delete,
+                    snapshot_details.last_wal_sequence_number.as_u64(),
+                );
+            }
+        } else {
+            {
+                self.wal_remover
+                    .lock()
+                    .update_last_wal_num(snapshot_details.last_wal_sequence_number);
+            }
         }
 
         // release the permit so the next snapshot can be run when the time comes
@@ -416,6 +463,41 @@ impl WalObjectStore {
     }
 }
 
+fn oldest_wal_file_num(all_wal_file_paths: &[Path]) -> Option<WalFileSequenceNumber> {
+    let file_name = all_wal_file_paths.first()?.filename()?;
+    WalFileSequenceNumber::from_str(file_name).ok()
+}
+
+async fn load_all_wal_file_paths(
+    object_store: Arc<dyn ObjectStore>,
+    host_identifier_prefix: String,
+) -> Result<Vec<Path>, crate::Error> {
+    let mut paths = Vec::new();
+    let mut offset: Option<Path> = None;
+    let path = Path::from(format!("{host}/wal", host = host_identifier_prefix));
+    loop {
+        let mut listing = if let Some(offset) = offset {
+            object_store.list_with_offset(Some(&path), &offset)
+        } else {
+            object_store.list(Some(&path))
+        };
+        let path_count = paths.len();
+
+        while let Some(item) = listing.next().await {
+            paths.push(item?.location);
+        }
+
+        if path_count == paths.len() {
+            paths.sort();
+            break;
+        }
+
+        paths.sort();
+        offset = Some(paths.last().unwrap().clone())
+    }
+    Ok(paths)
+}
+
 #[async_trait::async_trait]
 impl Wal for WalObjectStore {
     async fn buffer_op_unconfirmed(&self, op: WalOp) -> crate::Result<(), crate::Error> {
@@ -704,6 +786,10 @@ impl WalBuffer {
         if ops.is_empty() && forced_snapshot && self.no_op.is_some() {
             let time = self.no_op.unwrap();
             ops.push(WalOp::Noop(NoopDetails { timestamp_ns: time }));
+            // when wal files are left behind, these noops need min/max times, since the snapshot
+            // tracker falls over without them when adding the wal period
+            min_timestamp_ns = time;
+            max_timestamp_ns = time + 1;
         }
 
         (
@@ -743,6 +829,32 @@ impl<'a> TryFrom<&'a Path> for WalFileSequenceNumber {
     }
 }
 
+#[derive(Debug)]
+struct WalFileRemover {
+    oldest_wal_file: Option<WalFileSequenceNumber>,
+    last_snapshotted_wal_sequence_number: Option<WalFileSequenceNumber>,
+}
+
+impl WalFileRemover {
+    fn get_current_state(&self) -> (u64, u64) {
+        (
+            self.oldest_wal_file.map(|num| num.as_u64()).unwrap_or(0),
+            self.last_snapshotted_wal_sequence_number
+                .map(|num| num.as_u64())
+                .unwrap_or(0),
+        )
+    }
+
+    fn update_last_wal_num(&mut self, last_wal: WalFileSequenceNumber) {
+        self.last_snapshotted_wal_sequence_number.replace(last_wal);
+    }
+
+    fn update_state(&mut self, oldest: u64, last: u64) {
+        self.oldest_wal_file = Some(WalFileSequenceNumber::new(oldest));
+        self.last_snapshotted_wal_sequence_number = Some(WalFileSequenceNumber::new(last));
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -770,6 +882,7 @@ mod tests {
             snapshot_size: 2,
             gen1_duration: Gen1Duration::new_1m(),
         };
+        let paths = vec![];
         let wal = WalObjectStore::new_without_replay(
             Arc::clone(&time_provider),
             Arc::clone(&object_store),
@@ -778,6 +891,8 @@ mod tests {
             wal_config,
             None,
             None,
+            &paths,
+            1,
         );
 
         let db_name: Arc<str> = "db1".into();
@@ -974,6 +1089,7 @@ mod tests {
 
         // before we trigger a snapshot, test replay with a new wal and notifier
         let replay_notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotifier::default());
+        let paths = vec![];
        let replay_wal = WalObjectStore::new_without_replay(
             Arc::clone(&time_provider),
             Arc::clone(&object_store),
@@ -982,15 +1098,32 @@ mod tests {
             wal_config,
             None,
             None,
+            &paths,
+            1,
         );
         assert_eq!(
-            replay_wal.load_existing_wal_file_paths(None).await.unwrap(),
+            replay_wal.load_existing_wal_file_paths(
+                None,
+                &[
+                    Path::from("my_host/wal/00000000001.wal"),
+                    Path::from("my_host/wal/00000000002.wal")
+                ]
+            ),
             vec![
                 Path::from("my_host/wal/00000000001.wal"),
                 Path::from("my_host/wal/00000000002.wal")
             ]
         );
-        replay_wal.replay(None).await.unwrap();
+        replay_wal
+            .replay(
+                None,
+                &[
+                    Path::from("my_host/wal/00000000001.wal"),
+                    Path::from("my_host/wal/00000000002.wal"),
+                ],
+            )
+            .await
+            .unwrap();
         let replay_notifier = replay_notifier
             .as_any()
             .downcast_ref::<TestNotifier>()
             .unwrap();
@@ -1115,6 +1248,7 @@ mod tests {
 
         // test that replay now only has file 3
         let replay_notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotifier::default());
+        let paths = vec![];
         let replay_wal = WalObjectStore::new_without_replay(
             Arc::clone(&time_provider),
             object_store,
@@ -1123,12 +1257,18 @@ mod tests {
             wal_config,
             None,
             None,
+            &paths,
+            1,
         );
         assert_eq!(
-            replay_wal.load_existing_wal_file_paths(None).await.unwrap(),
+            replay_wal
+                .load_existing_wal_file_paths(None, &[Path::from("my_host/wal/00000000003.wal")]),
             vec![Path::from("my_host/wal/00000000003.wal")]
         );
-        replay_wal.replay(None).await.unwrap();
+        replay_wal
+            .replay(None, &[Path::from("my_host/wal/00000000003.wal")])
+            .await
+            .unwrap();
         let replay_notifier = replay_notifier
             .as_any()
             .downcast_ref::<TestNotifier>()
             .unwrap();
@@ -1155,6 +1295,7 @@ mod tests {
             snapshot_size: 2,
             gen1_duration: Gen1Duration::new_1m(),
         };
+        let paths = vec![];
         let wal = WalObjectStore::new_without_replay(
             time_provider,
             Arc::clone(&object_store),
@@ -1163,6 +1304,8 @@ mod tests {
             wal_config,
             None,
             None,
+            &paths,
+            10,
         );
 
         assert!(wal.flush_buffer(false).await.is_none());
diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs
index 2b7ee128b18..7e7cb007392 100644
--- a/influxdb3_write/src/write_buffer/mod.rs
+++ b/influxdb3_write/src/write_buffer/mod.rs
@@ -162,6 +162,7 @@ pub struct WriteBufferImplArgs {
     pub wal_config: WalConfig,
     pub parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
     pub metric_registry: Arc<Registry>,
+    pub snapshotted_wal_files_to_keep: u64,
 }
 
 impl WriteBufferImpl {
@@ -176,6 +177,7 @@ impl WriteBufferImpl {
             wal_config,
             parquet_cache,
             metric_registry,
+            snapshotted_wal_files_to_keep,
         }: WriteBufferImplArgs,
     ) -> Result<Arc<Self>> {
         // load snapshots and replay the wal into the in memory buffer
@@ -219,6 +221,7 @@ impl WriteBufferImpl {
             wal_config,
             last_wal_sequence_number,
             last_snapshot_sequence_number,
+            snapshotted_wal_files_to_keep,
         )
         .await?;
 
@@ -933,6 +936,7 @@ mod tests {
             wal_config: WalConfig::test_config(),
             parquet_cache: Some(Arc::clone(&parquet_cache)),
             metric_registry: Default::default(),
+            snapshotted_wal_files_to_keep: 10,
         })
         .await
         .unwrap();
@@ -1019,6 +1023,7 @@ mod tests {
             },
             parquet_cache: Some(Arc::clone(&parquet_cache)),
             metric_registry: Default::default(),
+            snapshotted_wal_files_to_keep: 10,
         })
         .await
         .unwrap();
@@ -1087,6 +1092,7 @@ mod tests {
             },
             parquet_cache: wbuf.parquet_cache.clone(),
             metric_registry: Default::default(),
+            snapshotted_wal_files_to_keep: 10,
         })
         .await
         .unwrap()
@@ -1316,6 +1322,7 @@ mod tests {
             },
             parquet_cache: write_buffer.parquet_cache.clone(),
             metric_registry: Default::default(),
+            snapshotted_wal_files_to_keep: 10,
         })
         .await
         .unwrap();
@@ -2603,6 +2610,7 @@ mod tests {
             wal_config,
             parquet_cache,
             metric_registry: Arc::clone(&metric_registry),
+            snapshotted_wal_files_to_keep: 10,
         })
         .await
         .unwrap();
diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs
index 6901abeff7b..dd5dee333fa 100644
--- a/influxdb3_write/src/write_buffer/queryable_buffer.rs
+++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs
@@ -179,6 +179,7 @@ impl QueryableBuffer {
         let persist_jobs = {
             let mut buffer = self.buffer.write();
+            // the ops need to be buffered before the snapshot is taken
             buffer.buffer_ops(
                 &write.ops,
                 &self.last_cache_provider,
@@ -214,7 +215,7 @@ impl QueryableBuffer {
                                 table_name.as_ref(),
                                 table_id.as_u32(),
                                 chunk.chunk_time,
-                                write.wal_file_number,
+                                snapshot_details.last_wal_sequence_number,
                             ),
                             batch: chunk.record_batch,
                             schema: chunk.schema,
@@ -278,7 +279,7 @@ impl QueryableBuffer {
         let mut persisted_snapshot = PersistedSnapshot::new(
             persister.host_identifier_prefix().to_string(),
             snapshot_details.snapshot_sequence_number,
-            wal_file_number,
+            snapshot_details.last_wal_sequence_number,
             catalog.sequence_number(),
         );
         let mut cache_notifiers = vec![];
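
Note on the retention logic introduced in remove_snapshot_wal_files: snapshotted WAL files are no longer deleted as soon as a snapshot covers them. They are removed only when the distance between the oldest WAL file still in object store and the last snapshotted WAL file exceeds the configured snapshotted-wal-files-to-keep (default 300), and then only the oldest files beyond that threshold are deleted. Below is a minimal standalone sketch of that arithmetic, not part of the patch; the helper name wal_files_to_delete is made up for illustration.

// Sketch of the retention arithmetic in remove_snapshot_wal_files: given the oldest wal
// file number still present, the last snapshotted wal file number, and the number of
// snapshotted wal files to keep, return the half-open range of wal file numbers to delete.
fn wal_files_to_delete(oldest: u64, last: u64, files_to_keep: u64) -> std::ops::Range<u64> {
    let curr_num_files = last - oldest;
    if curr_num_files > files_to_keep {
        let num_files_to_delete = curr_num_files - files_to_keep;
        // delete from the oldest file up to, but not including, oldest + num_files_to_delete
        oldest..(oldest + num_files_to_delete)
    } else {
        // under the threshold: nothing is deleted, only the last snapshotted number is updated
        oldest..oldest
    }
}

fn main() {
    // 305 snapshotted wal files exist and 300 should be kept, so the 5 oldest are removed
    assert_eq!(wal_files_to_delete(100, 405, 300), 100..105);
    // fewer files than the threshold: nothing is removed yet
    assert_eq!(wal_files_to_delete(100, 150, 300), 100..100);
}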
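
On the replay side, load_existing_wal_file_paths now filters a pre-listed set of paths instead of listing the object store itself, keeping only paths strictly greater than the wal path for the last snapshotted sequence number (that number is inclusive, so the file itself is skipped). Because WAL file names are zero padded, plain lexicographic comparison of the paths matches numeric order. A small illustration with plain strings, reusing the my_host paths from the tests above:

// Zero-padded wal file names sort lexicographically in numeric order, which is what the
// `*path > &last_wal_path` filter during replay relies on.
fn main() {
    let all = [
        "my_host/wal/00000000001.wal",
        "my_host/wal/00000000002.wal",
        "my_host/wal/00000000003.wal",
    ];
    // pretend wal file 2 is the last one covered by a persisted snapshot (inclusive)
    let last_snapshotted = "my_host/wal/00000000002.wal";
    let to_replay: Vec<&str> = all
        .iter()
        .copied()
        .filter(|path| *path > last_snapshotted)
        .collect();
    assert_eq!(to_replay, vec!["my_host/wal/00000000003.wal"]);
}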