From bdc879ea52622ead812c6b212e50b39ce276eee9 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Tue, 14 Jan 2025 19:41:59 +0000 Subject: [PATCH] fix: reproducer for the empty snapshot file issue --- influxdb3_catalog/src/catalog.rs | 13 +++ influxdb3_wal/src/snapshot_tracker.rs | 5 + influxdb3_write/src/write_buffer/mod.rs | 130 +++++++++++++++++++++++- 3 files changed, 144 insertions(+), 4 deletions(-) diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 570baf182ed..9a62897b090 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -356,6 +356,19 @@ impl Catalog { pub fn inner(&self) -> &RwLock { &self.inner } + + pub fn table_id(&self, db_id: &DbId, name: Arc) -> Option { + let inner = self.inner.read(); + inner.databases.get(db_id).and_then(|db| { + db.tables().find_map(|table_defn| { + if table_defn.table_name == name { + Some(table_defn.table_id) + } else { + None + } + }) + }) + } } #[serde_with::serde_as] diff --git a/influxdb3_wal/src/snapshot_tracker.rs b/influxdb3_wal/src/snapshot_tracker.rs index c350dfda158..22855d0ec2d 100644 --- a/influxdb3_wal/src/snapshot_tracker.rs +++ b/influxdb3_wal/src/snapshot_tracker.rs @@ -93,6 +93,11 @@ impl SnapshotTracker { fn should_run_snapshot(&mut self, force_snapshot: bool) -> bool { // wal buffer can be empty but wal periods shouldn't be + // this assumption doesn't hold anymore, we might have just added a single + // no-op wal and it's wal period that means when force snapshot is set + // wal periods are never empty but the queryable buffer may not hold + // data that will get evicted to be snapshottedif the snapshots are happening + // very close to each other. if self.wal_periods.is_empty() { if force_snapshot { info!("cannot force a snapshot when wal periods are empty"); diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 4cc522153e3..e47fa919d33 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -198,6 +198,7 @@ impl WriteBufferImpl { first_snapshot.next_file_id.set_next_id(); } + debug!(?persisted_snapshots, ">>> all loaded snapshots"); let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots( Arc::clone(&time_provider), persisted_snapshots, @@ -881,7 +882,9 @@ mod tests { }; use object_store::local::LocalFileSystem; use object_store::memory::InMemory; + use object_store::path::Path; use object_store::{ObjectStore, PutPayload}; + use pretty_assertions::assert_eq; #[test] fn parse_lp_into_buffer() { @@ -2644,7 +2647,11 @@ mod tests { #[test_log::test(tokio::test)] async fn test_check_mem_and_force_snapshot() { - let obj_store: Arc = Arc::new(InMemory::new()); + // let obj_store: Arc = Arc::new(InMemory::new()); + let tmp_dir = test_helpers::tmp_dir().unwrap(); + debug!(?tmp_dir, ">>> using tmp dir for test"); + let obj_store: Arc = + Arc::new(LocalFileSystem::new_with_prefix(tmp_dir).unwrap()); let (write_buffer, _, _) = setup( Time::from_timestamp_nanos(0), Arc::clone(&obj_store), @@ -2688,17 +2695,132 @@ mod tests { let total_buffer_size_bytes_before = write_buffer.buffer.get_total_size_bytes(); debug!(?total_buffer_size_bytes_before, ">>> total buffer size"); - check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 100).await; + debug!(">>> 1st snapshot.."); + check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 50).await; // check memory has gone down after forcing first snapshot let total_buffer_size_bytes_after = write_buffer.buffer.get_total_size_bytes(); debug!(?total_buffer_size_bytes_after, ">>> total buffer size"); assert!(total_buffer_size_bytes_before > total_buffer_size_bytes_after); + let from = object_store::path::Path::from("test_host/snapshots/"); + let file_paths = load_files_from_obj_store(&obj_store, &from).await; + debug!(?file_paths, ">>> obj store snapshots"); + for file_path in file_paths { + let bytes = obj_store + .get(&file_path) + .await + .unwrap() + .bytes() + .await + .unwrap(); + let persisted_snapshot: PersistedSnapshot = serde_json::from_slice(&bytes).unwrap(); + assert!(persisted_snapshot.min_time != i64::MAX); + assert!(persisted_snapshot.max_time != i64::MIN); + } - // no other writes so nothing can be snapshotted, so mem should stay same let total_buffer_size_bytes_before = total_buffer_size_bytes_after; - check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 100).await; + debug!(">>> 2nd snapshot.."); + // This is technically the same issue that has been observed when running under + // high memory pressure, a snapshot file with no real content inside. The sample + // file looks like below, + // + // PersistedSnapshot{ + // writer_id: "test_host", + // next_file_id: ParquetFileId(1), + // next_db_id: DbId(1), + // next_table_id: TableId(1), + // next_column_id: ColumnId(4), + // snapshot_sequence_number: SnapshotSequenceNumber(2), + // wal_file_sequence_number: WalFileSequenceNumber(22), + // catalog_sequence_number: CatalogSequenceNumber(2), + // parquet_size_bytes: 0, + // row_count: 0, + // min_time: 9223372036854775807, + // max_time: -9223372036854775808, + // databases: SerdeVecMap({}) + // } + // + // The min/max time comes from the snapshot chunks that have been evicted from + // the query buffer. But when there's nothing evicted then the min/max stays + // the same as what they were initialized to i64::MAX/i64::MIN respectively. + // + // When forcing the snapshot for 2nd time here, the snapshotting process + // kicks off again because force_snapshot is set although wal buffer is empty + // and the wal periods are empty (cleared in 1st snapshot). In real run, the + // _theory_ is when under high mem pressure, with 1s flush interval and snapshot + // size 1, somehow the snapshotting process is queued to run immediately after + // the previous one which leads to this undesirable state. + // + // So, queryable buffer is empty (or doesn't hold data to filter out with new no-ops end + // time marker) when snapshot tracker's wal period is not empty + // this leads to writing a wal file and a snapshot file with empty rows, dbs etc. + // This however does not stop loading the data into memory as no empty parquet files are + // written out. + // + check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 50).await; let total_buffer_size_bytes_after = write_buffer.buffer.get_total_size_bytes(); + // no other writes so nothing can be snapshotted, so mem should stay same assert!(total_buffer_size_bytes_before == total_buffer_size_bytes_after); + + drop(write_buffer); + + // restart + debug!(">>> Restarting.."); + let (_, _, _) = setup( + Time::from_timestamp_nanos(0), + Arc::clone(&obj_store), + WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100_000, + flush_interval: Duration::from_millis(10), + snapshot_size: 10, + }, + ) + .await; + + let from = object_store::path::Path::from("test_host/snapshots/"); + let file_paths = load_files_from_obj_store(&obj_store, &from).await; + debug!(?file_paths, ">>> obj store snapshots"); + for file_path in file_paths { + let bytes = obj_store + .get(&file_path) + .await + .unwrap() + .bytes() + .await + .unwrap(); + let persisted_snapshot: PersistedSnapshot = serde_json::from_slice(&bytes).unwrap(); + assert!(persisted_snapshot.min_time != i64::MAX); + assert!(persisted_snapshot.max_time != i64::MIN); + } + } + + async fn load_files_from_obj_store( + object_store: &Arc, + path: &Path, + ) -> Vec { + let mut paths = Vec::new(); + let mut offset: Option = None; + loop { + let mut listing = if let Some(offset) = offset { + object_store.list_with_offset(Some(path), &offset) + } else { + object_store.list(Some(path)) + }; + let path_count = paths.len(); + + while let Some(item) = listing.next().await { + paths.push(item.unwrap().location); + } + + if path_count == paths.len() { + paths.sort(); + break; + } + + paths.sort(); + offset = Some(paths.last().unwrap().clone()) + } + paths } }