fix: reproducer for the empty snapshot file issue
praveen-influx committed Jan 14, 2025
1 parent 30fcc2b commit bdc879e
Showing 3 changed files with 144 additions and 4 deletions.
13 changes: 13 additions & 0 deletions influxdb3_catalog/src/catalog.rs
@@ -356,6 +356,19 @@ impl Catalog {
pub fn inner(&self) -> &RwLock<InnerCatalog> {
&self.inner
}

/// Looks up the id of the named table in the given database, if it exists.
pub fn table_id(&self, db_id: &DbId, name: Arc<str>) -> Option<TableId> {
let inner = self.inner.read();
inner.databases.get(db_id).and_then(|db| {
db.tables().find_map(|table_defn| {
if table_defn.table_name == name {
Some(table_defn.table_id)
} else {
None
}
})
})
}
}

#[serde_with::serde_as]
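For orientation, a minimal usage sketch of the new lookup (the helper name below is hypothetical; only the table_id signature above comes from this diff):

// Hypothetical caller: resolve a table's id by name within a database.
// Returns None when the database or the table name is unknown.
fn lookup_table(catalog: &Catalog, db_id: &DbId, name: &str) -> Option<TableId> {
    catalog.table_id(db_id, Arc::from(name))
}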
5 changes: 5 additions & 0 deletions influxdb3_wal/src/snapshot_tracker.rs
@@ -93,6 +93,11 @@ impl SnapshotTracker {

fn should_run_snapshot(&mut self, force_snapshot: bool) -> bool {
// wal buffer can be empty but wal periods shouldn't be
// this assumption doesn't hold anymore: we might have just added a single
// no-op wal entry and its wal period. That means when force_snapshot is
// set, wal periods are never empty, but the queryable buffer may not hold
// data that will get evicted to be snapshotted if the snapshots happen
// very close to each other.
if self.wal_periods.is_empty() {
if force_snapshot {
info!("cannot force a snapshot when wal periods are empty");
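To make the guard concrete, a condensed sketch of the control flow described in the comment above (the early-return shape is an assumption, since the diff is truncated at this point):

// Sketch only: with no wal periods there is nothing to snapshot, even when
// force_snapshot is set; the method logs and declines to run.
if self.wal_periods.is_empty() {
    if force_snapshot {
        info!("cannot force a snapshot when wal periods are empty");
    }
    return false; // assumed early return
}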
130 changes: 126 additions & 4 deletions influxdb3_write/src/write_buffer/mod.rs
@@ -198,6 +198,7 @@ impl WriteBufferImpl {
first_snapshot.next_file_id.set_next_id();
}

debug!(?persisted_snapshots, ">>> all loaded snapshots");
let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots(
Arc::clone(&time_provider),
persisted_snapshots,
@@ -881,7 +882,9 @@ mod tests {
};
use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};
use pretty_assertions::assert_eq;

#[test]
fn parse_lp_into_buffer() {
@@ -2644,7 +2647,11 @@

#[test_log::test(tokio::test)]
async fn test_check_mem_and_force_snapshot() {
let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
// let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let tmp_dir = test_helpers::tmp_dir().unwrap();
debug!(?tmp_dir, ">>> using tmp dir for test");
let obj_store: Arc<dyn ObjectStore> =
Arc::new(LocalFileSystem::new_with_prefix(tmp_dir).unwrap());
let (write_buffer, _, _) = setup(
Time::from_timestamp_nanos(0),
Arc::clone(&obj_store),
@@ -2688,17 +2695,132 @@
let total_buffer_size_bytes_before = write_buffer.buffer.get_total_size_bytes();
debug!(?total_buffer_size_bytes_before, ">>> total buffer size");

check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 100).await;
debug!(">>> 1st snapshot..");
check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 50).await;

// check memory has gone down after forcing first snapshot
let total_buffer_size_bytes_after = write_buffer.buffer.get_total_size_bytes();
debug!(?total_buffer_size_bytes_after, ">>> total buffer size");
assert!(total_buffer_size_bytes_before > total_buffer_size_bytes_after);
let from = object_store::path::Path::from("test_host/snapshots/");
let file_paths = load_files_from_obj_store(&obj_store, &from).await;
debug!(?file_paths, ">>> obj store snapshots");
for file_path in file_paths {
let bytes = obj_store
.get(&file_path)
.await
.unwrap()
.bytes()
.await
.unwrap();
let persisted_snapshot: PersistedSnapshot = serde_json::from_slice(&bytes).unwrap();
assert!(persisted_snapshot.min_time != i64::MAX);
assert!(persisted_snapshot.max_time != i64::MIN);
}

// no other writes, so nothing can be snapshotted and memory should stay the same
let total_buffer_size_bytes_before = total_buffer_size_bytes_after;
check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 100).await;
debug!(">>> 2nd snapshot..");
// This is technically the same issue that has been observed when running
// under high memory pressure: a snapshot file with no real content inside.
// A sample file looks like the one below:
//
// PersistedSnapshot{
// writer_id: "test_host",
// next_file_id: ParquetFileId(1),
// next_db_id: DbId(1),
// next_table_id: TableId(1),
// next_column_id: ColumnId(4),
// snapshot_sequence_number: SnapshotSequenceNumber(2),
// wal_file_sequence_number: WalFileSequenceNumber(22),
// catalog_sequence_number: CatalogSequenceNumber(2),
// parquet_size_bytes: 0,
// row_count: 0,
// min_time: 9223372036854775807,
// max_time: -9223372036854775808,
// databases: SerdeVecMap({})
// }
//
// The min/max times come from the snapshot chunks that have been evicted
// from the query buffer. But when nothing is evicted, the min/max stay at
// the values they were initialized to: i64::MAX/i64::MIN respectively.
//
// When forcing the snapshot for the 2nd time here, the snapshotting process
// kicks off again because force_snapshot is set, even though the wal buffer
// is empty and the wal periods are empty (cleared in the 1st snapshot). In a
// real run, the _theory_ is that under high memory pressure, with a 1s flush
// interval and a snapshot size of 1, the snapshotting process is somehow
// queued to run immediately after the previous one, which leads to this
// undesirable state.
//
// So the queryable buffer is empty (or doesn't hold data to filter out with
// the new no-op's end-time marker) while the snapshot tracker's wal periods
// are not empty; this leads to writing a wal file and a snapshot file with
// no rows, dbs, etc. This, however, does not stop data from being loaded
// back into memory, since no empty parquet files are written out.
//
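// A minimal sketch of the min/max folding described above (assumed shape,
// not the actual PersistedSnapshot implementation):
//
//     let mut min_time = i64::MAX;
//     let mut max_time = i64::MIN;
//     for chunk in evicted_chunks {
//         min_time = min_time.min(chunk.min_time);
//         max_time = max_time.max(chunk.max_time);
//     }
//     // with no evicted chunks, min/max stay at i64::MAX / i64::MIN,
//     // which matches the empty snapshot file above.
//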
check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 50).await;
let total_buffer_size_bytes_after = write_buffer.buffer.get_total_size_bytes();
// no other writes, so nothing can be snapshotted and memory should stay the same
assert_eq!(total_buffer_size_bytes_before, total_buffer_size_bytes_after);

drop(write_buffer);

// restart
debug!(">>> Restarting..");
let (_, _, _) = setup(
Time::from_timestamp_nanos(0),
Arc::clone(&obj_store),
WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100_000,
flush_interval: Duration::from_millis(10),
snapshot_size: 10,
},
)
.await;

let from = object_store::path::Path::from("test_host/snapshots/");
let file_paths = load_files_from_obj_store(&obj_store, &from).await;
debug!(?file_paths, ">>> obj store snapshots");
for file_path in file_paths {
let bytes = obj_store
.get(&file_path)
.await
.unwrap()
.bytes()
.await
.unwrap();
let persisted_snapshot: PersistedSnapshot = serde_json::from_slice(&bytes).unwrap();
assert!(persisted_snapshot.min_time != i64::MAX);
assert!(persisted_snapshot.max_time != i64::MIN);
}
}

async fn load_files_from_obj_store(
object_store: &Arc<dyn ObjectStore>,
path: &Path,
) -> Vec<Path> {
let mut paths = Vec::new();
let mut offset: Option<Path> = None;
// page through the listing, using the last path seen as the offset,
// until a full pass adds no new paths
loop {
let mut listing = if let Some(offset) = offset {
object_store.list_with_offset(Some(path), &offset)
} else {
object_store.list(Some(path))
};
let path_count = paths.len();

while let Some(item) = listing.next().await {
paths.push(item.unwrap().location);
}

if path_count == paths.len() {
paths.sort();
break;
}

paths.sort();
offset = Some(paths.last().unwrap().clone());
}
paths
}
}
