feat: introduce num wal files to keep #25801

Merged
merged 3 commits into from
Jan 12, 2025
refactor: address PR comment
praveen-influx committed Jan 12, 2025

commit a03f3fb50954b398988ed4862a5f68fec87d239c
34 changes: 16 additions & 18 deletions influxdb3_server/src/query_executor/mod.rs
@@ -775,8 +775,6 @@ mod tests {
expected: &'a [&'a str],
}

- // TODO: I wasn't expecting to change these test cases when disabling deletion of wal files, this
- // probably needs some reasoning with the test, currently just updated the tests
let test_cases = [
TestCase {
query: "\
@@ -787,9 +785,9 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
- "| cpu | 1956 | 2 | 0 | 10 |",
- "| cpu | 1961 | 3 | 20 | 40 |",
- "| cpu | 1961 | 3 | 50 | 70 |",
+ "| cpu | 1961 | 3 | 0 | 20 |",
+ "| cpu | 1961 | 3 | 30 | 50 |",
+ "| cpu | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
@@ -802,9 +800,9 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
- "| mem | 1956 | 2 | 0 | 10 |",
- "| mem | 1961 | 3 | 20 | 40 |",
- "| mem | 1961 | 3 | 50 | 70 |",
+ "| mem | 1961 | 3 | 0 | 20 |",
+ "| mem | 1961 | 3 | 30 | 50 |",
+ "| mem | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
@@ -816,12 +814,12 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
- "| cpu | 1956 | 2 | 0 | 10 |",
- "| cpu | 1961 | 3 | 20 | 40 |",
- "| cpu | 1961 | 3 | 50 | 70 |",
- "| mem | 1956 | 2 | 0 | 10 |",
- "| mem | 1961 | 3 | 20 | 40 |",
- "| mem | 1961 | 3 | 50 | 70 |",
+ "| cpu | 1961 | 3 | 0 | 20 |",
+ "| cpu | 1961 | 3 | 30 | 50 |",
+ "| cpu | 1961 | 3 | 60 | 80 |",
+ "| mem | 1961 | 3 | 0 | 20 |",
+ "| mem | 1961 | 3 | 30 | 50 |",
+ "| mem | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
@@ -834,10 +832,10 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
- "| cpu | 1956 | 2 | 0 | 10 |",
- "| cpu | 1961 | 3 | 20 | 40 |",
- "| cpu | 1961 | 3 | 50 | 70 |",
- "| mem | 1961 | 3 | 50 | 70 |",
+ "| cpu | 1961 | 3 | 0 | 20 |",
+ "| cpu | 1961 | 3 | 30 | 50 |",
+ "| cpu | 1961 | 3 | 60 | 80 |",
+ "| mem | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
9 changes: 6 additions & 3 deletions influxdb3_wal/src/object_store.rs
@@ -365,11 +365,11 @@ impl WalObjectStore {
last_wal_sequence_number: Option<WalFileSequenceNumber>,
all_wal_file_paths: &[Path],
) -> Vec<Path> {
- // if we have a last wal path from persisted snapshots, we don't need to load the wal files
- // that came before it:
all_wal_file_paths
.iter()
.filter(|path| {
+ // if we have a last wal path from persisted snapshots, we don't need to load the wal files
+ // that came before it:
if let Some(last_wal_sequence_number) = last_wal_sequence_number {
Member

One thing I'm not sure of after looking through this: does the persisted snapshot save the last known wal file number, or the wal file number it's snapshotting to? It should have both, but the one we care about here is the one it's snapshotting to. Can you verify?

Contributor Author

The persisted snapshot file carries the wal file number that came in with the current wal contents, which is why @hiltontj had to use >= instead of >. SnapshotDetails holds the last_wal_sequence_number (which is based on the wal period we're snapshotting). If we used the one coming from SnapshotDetails, we could then load only the files > this wal file number.

Member

Ok, so we need to update this so that we're also keeping a record of which WAL file we're snapshotting to. That way we initialize with the correct one (i.e. not the latest WAL file written). In the case of a forced snapshot, those numbers will be the same, but in the case of normal snapshots, the snapshot number will be < the latest WAL number.

Contributor Author

addressed it in commit - good spot, thanks.

let last_wal_path =
wal_path(&self.host_identifier_prefix, last_wal_sequence_number);
@@ -378,7 +378,10 @@ impl WalObjectStore {
?last_wal_path,
">>> path and last_wal_path check when replaying"
);
- *path >= &last_wal_path
+ // the last_wal_sequence_number that comes from the persisted snapshot
+ // holds the last wal number (inclusive) that has been snapshotted,
+ // so anything greater than that needs to be loaded
+ *path > &last_wal_path
} else {
true
}
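
To illustrate the review thread above, here is a minimal, self-contained sketch of the replay filter after the change from `>=` to `>`. It is not the code from `influxdb3_wal`: `WalSeq`, the `String` paths, the zero-padded layout produced by this `wal_path`, and `paths_to_replay` are all assumptions made for illustration. The only point it shows is that once the persisted snapshot records the last WAL number it covers (inclusive), only strictly greater paths need to be loaded.

```rust
/// Hypothetical stand-in for a WAL file sequence number.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct WalSeq(pub u64);

/// Hypothetical path builder (assumed layout, not taken from the PR).
/// The number is zero-padded so that string order matches numeric order.
pub fn wal_path(host_prefix: &str, seq: WalSeq) -> String {
    format!("{}/wal/{:011}.wal", host_prefix, seq.0)
}

/// Returns the WAL paths that still need to be replayed.
///
/// `last_snapshotted` is the last WAL number (inclusive) already covered by a
/// persisted snapshot, so only strictly greater paths are kept.
pub fn paths_to_replay(
    host_prefix: &str,
    last_snapshotted: Option<WalSeq>,
    all_paths: &[String],
) -> Vec<String> {
    all_paths
        .iter()
        .filter(|path| match last_snapshotted {
            // `>` rather than `>=`: the snapshotted file itself is skipped.
            Some(seq) => path.as_str() > wal_path(host_prefix, seq).as_str(),
            // No snapshot yet: every WAL file has to be replayed.
            None => true,
        })
        .cloned()
        .collect()
}
```

With WAL files 1 through 5 and a snapshot that covers up to file 3, this sketch keeps only files 4 and 5. With `>=` it would also reload file 3, which the snapshot already contains.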
17 changes: 9 additions & 8 deletions influxdb3_write/src/write_buffer/queryable_buffer.rs
@@ -179,6 +179,13 @@ impl QueryableBuffer {

let persist_jobs = {
let mut buffer = self.buffer.write();
+ // need to buffer first before snapshotting
+ buffer.buffer_ops(
+ &write.ops,
+ &self.last_cache_provider,
+ &self.distinct_cache_provider,
+ );
+
let mut persisting_chunks = vec![];
let catalog = Arc::clone(&buffer.catalog);
for (database_id, table_map) in buffer.db_to_table.iter_mut() {
@@ -208,7 +215,7 @@ impl QueryableBuffer {
table_name.as_ref(),
table_id.as_u32(),
chunk.chunk_time,
- write.wal_file_number,
+ snapshot_details.last_wal_sequence_number,
),
batch: chunk.record_batch,
schema: chunk.schema,
@@ -221,12 +228,6 @@ impl QueryableBuffer {
}
}

- buffer.buffer_ops(
- &write.ops,
- &self.last_cache_provider,
- &self.distinct_cache_provider,
- );
-
persisting_chunks
};

@@ -278,7 +279,7 @@ impl QueryableBuffer {
let mut persisted_snapshot = PersistedSnapshot::new(
persister.host_identifier_prefix().to_string(),
snapshot_details.snapshot_sequence_number,
- wal_file_number,
+ snapshot_details.last_wal_sequence_number,
catalog.sequence_number(),
);
let mut cache_notifiers = vec![];
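
The two changes in this file (buffering before snapshotting, and recording the snapshot's own WAL number instead of the number of the WAL file being written) can be sketched roughly as follows. Every type here is a simplified, hypothetical stand-in rather than the real `influxdb3_write` definitions, and `buffer_then_snapshot` is an invented name. The sketch only shows which number ends up in the persisted snapshot.

```rust
/// Hypothetical, simplified snapshot metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SnapshotDetails {
    pub snapshot_sequence_number: u64,
    /// Last WAL file (inclusive) covered by this snapshot.
    pub last_wal_sequence_number: u64,
}

/// Hypothetical, simplified contents of one WAL file.
pub struct WalContents {
    /// Number of the WAL file that carried these contents (the latest written).
    pub wal_file_number: u64,
    pub ops: Vec<String>,
    pub snapshot: Option<SnapshotDetails>,
}

/// Hypothetical, simplified persisted snapshot record.
#[derive(Debug, PartialEq, Eq)]
pub struct PersistedSnapshot {
    pub snapshot_sequence_number: u64,
    pub wal_file_sequence_number: u64,
}

/// Hypothetical in-memory buffer that just collects ops.
#[derive(Default)]
pub struct Buffer {
    pub ops: Vec<String>,
}

pub fn buffer_then_snapshot(
    buffer: &mut Buffer,
    write: &WalContents,
) -> Option<PersistedSnapshot> {
    // Buffer the incoming ops first, so they are in the buffer before any
    // snapshot work starts.
    buffer.ops.extend(write.ops.iter().cloned());

    // Record the WAL number the snapshot covers, not `write.wal_file_number`.
    // For a normal snapshot the former is smaller than the latter.
    write.snapshot.map(|details| PersistedSnapshot {
        snapshot_sequence_number: details.snapshot_sequence_number,
        wal_file_sequence_number: details.last_wal_sequence_number,
    })
}
```

If WAL file 7 carries a snapshot that covers up to file 5, the record stores 5, so a restart that loads only files greater than the recorded number would still replay files 6 and 7.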