Skip to content

Commit

Permalink
fix: avoid creating empty (0 dbs) snapshot file
Browse files Browse the repository at this point in the history
  • Loading branch information
praveen-influx committed Jan 15, 2025
1 parent bdc879e commit b93cb5d
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 76 deletions.
15 changes: 5 additions & 10 deletions influxdb3_catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,17 +357,12 @@ impl Catalog {
&self.inner
}

pub fn table_id(&self, db_id: &DbId, name: Arc<str>) -> Option<TableId> {
pub fn table_id(&self, db_id: &DbId, table_name: Arc<str>) -> Option<TableId> {
let inner = self.inner.read();
inner.databases.get(db_id).and_then(|db| {
db.tables().find_map(|table_defn| {
if table_defn.table_name == name {
Some(table_defn.table_id)
} else {
None
}
})
})
inner
.databases
.get(db_id)
.and_then(|db| db.table_name_to_id(table_name))
}
}

Expand Down
8 changes: 2 additions & 6 deletions influxdb3_wal/src/snapshot_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,8 @@ impl SnapshotTracker {
}

fn should_run_snapshot(&mut self, force_snapshot: bool) -> bool {
// wal buffer can be empty but wal periods shouldn't be
// this assumption doesn't hold anymore, we might have just added a single
// no-op wal and it's wal period that means when force snapshot is set
// wal periods are never empty but the queryable buffer may not hold
// data that will get evicted to be snapshottedif the snapshots are happening
// very close to each other.
// When force_snapshot is set the wal_periods won't be empty, as call site always adds a
// no-op when wal buffer is empty and adds the wal period
if self.wal_periods.is_empty() {
if force_snapshot {
info!("cannot force a snapshot when wal periods are empty");
Expand Down
75 changes: 37 additions & 38 deletions influxdb3_write/src/write_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ impl WriteBufferImpl {
first_snapshot.next_file_id.set_next_id();
}

debug!(?persisted_snapshots, ">>> all loaded snapshots");
let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots(
Arc::clone(&time_provider),
persisted_snapshots,
Expand Down Expand Up @@ -2647,9 +2646,11 @@ mod tests {

#[test_log::test(tokio::test)]
async fn test_check_mem_and_force_snapshot() {
// let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let tmp_dir = test_helpers::tmp_dir().unwrap();
debug!(?tmp_dir, ">>> using tmp dir for test");
debug!(
?tmp_dir,
">>> using tmp dir for test_check_mem_and_force_snapshot"
);
let obj_store: Arc<dyn ObjectStore> =
Arc::new(LocalFileSystem::new_with_prefix(tmp_dir).unwrap());
let (write_buffer, _, _) = setup(
Expand Down Expand Up @@ -2702,28 +2703,10 @@ mod tests {
let total_buffer_size_bytes_after = write_buffer.buffer.get_total_size_bytes();
debug!(?total_buffer_size_bytes_after, ">>> total buffer size");
assert!(total_buffer_size_bytes_before > total_buffer_size_bytes_after);
let from = object_store::path::Path::from("test_host/snapshots/");
let file_paths = load_files_from_obj_store(&obj_store, &from).await;
debug!(?file_paths, ">>> obj store snapshots");
for file_path in file_paths {
let bytes = obj_store
.get(&file_path)
.await
.unwrap()
.bytes()
.await
.unwrap();
let persisted_snapshot: PersistedSnapshot = serde_json::from_slice(&bytes).unwrap();
assert!(persisted_snapshot.min_time != i64::MAX);
assert!(persisted_snapshot.max_time != i64::MIN);
}
assert_dbs_not_empty_in_snapshot_file(&obj_store, "test_host").await;

let total_buffer_size_bytes_before = total_buffer_size_bytes_after;
debug!(">>> 2nd snapshot..");
// This is technically the same issue that has been observed when running under
// high memory pressure, a snapshot file with no real content inside. The sample
// file looks like below,
//
// PersistedSnapshot{
// writer_id: "test_host",
// next_file_id: ParquetFileId(1),
Expand All @@ -2739,35 +2722,45 @@ mod tests {
// max_time: -9223372036854775808,
// databases: SerdeVecMap({})
// }
// This snapshot file was observed when running under high memory pressure.
//
// The min/max time comes from the snapshot chunks that have been evicted from
// the query buffer. But when there's nothing evicted then the min/max stays
// the same as what they were initialized to i64::MAX/i64::MIN respectively.
//
// When forcing the snapshot for 2nd time here, the snapshotting process
// kicks off again because force_snapshot is set although wal buffer is empty
// and the wal periods are empty (cleared in 1st snapshot). In real run, the
// _theory_ is when under high mem pressure, with 1s flush interval and snapshot
// size 1, somehow the snapshotting process is queued to run immediately after
// the previous one which leads to this undesirable state.
//
// So, queryable buffer is empty (or doesn't hold data to filter out with new no-ops end
// time marker) when snapshot tracker's wal period is not empty
// this leads to writing a wal file and a snapshot file with empty rows, dbs etc.
// This however does not stop loading the data into memory as no empty parquet files are
// written out.
//
// This however does not stop loading the data into memory as no empty
// parquet files are written out. But this test recreates that issue and checks
// object store directly to make sure inconsistent snapshot file isn't written
// out in the first place
check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 50).await;
let total_buffer_size_bytes_after = write_buffer.buffer.get_total_size_bytes();
// no other writes so nothing can be snapshotted, so mem should stay same
assert!(total_buffer_size_bytes_before == total_buffer_size_bytes_after);

drop(write_buffer);
assert_dbs_not_empty_in_snapshot_file(&obj_store, "test_host").await;

// restart
debug!(">>> Restarting..");
let (write_buffer_after_restart, _, _) = setup(
Time::from_timestamp_nanos(300),
Arc::clone(&obj_store),
WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100_000,
flush_interval: Duration::from_millis(10),
snapshot_size: 10,
},
)
.await;

assert_dbs_not_empty_in_snapshot_file(&obj_store, "test_host").await;
drop(write_buffer_after_restart);

// restart
debug!(">>> Restarting again..");
let (_, _, _) = setup(
Time::from_timestamp_nanos(0),
Time::from_timestamp_nanos(400),
Arc::clone(&obj_store),
WalConfig {
gen1_duration: Gen1Duration::new_1m(),
Expand All @@ -2777,9 +2770,12 @@ mod tests {
},
)
.await;
assert_dbs_not_empty_in_snapshot_file(&obj_store, "test_host").await;
}

let from = object_store::path::Path::from("test_host/snapshots/");
let file_paths = load_files_from_obj_store(&obj_store, &from).await;
async fn assert_dbs_not_empty_in_snapshot_file(obj_store: &Arc<dyn ObjectStore>, host: &str) {
let from = Path::from(format!("{host}/snapshots/"));
let file_paths = load_files_from_obj_store(obj_store, &from).await;
debug!(?file_paths, ">>> obj store snapshots");
for file_path in file_paths {
let bytes = obj_store
Expand All @@ -2790,6 +2786,9 @@ mod tests {
.await
.unwrap();
let persisted_snapshot: PersistedSnapshot = serde_json::from_slice(&bytes).unwrap();
// dbs not empty
assert!(!persisted_snapshot.databases.is_empty());
// min and max times aren't defaults
assert!(persisted_snapshot.min_time != i64::MAX);
assert!(persisted_snapshot.max_time != i64::MIN);
}
Expand Down
82 changes: 60 additions & 22 deletions influxdb3_write/src/write_buffer/queryable_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,6 @@ impl QueryableBuffer {
.expect("table exists");
let snapshot_chunks =
table_buffer.snapshot(table_def, snapshot_details.end_time_marker);
for chunk in &snapshot_chunks {
debug!(?chunk.chunk_time, num_rows_in_chunk = ?chunk.record_batch.num_rows(), ">>> removing chunk with records");
}

for chunk in snapshot_chunks {
let table_name =
Expand Down Expand Up @@ -291,6 +288,7 @@ impl QueryableBuffer {
catalog.sequence_number(),
);
let mut cache_notifiers = vec![];
let persist_jobs_empty = persist_jobs.is_empty();
for persist_job in persist_jobs {
let path = persist_job.path.to_string();
let database_id = persist_job.database_id;
Expand Down Expand Up @@ -338,19 +336,54 @@ impl QueryableBuffer {
)
}

// persist the snapshot file
loop {
match persister.persist_snapshot(&persisted_snapshot).await {
Ok(_) => {
let persisted_snapshot = Some(persisted_snapshot.clone());
notify_snapshot_tx
.send(persisted_snapshot)
.expect("persisted snapshot notify tx should not be closed");
break;
}
Err(e) => {
error!(%e, "Error persisting snapshot, sleeping and retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
// persist the snapshot file - only if persist jobs are present
// if persist_jobs is empty, then parquet file wouldn't have been
// written out, so it's desirable to not write empty snapshot file.
//
// How can persist jobs be empty even though snapshot is triggered?
//
// When force snapshot is set, wal_periods (tracked by
// snapshot_tracker) will never be empty as a no-op is added. This
// means even though there is a wal period the query buffer might
// still be empty. The reason is, when snapshots are happening very
// close to each other (when force snapshot is set), they could get
// queued to run immediately one after the other as illustrated in
// example series of flushes and force snapshots below,
//
// 1 (only wal flush) // triggered by flush interval 1s
// 2 (snapshot) // triggered by flush interval 1s
// 3 (force_snapshot) // triggered by mem check interval 10s
// 4 (force_snapshot) // triggered by mem check interval 10s
//
// Although the flush interval an mem check intervals aren't same
// there's a good chance under high memory pressure there will be
// a lot of overlapping.
//
// In this setup - after 2 (snapshot), we emptied wal buffer and as
// soon as snapshot is done, 3 will try to run the snapshot but wal
// buffer can be empty at this point, which means it adds a no-op.
// no-op has the current time which will be used as the
// end_time_marker. That would evict everything from query buffer, so
// when 5 forces another snapshot there's no data in the query
// buffer though it has a wal_period. Under normal (i.e without
// force_snapshot) snapshot runs snapshot_tracker will check
// wal_periods are empty so it won't trigger a snapshot in the first
// place.
if !persist_jobs_empty {
loop {
debug!(?persisted_snapshot, ">>> TRYING TO PERSIST SNAPSHOT");
match persister.persist_snapshot(&persisted_snapshot).await {
Ok(_) => {
let persisted_snapshot = Some(persisted_snapshot.clone());
notify_snapshot_tx
.send(persisted_snapshot)
.expect("persisted snapshot notify tx should not be closed");
break;
}
Err(e) => {
error!(%e, "Error persisting snapshot, sleeping and retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
}
Expand All @@ -363,14 +396,19 @@ impl QueryableBuffer {
for notifier in cache_notifiers.into_iter().flatten() {
let _ = notifier.await;
}
let mut buffer = buffer.write();
for (_, table_map) in buffer.db_to_table.iter_mut() {
for (_, table_buffer) in table_map.iter_mut() {
table_buffer.clear_snapshots();

// same reason as explained above, if persist jobs are empty, no snapshotting
// has happened so no need to clear the snapshots
if !persist_jobs_empty {
let mut buffer = buffer.write();
for (_, table_map) in buffer.db_to_table.iter_mut() {
for (_, table_buffer) in table_map.iter_mut() {
table_buffer.clear_snapshots();
}
}
}

persisted_files.add_persisted_snapshot_files(persisted_snapshot);
persisted_files.add_persisted_snapshot_files(persisted_snapshot);
}
});

let _ = sender.send(snapshot_details);
Expand Down

0 comments on commit b93cb5d

Please sign in to comment.