Skip to content

Commit

Permalink
refactor: refine KvbackendConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jan 26, 2025
1 parent 22cf18e commit e7a627d
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 53 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@
| `wal.provider` | String | `raft_engine` | The provider of the WAL.<br/>- `raft_engine`: the wal is stored in the local file system by raft-engine.<br/>- `kafka`: it's remote wal that data is stored in Kafka. |
| `wal.dir` | String | Unset | The directory to store the WAL files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.file_size` | String | `128MB` | The size of the WAL segment file.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_threshold` | String | `1GB` | The threshold of the WAL size to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_interval` | String | `1m` | The interval to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_threshold` | String | `1GB` | The threshold of the WAL size to trigger a purge.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_interval` | String | `1m` | The interval to trigger a purge.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.read_batch_size` | Integer | `128` | The read batch size.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.sync_write` | Bool | `false` | Whether to use sync write.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.<br/>**It's only used when the provider is `raft_engine`**. |
Expand All @@ -86,8 +86,9 @@
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `metadata_store` | -- | -- | Metadata storage options. |
| `metadata_store.file_size` | String | `256MB` | Kv file size in bytes. |
| `metadata_store.purge_threshold` | String | `4GB` | Kv purge threshold. |
| `metadata_store.file_size` | String | `64MB` | The size of the metadata store log file. |
| `metadata_store.purge_threshold` | String | `256MB` | The threshold of the metadata store size to trigger a purge. |
| `metadata_store.purge_interval` | String | `1m` | The interval of the metadata store to trigger a purge. |
| `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `3` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
Expand Down
14 changes: 8 additions & 6 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ dir = "/tmp/greptimedb/wal"
## **It's only used when the provider is `raft_engine`**.
file_size = "128MB"

## The threshold of the WAL size to trigger a flush.
## The threshold of the WAL size to trigger a purge.
## **It's only used when the provider is `raft_engine`**.
purge_threshold = "1GB"

## The interval to trigger a flush.
## The interval to trigger a purge.
## **It's only used when the provider is `raft_engine`**.
purge_interval = "1m"

Expand Down Expand Up @@ -272,10 +272,12 @@ overwrite_entry_start_id = false

## Metadata storage options.
[metadata_store]
## Kv file size in bytes.
file_size = "256MB"
## Kv purge threshold.
purge_threshold = "4GB"
## The size of the metadata store log file.
file_size = "64MB"
## The threshold of the metadata store size to trigger a purge.
purge_threshold = "256MB"
## The interval of the metadata store to trigger a purge.
purge_interval = "1m"

## Procedure storage options.
[procedure]
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,8 @@ impl StartCommand {
let metadata_dir = metadata_store_dir(data_home);
let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
metadata_dir,
opts.metadata_store.clone(),
opts.procedure.clone(),
opts.metadata_store,
opts.procedure,
)
.await
.context(StartFrontendSnafu)?;
Expand Down
2 changes: 2 additions & 0 deletions src/common/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
config.workspace = true
humantime-serde.workspace = true
num_cpus.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
snafu.workspace = true
sysinfo.workspace = true
toml.workspace = true
Expand Down
21 changes: 14 additions & 7 deletions src/common/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub mod config;
pub mod error;
pub mod utils;

use std::time::Duration;

use common_base::readable_size::ReadableSize;
pub use config::*;
use serde::{Deserialize, Serialize};
Expand All @@ -34,22 +36,27 @@ pub enum Mode {
Distributed,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KvBackendConfig {
// Kv file size in bytes
/// The size of the metadata store backend log file.
pub file_size: ReadableSize,
// Kv purge threshold in bytes
/// The threshold of the metadata store size to trigger a purge.
pub purge_threshold: ReadableSize,
/// The interval of the metadata store to trigger a purge.
#[serde(with = "humantime_serde")]
pub purge_interval: Duration,
}

impl Default for KvBackendConfig {
fn default() -> Self {
Self {
// The log file size 128MB
file_size: ReadableSize::mb(128),
// The log purge threshold 512MB
purge_threshold: ReadableSize::mb(512),
// The log file size 64MB
file_size: ReadableSize::mb(64),
// The log purge threshold 256MB
purge_threshold: ReadableSize::mb(256),
// The log purge interval 1m
purge_interval: Duration::from_secs(60),
}
}
}
2 changes: 1 addition & 1 deletion src/common/procedure/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct ProcedureConfig {
/// Max retry times of procedure.
Expand Down
1 change: 0 additions & 1 deletion src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ prometheus.workspace = true
promql-parser.workspace = true
prost.workspace = true
query.workspace = true
raft-engine.workspace = true
serde.workspace = true
servers.workspace = true
session.workspace = true
Expand Down
14 changes: 3 additions & 11 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
use query::stats::StatementStatistics;
use query::QueryEngineRef;
use raft_engine::{Config, ReadableSize, RecoveryMode};
use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::export_metrics::ExportMetricsTask;
Expand Down Expand Up @@ -138,16 +137,9 @@ impl Instance {
"Creating metadata kvbackend with config: {:?}",
kv_backend_config
);
let kv_backend = RaftEngineBackend::try_open_with_cfg(Config {
dir,
purge_threshold: ReadableSize(kv_backend_config.purge_threshold.0),
recovery_mode: RecoveryMode::TolerateTailCorruption,
batch_compression_threshold: ReadableSize::kb(8),
target_file_size: ReadableSize(kv_backend_config.file_size.0),
..Default::default()
})
.map_err(BoxedError::new)
.context(error::OpenRaftEngineBackendSnafu)?;
let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config)
.map_err(BoxedError::new)
.context(error::OpenRaftEngineBackendSnafu)?;

let kv_backend = Arc::new(kv_backend);
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
Expand Down
1 change: 1 addition & 0 deletions src/log-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ async-trait.workspace = true
bytes.workspace = true
chrono.workspace = true
common-base.workspace = true
common-config.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
Expand Down
35 changes: 17 additions & 18 deletions src/log-store/src/raft_engine/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use std::any::Any;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use common_config::KvBackendConfig;
use common_error::ext::BoxedError;
use common_meta::error as meta_error;
use common_meta::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse};
Expand All @@ -32,7 +32,7 @@ use common_meta::rpc::store::{
use common_meta::rpc::KeyValue;
use common_meta::util::get_next_prefix_key;
use common_runtime::RepeatedTask;
use raft_engine::{Config, Engine, LogBatch};
use raft_engine::{Config, Engine, LogBatch, ReadableSize, RecoveryMode};
use snafu::{IntoError, ResultExt};

use crate::error::{self, Error, IoSnafu, RaftEngineSnafu, StartGcTaskSnafu};
Expand Down Expand Up @@ -69,15 +69,24 @@ fn ensure_dir(dir: &str) -> error::Result<()> {
}

impl RaftEngineBackend {
pub fn try_open_with_cfg(config: Config) -> error::Result<Self> {
ensure_dir(&config.dir)?;
if let Some(spill_dir) = &config.spill_dir {
pub fn try_open_with_cfg(dir: String, config: &KvBackendConfig) -> error::Result<Self> {
let cfg = Config {
dir: dir.to_string(),
purge_threshold: ReadableSize(config.purge_threshold.0),
recovery_mode: RecoveryMode::TolerateTailCorruption,
batch_compression_threshold: ReadableSize::kb(8),
target_file_size: ReadableSize(config.file_size.0),
..Default::default()
};

ensure_dir(&dir)?;
if let Some(spill_dir) = &cfg.spill_dir {
ensure_dir(spill_dir)?;
}

let engine = Arc::new(Engine::open(config).context(RaftEngineSnafu)?);
let engine = Arc::new(Engine::open(cfg).context(RaftEngineSnafu)?);
let gc_task = RepeatedTask::new(
Duration::from_secs(60),
config.purge_interval,
Box::new(PurgeExpiredFilesFunction {
engine: engine.clone(),
}),
Expand Down Expand Up @@ -412,21 +421,11 @@ mod tests {
};
use common_meta::rpc::store::{CompareAndPutRequest, CompareAndPutResponse};
use common_test_util::temp_dir::create_temp_dir;
use raft_engine::{Config, ReadableSize, RecoveryMode};

use super::*;

fn build_kv_backend(dir: String) -> RaftEngineBackend {
let config = Config {
dir,
spill_dir: None,
recovery_mode: RecoveryMode::AbsoluteConsistency,
target_file_size: ReadableSize::mb(4),
purge_threshold: ReadableSize::mb(16),
..Default::default()
};

RaftEngineBackend::try_open_with_cfg(config).unwrap()
RaftEngineBackend::try_open_with_cfg(dir, &KvBackendConfig::default()).unwrap()
}

#[tokio::test]
Expand Down
5 changes: 3 additions & 2 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,8 +940,9 @@ prefill_log_files = false
{storage}
[metadata_store]
file_size = "256MiB"
purge_threshold = "4GiB"
file_size = "64MiB"
purge_threshold = "256MiB"
purge_interval = "1m"
[procedure]
max_retry_times = 3
Expand Down

0 comments on commit e7a627d

Please sign in to comment.