From 2f6f29a954ec1cd9393068bc73db700d1b9a02d2 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Sun, 12 Jan 2025 00:11:54 +0000 Subject: [PATCH] refactor: address PR comment --- influxdb3_server/src/query_executor/mod.rs | 32 +++++++++---------- influxdb3_wal/src/object_store.rs | 9 ++++-- .../src/write_buffer/queryable_buffer.rs | 17 +++++----- 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index a248e79805a..b376ce3da66 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -787,9 +787,9 @@ mod tests { "+------------+------------+-----------+----------+----------+", "| table_name | size_bytes | row_count | min_time | max_time |", "+------------+------------+-----------+----------+----------+", - "| cpu | 1956 | 2 | 0 | 10 |", - "| cpu | 1961 | 3 | 20 | 40 |", - "| cpu | 1961 | 3 | 50 | 70 |", + "| cpu | 1961 | 3 | 0 | 20 |", + "| cpu | 1961 | 3 | 30 | 50 |", + "| cpu | 1961 | 3 | 60 | 80 |", "+------------+------------+-----------+----------+----------+", ], }, @@ -802,9 +802,9 @@ mod tests { "+------------+------------+-----------+----------+----------+", "| table_name | size_bytes | row_count | min_time | max_time |", "+------------+------------+-----------+----------+----------+", - "| mem | 1956 | 2 | 0 | 10 |", - "| mem | 1961 | 3 | 20 | 40 |", - "| mem | 1961 | 3 | 50 | 70 |", + "| mem | 1961 | 3 | 0 | 20 |", + "| mem | 1961 | 3 | 30 | 50 |", + "| mem | 1961 | 3 | 60 | 80 |", "+------------+------------+-----------+----------+----------+", ], }, @@ -816,12 +816,12 @@ mod tests { "+------------+------------+-----------+----------+----------+", "| table_name | size_bytes | row_count | min_time | max_time |", "+------------+------------+-----------+----------+----------+", - "| cpu | 1956 | 2 | 0 | 10 |", - "| cpu | 1961 | 3 | 20 | 40 |", - "| cpu | 1961 | 3 | 50 | 70 |", - "| mem | 1956 | 2 | 0 | 10 |", - "| mem | 1961 | 3 | 20 | 40 |", - "| mem | 1961 | 3 | 50 | 70 |", + "| cpu | 1961 | 3 | 0 | 20 |", + "| cpu | 1961 | 3 | 30 | 50 |", + "| cpu | 1961 | 3 | 60 | 80 |", + "| mem | 1961 | 3 | 0 | 20 |", + "| mem | 1961 | 3 | 30 | 50 |", + "| mem | 1961 | 3 | 60 | 80 |", "+------------+------------+-----------+----------+----------+", ], }, @@ -834,10 +834,10 @@ mod tests { "+------------+------------+-----------+----------+----------+", "| table_name | size_bytes | row_count | min_time | max_time |", "+------------+------------+-----------+----------+----------+", - "| cpu | 1956 | 2 | 0 | 10 |", - "| cpu | 1961 | 3 | 20 | 40 |", - "| cpu | 1961 | 3 | 50 | 70 |", - "| mem | 1961 | 3 | 50 | 70 |", + "| cpu | 1961 | 3 | 0 | 20 |", + "| cpu | 1961 | 3 | 30 | 50 |", + "| cpu | 1961 | 3 | 60 | 80 |", + "| mem | 1961 | 3 | 60 | 80 |", "+------------+------------+-----------+----------+----------+", ], }, diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index 16dccd8e8d7..c15ec78f6ed 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -365,11 +365,11 @@ impl WalObjectStore { last_wal_sequence_number: Option, all_wal_file_paths: &[Path], ) -> Vec { - // if we have a last wal path from persisted snapshots, we don't need to load the wal files - // that came before it: all_wal_file_paths .iter() .filter(|path| { + // if we have a last wal path from persisted snapshots, we don't need to load the wal files + // that came before it: if let Some(last_wal_sequence_number) = last_wal_sequence_number { let last_wal_path = wal_path(&self.host_identifier_prefix, last_wal_sequence_number); @@ -378,7 +378,10 @@ impl WalObjectStore { ?last_wal_path, ">>> path and last_wal_path check when replaying" ); - *path >= &last_wal_path + // last_wal_sequence_number that comes from persisted snapshot is + // holds the last wal number (inclusive) that has been snapshotted + // so, anything greater than that needs to be loaded + *path > &last_wal_path } else { true } diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index ac006a082a9..dd5dee333fa 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -179,6 +179,13 @@ impl QueryableBuffer { let persist_jobs = { let mut buffer = self.buffer.write(); + // need to buffer first before snapshotting + buffer.buffer_ops( + &write.ops, + &self.last_cache_provider, + &self.distinct_cache_provider, + ); + let mut persisting_chunks = vec![]; let catalog = Arc::clone(&buffer.catalog); for (database_id, table_map) in buffer.db_to_table.iter_mut() { @@ -208,7 +215,7 @@ impl QueryableBuffer { table_name.as_ref(), table_id.as_u32(), chunk.chunk_time, - write.wal_file_number, + snapshot_details.last_wal_sequence_number, ), batch: chunk.record_batch, schema: chunk.schema, @@ -221,12 +228,6 @@ impl QueryableBuffer { } } - buffer.buffer_ops( - &write.ops, - &self.last_cache_provider, - &self.distinct_cache_provider, - ); - persisting_chunks }; @@ -278,7 +279,7 @@ impl QueryableBuffer { let mut persisted_snapshot = PersistedSnapshot::new( persister.host_identifier_prefix().to_string(), snapshot_details.snapshot_sequence_number, - wal_file_number, + snapshot_details.last_wal_sequence_number, catalog.sequence_number(), ); let mut cache_notifiers = vec![];