Skip to content

Commit

Permalink
feat: only load wal files after most recent snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
hiltontj committed Jan 11, 2025
1 parent 88e1092 commit 291b9ee
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
32 changes: 24 additions & 8 deletions influxdb3_wal/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl WalObjectStore {
last_snapshot_sequence_number,
);

wal.replay().await?;
wal.replay(last_wal_sequence_number).await?;
let wal = Arc::new(wal);
background_wal_flush(Arc::clone(&wal), flush_interval);

Expand Down Expand Up @@ -96,9 +96,14 @@ impl WalObjectStore {

/// Loads the WAL files in order from object store, calling the file notifier on each one and
/// populating the snapshot tracker with the WAL periods.
pub async fn replay(&self) -> crate::Result<()> {
pub async fn replay(
&self,
last_wal_sequence_number: Option<WalFileSequenceNumber>,
) -> crate::Result<()> {
debug!(">>> replaying");
let paths = self.load_existing_wal_file_paths().await?;
let paths = self
.load_existing_wal_file_paths(last_wal_sequence_number)
.await?;

let last_snapshot_sequence_number = {
self.flush_buffer
Expand Down Expand Up @@ -335,7 +340,10 @@ impl WalObjectStore {
snapshot_response
}

async fn load_existing_wal_file_paths(&self) -> crate::Result<Vec<Path>> {
async fn load_existing_wal_file_paths(
&self,
last_wal_sequence_number: Option<WalFileSequenceNumber>,
) -> crate::Result<Vec<Path>> {
let mut paths = Vec::new();
let mut offset: Option<Path> = None;
let path = Path::from(format!("{host}/wal", host = self.host_identifier_prefix));
Expand All @@ -358,6 +366,14 @@ impl WalObjectStore {
paths.sort();
offset = Some(paths.last().unwrap().clone())
}

// if we have a last WAL file sequence number from persisted snapshots, we don't need to load
// the WAL files that came before it (the file at that sequence number itself is still kept):
if let Some(last_wal_sequence_number) = last_wal_sequence_number {
let last_wal_path = wal_path(&self.host_identifier_prefix, last_wal_sequence_number);
paths.retain(|path| path >= &last_wal_path);
}

paths.sort();

Ok(paths)
Expand Down Expand Up @@ -968,13 +984,13 @@ mod tests {
None,
);
assert_eq!(
replay_wal.load_existing_wal_file_paths().await.unwrap(),
replay_wal.load_existing_wal_file_paths(None).await.unwrap(),
vec![
Path::from("my_host/wal/00000000001.wal"),
Path::from("my_host/wal/00000000002.wal")
]
);
replay_wal.replay().await.unwrap();
replay_wal.replay(None).await.unwrap();
let replay_notifier = replay_notifier
.as_any()
.downcast_ref::<TestNotifier>()
Expand Down Expand Up @@ -1109,10 +1125,10 @@ mod tests {
None,
);
assert_eq!(
replay_wal.load_existing_wal_file_paths().await.unwrap(),
replay_wal.load_existing_wal_file_paths(None).await.unwrap(),
vec![Path::from("my_host/wal/00000000003.wal")]
);
replay_wal.replay().await.unwrap();
replay_wal.replay(None).await.unwrap();
let replay_notifier = replay_notifier
.as_any()
.downcast_ref::<TestNotifier>()
Expand Down
8 changes: 0 additions & 8 deletions influxdb3_write/src/write_buffer/persisted_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ impl PersistedFiles {
}
}

/// Record a single `ParquetFile` under the given database and table.
///
/// The per-database and per-table entries are created with their default values on first
/// use, so callers do not need to register a database or table beforehand.
pub fn add_file(&self, db_id: DbId, table_id: TableId, file: ParquetFile) {
    // Acquire write access to the inner state for the duration of the insertion.
    let mut guard = self.inner.write();
    guard
        .files
        .entry(db_id)
        .or_default()
        .entry(table_id)
        .or_default()
        .push(file);
}

/// Add all files from a persisted snapshot
pub fn add_persisted_snapshot_files(&self, persisted_snapshot: PersistedSnapshot) {
let mut inner = self.inner.write();
Expand Down

0 comments on commit 291b9ee

Please sign in to comment.