Skip to content

Commit

Permalink
refactor: address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
praveen-influx committed Jan 11, 2025
1 parent 4faa52b commit 80a61d5
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 42 deletions.
11 changes: 6 additions & 5 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,16 @@ pub struct Config {
)]
pub wal_max_write_buffer_size: usize,

/// Number of wal files to keep behind, wal flush does not clear the wal files immediately
/// instead they are only deleted when num wal files count exceed this keep behind size
/// Number of snapshotted wal files to retain in object store, wal flush does not clear
/// the wal files immediately instead they are only deleted when snapshotted and num wal files
/// count exceeds this size
#[clap(
long = "num-wal-files-to-keep",
long = "snapshotted-wal-files-to-keep",
env = "INFLUXDB3_NUM_WAL_FILES_TO_KEEP",
default_value = "300",
action
)]
pub num_wal_files_to_keep: u64,
pub snapshotted_wal_files_to_keep: u64,

// TODO - tune this default:
/// The size of the query log. Up to this many queries will remain in the log before
Expand Down Expand Up @@ -512,7 +513,7 @@ pub async fn command(config: Config) -> Result<()> {
wal_config,
parquet_cache,
metric_registry: Arc::clone(&metrics),
num_wal_files_to_keep: config.num_wal_files_to_keep,
snapshotted_wal_files_to_keep: config.snapshotted_wal_files_to_keep,
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;
Expand Down
2 changes: 1 addition & 1 deletion influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ mod tests {
wal_config,
parquet_cache: None,
metric_registry: Arc::clone(&metric_registry),
num_wal_files_to_keep: 10,
snapshotted_wal_files_to_keep: 10,
})
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion influxdb3_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ mod tests {
wal_config: WalConfig::test_config(),
parquet_cache: Some(parquet_cache),
metric_registry: Arc::clone(&metrics),
num_wal_files_to_keep: 100,
snapshotted_wal_files_to_keep: 100,
},
)
.await
Expand Down
2 changes: 1 addition & 1 deletion influxdb3_server/src/query_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ mod tests {
},
parquet_cache: Some(parquet_cache),
metric_registry: Default::default(),
num_wal_files_to_keep: 1,
snapshotted_wal_files_to_keep: 1,
})
.await
.unwrap();
Expand Down
41 changes: 15 additions & 26 deletions influxdb3_wal/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,6 @@ use std::{str::FromStr, sync::Arc};
use tokio::sync::Mutex;
use tokio::sync::{oneshot, OwnedSemaphorePermit, Semaphore};

// struct WalObjectStoreArgs {
// time_provider: Arc<dyn TimeProvider>,
// object_store: Arc<dyn ObjectStore>,
// host_identifier_prefix: String,
// file_notifier: Arc<dyn WalFileNotifier>,
// config: WalConfig,
// last_wal_sequence_number: Option<WalFileSequenceNumber>,
// last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
// all_wal_file_paths: &[Path],
// }

#[derive(Debug)]
pub struct WalObjectStore {
object_store: Arc<dyn ObjectStore>,
Expand All @@ -36,16 +25,15 @@ pub struct WalObjectStore {
added_file_notifiers: parking_lot::Mutex<Vec<Arc<dyn WalFileNotifier>>>,
/// Buffered wal ops go in here along with the state to track when to snapshot
flush_buffer: Mutex<FlushBuffer>,
/// oldest, last and latest wal file nums are used to keep track of what wal files
/// to remove from the OS
num_wal_files_to_keep: u64,
// we need atomics or mutex
/// number of snapshotted wal files to retain in object store
snapshotted_wal_files_to_keep: u64,
wal_remover: parking_lot::Mutex<WalFileRemover>,
}

impl WalObjectStore {
/// Creates a new WAL. This will replay files into the notifier and trigger any snapshots that
/// exist in the WAL files that haven't been cleaned up yet.
#[allow(clippy::too_many_arguments)]
pub async fn new(
time_provider: Arc<dyn TimeProvider>,
object_store: Arc<dyn ObjectStore>,
Expand All @@ -54,7 +42,7 @@ impl WalObjectStore {
config: WalConfig,
last_wal_sequence_number: Option<WalFileSequenceNumber>,
last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
num_wal_files_to_keep: u64,
snapshotted_wal_files_to_keep: u64,
) -> Result<Arc<Self>, crate::Error> {
let host_identifier = host_identifier_prefix.into();
let all_wal_file_paths =
Expand All @@ -69,7 +57,7 @@ impl WalObjectStore {
last_wal_sequence_number,
last_snapshot_sequence_number,
&all_wal_file_paths,
num_wal_files_to_keep,
snapshotted_wal_files_to_keep,
);

wal.replay(last_wal_sequence_number, &all_wal_file_paths)
Expand All @@ -80,6 +68,7 @@ impl WalObjectStore {
Ok(wal)
}

#[allow(clippy::too_many_arguments)]
fn new_without_replay(
time_provider: Arc<dyn TimeProvider>,
object_store: Arc<dyn ObjectStore>,
Expand Down Expand Up @@ -118,10 +107,10 @@ impl WalObjectStore {
last_snapshot_sequence_number,
),
)),
num_wal_files_to_keep,
snapshotted_wal_files_to_keep: num_wal_files_to_keep,
wal_remover: parking_lot::Mutex::new(WalFileRemover {
oldest_wal_file: oldest_wal_file_num,
last_wal_sequence_number,
last_snapshotted_wal_sequence_number: last_wal_sequence_number,
}),
}
}
Expand Down Expand Up @@ -415,12 +404,12 @@ impl WalObjectStore {
">>> checking num wal files to delete"
);

if curr_num_files > self.num_wal_files_to_keep {
let num_files_to_delete = curr_num_files - self.num_wal_files_to_keep;
if curr_num_files > self.snapshotted_wal_files_to_keep {
let num_files_to_delete = curr_num_files - self.snapshotted_wal_files_to_keep;
let last_to_delete = oldest + num_files_to_delete;

debug!(
num_files_to_keep = ?self.num_wal_files_to_keep,
num_files_to_keep = ?self.snapshotted_wal_files_to_keep,
?curr_num_files,
?num_files_to_delete,
?last_to_delete,
Expand Down Expand Up @@ -839,26 +828,26 @@ impl<'a> TryFrom<&'a Path> for WalFileSequenceNumber {
#[derive(Debug)]
struct WalFileRemover {
oldest_wal_file: Option<WalFileSequenceNumber>,
last_wal_sequence_number: Option<WalFileSequenceNumber>,
last_snapshotted_wal_sequence_number: Option<WalFileSequenceNumber>,
}

impl WalFileRemover {
fn get_current_state(&self) -> (u64, u64) {
(
self.oldest_wal_file.map(|num| num.as_u64()).unwrap_or(0),
self.last_wal_sequence_number
self.last_snapshotted_wal_sequence_number
.map(|num| num.as_u64())
.unwrap_or(0),
)
}

fn update_last_wal_num(&mut self, last_wal: WalFileSequenceNumber) {
self.last_wal_sequence_number.replace(last_wal);
self.last_snapshotted_wal_sequence_number.replace(last_wal);
}

fn update_state(&mut self, oldest: u64, last: u64) {
self.oldest_wal_file = Some(WalFileSequenceNumber::new(oldest));
self.last_wal_sequence_number = Some(WalFileSequenceNumber::new(last));
self.last_snapshotted_wal_sequence_number = Some(WalFileSequenceNumber::new(last));
}
}

Expand Down
16 changes: 8 additions & 8 deletions influxdb3_write/src/write_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub struct WriteBufferImplArgs {
pub wal_config: WalConfig,
pub parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
pub metric_registry: Arc<Registry>,
pub num_wal_files_to_keep: u64,
pub snapshotted_wal_files_to_keep: u64,
}

impl WriteBufferImpl {
Expand All @@ -177,7 +177,7 @@ impl WriteBufferImpl {
wal_config,
parquet_cache,
metric_registry,
num_wal_files_to_keep,
snapshotted_wal_files_to_keep,
}: WriteBufferImplArgs,
) -> Result<Arc<Self>> {
// load snapshots and replay the wal into the in memory buffer
Expand Down Expand Up @@ -221,7 +221,7 @@ impl WriteBufferImpl {
wal_config,
last_wal_sequence_number,
last_snapshot_sequence_number,
num_wal_files_to_keep,
snapshotted_wal_files_to_keep,
)
.await?;

Expand Down Expand Up @@ -936,7 +936,7 @@ mod tests {
wal_config: WalConfig::test_config(),
parquet_cache: Some(Arc::clone(&parquet_cache)),
metric_registry: Default::default(),
num_wal_files_to_keep: 10,
snapshotted_wal_files_to_keep: 10,
})
.await
.unwrap();
Expand Down Expand Up @@ -1023,7 +1023,7 @@ mod tests {
},
parquet_cache: Some(Arc::clone(&parquet_cache)),
metric_registry: Default::default(),
num_wal_files_to_keep: 10,
snapshotted_wal_files_to_keep: 10,
})
.await
.unwrap();
Expand Down Expand Up @@ -1092,7 +1092,7 @@ mod tests {
},
parquet_cache: wbuf.parquet_cache.clone(),
metric_registry: Default::default(),
num_wal_files_to_keep: 10,
snapshotted_wal_files_to_keep: 10,
})
.await
.unwrap()
Expand Down Expand Up @@ -1322,7 +1322,7 @@ mod tests {
},
parquet_cache: write_buffer.parquet_cache.clone(),
metric_registry: Default::default(),
num_wal_files_to_keep: 10,
snapshotted_wal_files_to_keep: 10,
})
.await
.unwrap();
Expand Down Expand Up @@ -2610,7 +2610,7 @@ mod tests {
wal_config,
parquet_cache,
metric_registry: Arc::clone(&metric_registry),
num_wal_files_to_keep: 10,
snapshotted_wal_files_to_keep: 10,
})
.await
.unwrap();
Expand Down

0 comments on commit 80a61d5

Please sign in to comment.