Skip to content

Commit

Permalink
feat: snapshot when wal buffer is empty
Browse files Browse the repository at this point in the history
- This commit changes the functionality to allow snapshots to happen even when
  the wal buffer is empty. Snapshots still require wal periods, but they do not
  require a non-empty wal buffer. To allow this, we write a no-op into the wal file
  with the snapshot details. This enables the force-snapshotting functionality.

closes: #25685
  • Loading branch information
praveen-influx committed Jan 8, 2025
1 parent dfc853d commit eed5a46
Show file tree
Hide file tree
Showing 11 changed files with 536 additions and 190 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions influxdb3_cache/src/last_cache/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ impl LastCacheProvider {
}
}
WalOp::Catalog(_) => (),
WalOp::Noop(_) => (),
}
}
}
Expand Down
40 changes: 17 additions & 23 deletions influxdb3_server/src/query_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,10 +802,9 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1956 | 2 | 0 | 10 |",
"| cpu | 1956 | 2 | 20 | 30 |",
"| cpu | 1956 | 2 | 40 | 50 |",
"| cpu | 1956 | 2 | 60 | 70 |",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand All @@ -818,10 +817,9 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| mem | 1956 | 2 | 0 | 10 |",
"| mem | 1956 | 2 | 20 | 30 |",
"| mem | 1956 | 2 | 40 | 50 |",
"| mem | 1956 | 2 | 60 | 70 |",
"| mem | 1961 | 3 | 0 | 20 |",
"| mem | 1961 | 3 | 30 | 50 |",
"| mem | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand All @@ -833,32 +831,28 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1956 | 2 | 0 | 10 |",
"| cpu | 1956 | 2 | 20 | 30 |",
"| cpu | 1956 | 2 | 40 | 50 |",
"| cpu | 1956 | 2 | 60 | 70 |",
"| mem | 1956 | 2 | 0 | 10 |",
"| mem | 1956 | 2 | 20 | 30 |",
"| mem | 1956 | 2 | 40 | 50 |",
"| mem | 1956 | 2 | 60 | 70 |",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"| mem | 1961 | 3 | 0 | 20 |",
"| mem | 1961 | 3 | 30 | 50 |",
"| mem | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
TestCase {
query: "\
SELECT table_name, size_bytes, row_count, min_time, max_time \
FROM system.parquet_files \
LIMIT 6",
LIMIT 4",
expected: &[
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1956 | 2 | 0 | 10 |",
"| cpu | 1956 | 2 | 20 | 30 |",
"| cpu | 1956 | 2 | 40 | 50 |",
"| cpu | 1956 | 2 | 60 | 70 |",
"| mem | 1956 | 2 | 40 | 50 |",
"| mem | 1956 | 2 | 60 | 70 |",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"| mem | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand Down
3 changes: 3 additions & 0 deletions influxdb3_wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ tokio.workspace = true

[lints]
workspace = true

[dev-dependencies]
test-log.workspace = true
35 changes: 31 additions & 4 deletions influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub mod object_store;
pub mod serialize;
mod snapshot_tracker;

use crate::snapshot_tracker::SnapshotInfo;
use async_trait::async_trait;
use data_types::Timestamp;
use hashbrown::HashMap;
Expand Down Expand Up @@ -83,14 +82,26 @@ pub trait Wal: Debug + Send + Sync + 'static {
&self,
) -> Option<(
oneshot::Receiver<SnapshotDetails>,
SnapshotInfo,
SnapshotDetails,
OwnedSemaphorePermit,
)>;

/// Forces a flush of the wal buffer together with an immediate snapshot.
///
/// This is similar to flushing the buffer, but it allows the snapshot to be done immediately
/// rather than waiting for wal periods to stack up in [`snapshot_tracker::SnapshotTracker`].
/// Because a forced flush can happen with no ops in the wal buffer, this call adds a
/// [`WalOp::Noop`] to the wal buffer if it is empty and then carries on with snapshotting.
///
/// When `Some`, the tuple holds a receiver that yields [`SnapshotDetails`], the
/// [`SnapshotDetails`] of the snapshot, and an [`OwnedSemaphorePermit`].
/// NOTE(review): presumably `None` means no snapshot was started, and the details and permit
/// are meant to be passed to `cleanup_snapshot` once the receiver resolves — confirm against
/// the implementation.
async fn force_flush_buffer(
&self,
) -> Option<(
oneshot::Receiver<SnapshotDetails>,
SnapshotDetails,
OwnedSemaphorePermit,
)>;

/// Removes any snapshot wal files
async fn cleanup_snapshot(
&self,
snapshot_details: SnapshotInfo,
snapshot_details: SnapshotDetails,
snapshot_permit: OwnedSemaphorePermit,
);

Expand Down Expand Up @@ -218,6 +229,7 @@ impl Default for Gen1Duration {
pub enum WalOp {
/// A batch of writes; this is the only variant for which `as_write` returns `Some`.
Write(WriteBatch),
/// An ordered catalog batch; `as_catalog` exposes its inner `catalog`.
Catalog(OrderedCatalogBatch),
/// A no-op carrying neither write nor catalog data. It is added to an empty wal buffer so
/// that a forced flush can still proceed with snapshotting.
/// NOTE(review): the meaning of the `i64` payload is not visible here (presumably a
/// timestamp) — confirm.
Noop(i64),
}

impl PartialOrd for WalOp {
Expand All @@ -240,6 +252,11 @@ impl Ord for WalOp {

// For two Write ops, consider them equal
(WalOp::Write(_), WalOp::Write(_)) => Ordering::Equal,
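// All noops should stay where they are; there is no need to reorder them.
// At the moment, at least, a noop should appear only
// when there are no other ops in the wal buffer.
// NOTE(review): treating a noop as Equal to every other op is only a consistent
// ordering while that holds — confirm, since `Ord` requires a transitive total order.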
(_, WalOp::Noop(_)) => Ordering::Equal,
(WalOp::Noop(_), _) => Ordering::Equal,
}
}
}
Expand All @@ -249,13 +266,15 @@ impl WalOp {
match self {
WalOp::Write(w) => Some(w),
WalOp::Catalog(_) => None,
WalOp::Noop(_) => None,
}
}

/// Returns the inner catalog batch when this op is a catalog op, and `None` otherwise.
pub fn as_catalog(&self) -> Option<&CatalogBatch> {
    match self {
        WalOp::Catalog(ordered) => Some(&ordered.catalog),
        // Writes and no-ops carry no catalog batch.
        WalOp::Write(_) | WalOp::Noop(_) => None,
    }
}
}
Expand Down Expand Up @@ -867,6 +886,10 @@ impl WalContents {
pub fn is_empty(&self) -> bool {
    // Empty means there are no ops and no snapshot is attached.
    let no_ops = self.ops.is_empty();
    let no_snapshot = self.snapshot.is_none();
    no_ops && no_snapshot
}

/// Returns `true` when the contents hold exactly one op and that op is a [`WalOp::Noop`].
pub fn has_only_no_op(&self) -> bool {
    // A single-element slice pattern checks the length and the variant in one step.
    matches!(&self.ops[..], [WalOp::Noop(_)])
}
}

#[derive(
Expand Down Expand Up @@ -934,8 +957,12 @@ pub struct SnapshotDetails {
pub snapshot_sequence_number: SnapshotSequenceNumber,
/// All chunks with data before this time can be snapshot and persisted
pub end_time_marker: i64,
/// All wal files with a sequence number >= this can be deleted once snapshotting is complete
pub first_wal_sequence_number: WalFileSequenceNumber,
/// All wal files with a sequence number <= this can be deleted once snapshotting is complete
pub last_wal_sequence_number: WalFileSequenceNumber,
/// Both forced snapshots and the 3-times-snapshot-size case should set this flag
pub forced: bool,
}

pub fn background_wal_flush<W: Wal>(
Expand All @@ -958,7 +985,7 @@ pub fn background_wal_flush<W: Wal>(
let snapshot_wal = Arc::clone(&wal);
tokio::spawn(async move {
let snapshot_details = snapshot_complete.await.expect("snapshot failed");
assert_eq!(snapshot_info.snapshot_details, snapshot_details);
assert_eq!(snapshot_info, snapshot_details);

snapshot_wal
.cleanup_snapshot(snapshot_info, snapshot_permit)
Expand Down
Loading

0 comments on commit eed5a46

Please sign in to comment.