Skip to content

Commit

Permalink
refactor: address PR comment
Browse files Browse the repository at this point in the history
  • Loading branch information
praveen-influx committed Jan 12, 2025
1 parent 5e76230 commit 2f6f29a
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 27 deletions.
32 changes: 16 additions & 16 deletions influxdb3_server/src/query_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,9 +787,9 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1956 | 2 | 0 | 10 |",
"| cpu | 1961 | 3 | 20 | 40 |",
"| cpu | 1961 | 3 | 50 | 70 |",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand All @@ -802,9 +802,9 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| mem | 1956 | 2 | 0 | 10 |",
"| mem | 1961 | 3 | 20 | 40 |",
"| mem | 1961 | 3 | 50 | 70 |",
"| mem | 1961 | 3 | 0 | 20 |",
"| mem | 1961 | 3 | 30 | 50 |",
"| mem | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand All @@ -816,12 +816,12 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1956 | 2 | 0 | 10 |",
"| cpu | 1961 | 3 | 20 | 40 |",
"| cpu | 1961 | 3 | 50 | 70 |",
"| mem | 1956 | 2 | 0 | 10 |",
"| mem | 1961 | 3 | 20 | 40 |",
"| mem | 1961 | 3 | 50 | 70 |",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"| mem | 1961 | 3 | 0 | 20 |",
"| mem | 1961 | 3 | 30 | 50 |",
"| mem | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand All @@ -834,10 +834,10 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1956 | 2 | 0 | 10 |",
"| cpu | 1961 | 3 | 20 | 40 |",
"| cpu | 1961 | 3 | 50 | 70 |",
"| mem | 1961 | 3 | 50 | 70 |",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"| mem | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand Down
9 changes: 6 additions & 3 deletions influxdb3_wal/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,11 @@ impl WalObjectStore {
last_wal_sequence_number: Option<WalFileSequenceNumber>,
all_wal_file_paths: &[Path],
) -> Vec<Path> {
// if we have a last wal path from persisted snapshots, we don't need to load the wal files
// that came before it:
all_wal_file_paths
.iter()
.filter(|path| {
// if we have a last wal path from persisted snapshots, we don't need to load the wal files
// that came before it:
if let Some(last_wal_sequence_number) = last_wal_sequence_number {
let last_wal_path =
wal_path(&self.host_identifier_prefix, last_wal_sequence_number);
Expand All @@ -378,7 +378,10 @@ impl WalObjectStore {
?last_wal_path,
">>> path and last_wal_path check when replaying"
);
*path >= &last_wal_path
// last_wal_sequence_number that comes from persisted snapshot is
// holds the last wal number (inclusive) that has been snapshotted
// so, anything greater than that needs to be loaded
*path > &last_wal_path
} else {
true
}
Expand Down
17 changes: 9 additions & 8 deletions influxdb3_write/src/write_buffer/queryable_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ impl QueryableBuffer {

let persist_jobs = {
let mut buffer = self.buffer.write();
// need to buffer first before snapshotting
buffer.buffer_ops(
&write.ops,
&self.last_cache_provider,
&self.distinct_cache_provider,
);

let mut persisting_chunks = vec![];
let catalog = Arc::clone(&buffer.catalog);
for (database_id, table_map) in buffer.db_to_table.iter_mut() {
Expand Down Expand Up @@ -208,7 +215,7 @@ impl QueryableBuffer {
table_name.as_ref(),
table_id.as_u32(),
chunk.chunk_time,
write.wal_file_number,
snapshot_details.last_wal_sequence_number,
),
batch: chunk.record_batch,
schema: chunk.schema,
Expand All @@ -221,12 +228,6 @@ impl QueryableBuffer {
}
}

buffer.buffer_ops(
&write.ops,
&self.last_cache_provider,
&self.distinct_cache_provider,
);

persisting_chunks
};

Expand Down Expand Up @@ -278,7 +279,7 @@ impl QueryableBuffer {
let mut persisted_snapshot = PersistedSnapshot::new(
persister.host_identifier_prefix().to_string(),
snapshot_details.snapshot_sequence_number,
wal_file_number,
snapshot_details.last_wal_sequence_number,
catalog.sequence_number(),
);
let mut cache_notifiers = vec![];
Expand Down

0 comments on commit 2f6f29a

Please sign in to comment.