From b93cb5d348bba06721f7754d45758d018efba89a Mon Sep 17 00:00:00 2001
From: Praveen Kumar
Date: Wed, 15 Jan 2025 11:14:15 +0000
Subject: [PATCH] fix: avoid creating empty (0 dbs) snapshot file

---
 influxdb3_catalog/src/catalog.rs          | 15 ++--
 influxdb3_wal/src/snapshot_tracker.rs     |  8 +-
 influxdb3_write/src/write_buffer/mod.rs   | 75 +++++++++--------
 .../src/write_buffer/queryable_buffer.rs  | 82 ++++++++++++++-----
 4 files changed, 104 insertions(+), 76 deletions(-)

diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs
index 9a62897b090..09d434436aa 100644
--- a/influxdb3_catalog/src/catalog.rs
+++ b/influxdb3_catalog/src/catalog.rs
@@ -357,17 +357,12 @@ impl Catalog {
         &self.inner
     }

-    pub fn table_id(&self, db_id: &DbId, name: Arc<str>) -> Option<TableId> {
+    pub fn table_id(&self, db_id: &DbId, table_name: Arc<str>) -> Option<TableId> {
         let inner = self.inner.read();
-        inner.databases.get(db_id).and_then(|db| {
-            db.tables().find_map(|table_defn| {
-                if table_defn.table_name == name {
-                    Some(table_defn.table_id)
-                } else {
-                    None
-                }
-            })
-        })
+        inner
+            .databases
+            .get(db_id)
+            .and_then(|db| db.table_name_to_id(table_name))
     }
 }

diff --git a/influxdb3_wal/src/snapshot_tracker.rs b/influxdb3_wal/src/snapshot_tracker.rs
index 22855d0ec2d..9699e7732bf 100644
--- a/influxdb3_wal/src/snapshot_tracker.rs
+++ b/influxdb3_wal/src/snapshot_tracker.rs
@@ -92,12 +92,8 @@ impl SnapshotTracker {
     }

     fn should_run_snapshot(&mut self, force_snapshot: bool) -> bool {
-        // wal buffer can be empty but wal periods shouldn't be
-        // this assumption doesn't hold anymore, we might have just added a single
-        // no-op wal and it's wal period that means when force snapshot is set
-        // wal periods are never empty but the queryable buffer may not hold
-        // data that will get evicted to be snapshottedif the snapshots are happening
-        // very close to each other.
+        // When force_snapshot is set, the wal_periods won't be empty: the call site always
+        // adds a no-op wal entry (and its wal period) when the wal buffer is empty.
         if self.wal_periods.is_empty() {
             if force_snapshot {
                 info!("cannot force a snapshot when wal periods are empty");
diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs
index e47fa919d33..b4456d36a77 100644
--- a/influxdb3_write/src/write_buffer/mod.rs
+++ b/influxdb3_write/src/write_buffer/mod.rs
@@ -198,7 +198,6 @@ impl WriteBufferImpl {
             first_snapshot.next_file_id.set_next_id();
         }

-        debug!(?persisted_snapshots, ">>> all loaded snapshots");
         let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots(
             Arc::clone(&time_provider),
             persisted_snapshots,
@@ -2647,9 +2646,11 @@

     #[test_log::test(tokio::test)]
     async fn test_check_mem_and_force_snapshot() {
-        // let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
         let tmp_dir = test_helpers::tmp_dir().unwrap();
-        debug!(?tmp_dir, ">>> using tmp dir for test");
+        debug!(
+            ?tmp_dir,
+            ">>> using tmp dir for test_check_mem_and_force_snapshot"
+        );
         let obj_store: Arc<dyn ObjectStore> =
             Arc::new(LocalFileSystem::new_with_prefix(tmp_dir).unwrap());
         let (write_buffer, _, _) = setup(
@@ -2702,28 +2703,10 @@
         let total_buffer_size_bytes_after = write_buffer.buffer.get_total_size_bytes();
         debug!(?total_buffer_size_bytes_after, ">>> total buffer size");
         assert!(total_buffer_size_bytes_before > total_buffer_size_bytes_after);
-        let from = object_store::path::Path::from("test_host/snapshots/");
-        let file_paths = load_files_from_obj_store(&obj_store, &from).await;
-        debug!(?file_paths, ">>> obj store snapshots");
-        for file_path in file_paths {
-            let bytes = obj_store
-                .get(&file_path)
-                .await
-                .unwrap()
-                .bytes()
-                .await
-                .unwrap();
-            let persisted_snapshot: PersistedSnapshot = serde_json::from_slice(&bytes).unwrap();
-            assert!(persisted_snapshot.min_time != i64::MAX);
-            assert!(persisted_snapshot.max_time != i64::MIN);
-        }
+        assert_dbs_not_empty_in_snapshot_file(&obj_store, "test_host").await;

         let total_buffer_size_bytes_before = total_buffer_size_bytes_after;
         debug!(">>> 2nd snapshot..");
-        // This is technically the same issue that has been observed when running under
-        // high memory pressure, a snapshot file with no real content inside. The sample
-        // file looks like below,
-        //
         //   PersistedSnapshot{
         //       writer_id: "test_host",
         //       next_file_id: ParquetFileId(1),
@@ -2739,35 +2722,45 @@
         //       next_db_id: DbId(1),
         //       next_table_id: TableId(1),
         //       next_column_id: ColumnId(8),
         //       snapshot_sequence_number: SnapshotSequenceNumber(2),
         //       wal_file_sequence_number: WalFileSequenceNumber(2),
         //       catalog_sequence_number: CatalogSequenceNumber(2),
         //       parquet_size_bytes: 0,
         //       row_count: 0,
         //       min_time: 9223372036854775807,
         //       max_time: -9223372036854775808,
         //       databases: SerdeVecMap({})
         //   }
+        // This snapshot file was observed when running under high memory pressure.
         //
         // The min/max time comes from the snapshot chunks that have been evicted from
         // the query buffer. But when there's nothing evicted then the min/max stays
         // the same as what they were initialized to i64::MAX/i64::MIN respectively.
         //
-        // When forcing the snapshot for 2nd time here, the snapshotting process
-        // kicks off again because force_snapshot is set although wal buffer is empty
-        // and the wal periods are empty (cleared in 1st snapshot). In real run, the
-        // _theory_ is when under high mem pressure, with 1s flush interval and snapshot
-        // size 1, somehow the snapshotting process is queued to run immediately after
-        // the previous one which leads to this undesirable state.
-
-        // So, queryable buffer is empty (or doesn't hold data to filter out with new no-ops end
-        // time marker) when snapshot tracker's wal period is not empty
-        // this leads to writing a wal file and a snapshot file with empty rows, dbs etc.
-        // This however does not stop loading the data into memory as no empty parquet files are
-        // written out.
-        //
+        // This however does not stop loading the data into memory, as no empty
+        // parquet files are written out. But this test recreates that issue and checks
+        // the object store directly to make sure an inconsistent snapshot file isn't
+        // written out in the first place.
         check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 50).await;
         let total_buffer_size_bytes_after = write_buffer.buffer.get_total_size_bytes();
         // no other writes so nothing can be snapshotted, so mem should stay same
         assert!(total_buffer_size_bytes_before == total_buffer_size_bytes_after);
         drop(write_buffer);
+        assert_dbs_not_empty_in_snapshot_file(&obj_store, "test_host").await;

         // restart
         debug!(">>> Restarting..");
+        let (write_buffer_after_restart, _, _) = setup(
+            Time::from_timestamp_nanos(300),
+            Arc::clone(&obj_store),
+            WalConfig {
+                gen1_duration: Gen1Duration::new_1m(),
+                max_write_buffer_size: 100_000,
+                flush_interval: Duration::from_millis(10),
+                snapshot_size: 10,
+            },
+        )
+        .await;
+
+        assert_dbs_not_empty_in_snapshot_file(&obj_store, "test_host").await;
+        drop(write_buffer_after_restart);
+
+        // restart
+        debug!(">>> Restarting again..");
         let (_, _, _) = setup(
-            Time::from_timestamp_nanos(0),
+            Time::from_timestamp_nanos(400),
             Arc::clone(&obj_store),
             WalConfig {
                 gen1_duration: Gen1Duration::new_1m(),
@@ -2777,9 +2770,12 @@
             },
         )
         .await;
+        assert_dbs_not_empty_in_snapshot_file(&obj_store, "test_host").await;
+    }

-        let from = object_store::path::Path::from("test_host/snapshots/");
-        let file_paths = load_files_from_obj_store(&obj_store, &from).await;
+    async fn assert_dbs_not_empty_in_snapshot_file(obj_store: &Arc<dyn ObjectStore>, host: &str) {
+        let from = Path::from(format!("{host}/snapshots/"));
+        let file_paths = load_files_from_obj_store(obj_store, &from).await;
         debug!(?file_paths, ">>> obj store snapshots");
         for file_path in file_paths {
             let bytes = obj_store
                 .get(&file_path)
                 .await
                 .unwrap()
                 .bytes()
                 .await
                 .unwrap();
             let persisted_snapshot: PersistedSnapshot = serde_json::from_slice(&bytes).unwrap();
+            // dbs not empty
+            assert!(!persisted_snapshot.databases.is_empty());
+            // min and max times aren't defaults
             assert!(persisted_snapshot.min_time != i64::MAX);
             assert!(persisted_snapshot.max_time != i64::MIN);
         }
     }
diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs
index 08ea2e5a9b1..f16259889f7 100644
--- a/influxdb3_write/src/write_buffer/queryable_buffer.rs
+++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs
@@ -204,9 +204,6 @@ impl QueryableBuffer {
                         .expect("table exists");
                     let snapshot_chunks =
                         table_buffer.snapshot(table_def, snapshot_details.end_time_marker);
-                    for chunk in &snapshot_chunks {
-                        debug!(?chunk.chunk_time, num_rows_in_chunk = ?chunk.record_batch.num_rows(), ">>> removing chunk with records");
-                    }

                     for chunk in snapshot_chunks {
                         let table_name =
@@ -291,6 +288,7 @@
                 catalog.sequence_number(),
             );
             let mut cache_notifiers = vec![];
+            let persist_jobs_empty = persist_jobs.is_empty();
             for persist_job in persist_jobs {
                 let path = persist_job.path.to_string();
                 let database_id = persist_job.database_id;
@@ -338,19 +336,54 @@
                 )
             }

-            // persist the snapshot file
-            loop {
-                match persister.persist_snapshot(&persisted_snapshot).await {
-                    Ok(_) => {
-                        let persisted_snapshot = Some(persisted_snapshot.clone());
-                        notify_snapshot_tx
-                            .send(persisted_snapshot)
-                            .expect("persisted snapshot notify tx should not be closed");
-                        break;
-                    }
-                    Err(e) => {
-                        error!(%e, "Error persisting snapshot, sleeping and retrying...");
-                        tokio::time::sleep(Duration::from_secs(1)).await;
+            // persist the snapshot file - only if persist jobs are present.
+            // If persist_jobs is empty, then no parquet file would have been
+            // written out, so it's desirable to not write an empty snapshot file.
+            //
+            // How can persist jobs be empty even though a snapshot is triggered?
+            //
+            // When force snapshot is set, wal_periods (tracked by
+            // snapshot_tracker) will never be empty, as a no-op is added. This
+            // means that even though there is a wal period, the query buffer might
+            // still be empty. The reason is, when snapshots are happening very
+            // close to each other (when force snapshot is set), they could get
+            // queued to run immediately one after the other, as illustrated in
+            // the example series of flushes and force snapshots below:
+            //
+            //   1 (only wal flush)   // triggered by flush interval 1s
+            //   2 (snapshot)         // triggered by flush interval 1s
+            //   3 (force_snapshot)   // triggered by mem check interval 10s
+            //   4 (force_snapshot)   // triggered by mem check interval 10s
+            //
+            // Although the flush interval and mem check interval aren't the same,
+            // there's a good chance that under high memory pressure there will be
+            // a lot of overlap.
+            //
+            // In this setup, after 2 (snapshot) the wal buffer is emptied, and as
+            // soon as that snapshot is done, 3 will try to run a snapshot. The wal
+            // buffer can be empty at this point, which means it adds a no-op. The
+            // no-op carries the current time, which will be used as the
+            // end_time_marker. That evicts everything from the query buffer, so
+            // when 4 forces another snapshot there's no data in the query
+            // buffer even though it has a wal_period. Under normal (i.e. without
+            // force_snapshot) snapshot runs, snapshot_tracker checks whether
+            // wal_periods are empty, so it won't trigger a snapshot in the first
+            // place.
+            if !persist_jobs_empty {
+                loop {
+                    debug!(?persisted_snapshot, ">>> TRYING TO PERSIST SNAPSHOT");
+                    match persister.persist_snapshot(&persisted_snapshot).await {
+                        Ok(_) => {
+                            let persisted_snapshot = Some(persisted_snapshot.clone());
+                            notify_snapshot_tx
+                                .send(persisted_snapshot)
+                                .expect("persisted snapshot notify tx should not be closed");
+                            break;
+                        }
+                        Err(e) => {
+                            error!(%e, "Error persisting snapshot, sleeping and retrying...");
+                            tokio::time::sleep(Duration::from_secs(1)).await;
+                        }
                     }
                 }
             }
@@ -363,14 +396,19 @@
             for notifier in cache_notifiers.into_iter().flatten() {
                 let _ = notifier.await;
             }
-            let mut buffer = buffer.write();
-            for (_, table_map) in buffer.db_to_table.iter_mut() {
-                for (_, table_buffer) in table_map.iter_mut() {
-                    table_buffer.clear_snapshots();
+
+            // Same reason as explained above: if persist jobs are empty, no snapshotting
+            // has happened, so there's no need to clear the snapshots.
+            if !persist_jobs_empty {
+                let mut buffer = buffer.write();
+                for (_, table_map) in buffer.db_to_table.iter_mut() {
+                    for (_, table_buffer) in table_map.iter_mut() {
+                        table_buffer.clear_snapshots();
+                    }
                 }
-            }

-            persisted_files.add_persisted_snapshot_files(persisted_snapshot);
+                persisted_files.add_persisted_snapshot_files(persisted_snapshot);
+            }
         });

         let _ = sender.send(snapshot_details);
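
For readers skimming the diff, the essence of the fix is the persist_jobs_empty guard in
queryable_buffer.rs: when a (possibly forced) snapshot produces no persist jobs, nothing is
written. The sketch below is a minimal, self-contained Rust illustration of that idea only;
SnapshotMeta, PersistJob, and build_snapshot_meta are hypothetical names invented for this
example and are not the actual influxdb3 types or API.

    // Simplified, hypothetical sketch of the guard this patch introduces; the
    // struct and function names are illustrative, not the real influxdb3 code.
    #[derive(Debug, PartialEq)]
    struct SnapshotMeta {
        min_time: i64,
        max_time: i64,
    }

    struct PersistJob {
        min_time: i64,
        max_time: i64,
    }

    // Build snapshot metadata only when at least one persist job exists;
    // `None` means "do not write a snapshot file at all".
    fn build_snapshot_meta(jobs: &[PersistJob]) -> Option<SnapshotMeta> {
        if jobs.is_empty() {
            // A forced snapshot with an empty queryable buffer lands here:
            // there is a wal period (the no-op), but nothing to persist.
            return None;
        }
        let mut meta = SnapshotMeta {
            min_time: i64::MAX,
            max_time: i64::MIN,
        };
        for job in jobs {
            meta.min_time = meta.min_time.min(job.min_time);
            meta.max_time = meta.max_time.max(job.max_time);
        }
        Some(meta)
    }

    fn main() {
        // No persist jobs: nothing is written, so no empty (0 dbs) snapshot file.
        assert_eq!(build_snapshot_meta(&[]), None);

        // With a job, min/max are real values, not the i64::MAX / i64::MIN defaults.
        let meta = build_snapshot_meta(&[PersistJob { min_time: 10, max_time: 20 }]).unwrap();
        assert!(meta.min_time != i64::MAX && meta.max_time != i64::MIN);
    }

Returning an Option keeps the "no data, no snapshot file" decision in a single place, which
mirrors what the if !persist_jobs_empty guard does around persister.persist_snapshot(...) in
the patch.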