diff --git a/.github/workflows/pr-ci.yml b/.github/workflows/pr-ci.yml index f7e6a50fab0..3cb1e7ede9e 100644 --- a/.github/workflows/pr-ci.yml +++ b/.github/workflows/pr-ci.yml @@ -73,6 +73,12 @@ jobs: cargo test --features compat_old_proxy --package tests --test failpoints cases::test_import_service cargo test --features compat_old_proxy --package tests --test failpoints cases::test_proxy_replica_read # tests based on new-mock-engine-store, with compat for new proxy - cargo test --package tests --test proxy normal + cargo test --package tests --test proxy normal::store + cargo test --package tests --test proxy normal::region + cargo test --package tests --test proxy normal::config + cargo test --package tests --test proxy normal::write + cargo test --package tests --test proxy normal::ingest + cargo test --package tests --test proxy normal::snapshot + cargo test --package tests --test proxy normal::restart # tests based on new-mock-engine-store, for some tests not available for new proxy cargo test --package tests --test proxy proxy diff --git a/Cargo.lock b/Cargo.lock index ea6dde423be..f1800aaec9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4107,7 +4107,6 @@ dependencies = [ "backup", "backup-stream", "causal_ts", - "cdc", "chrono", "clap", "collections", diff --git a/components/proxy_server/Cargo.toml b/components/proxy_server/Cargo.toml index 2d8b7493264..fbef4bad991 100644 --- a/components/proxy_server/Cargo.toml +++ b/components/proxy_server/Cargo.toml @@ -40,7 +40,6 @@ async-stream = "0.2" backup = { path = "../backup", default-features = false } backup-stream = { path = "../backup-stream", default-features = false } causal_ts = { path = "../causal_ts" } -cdc = { path = "../cdc", default-features = false } chrono = "0.4" clap = "2.32" collections = { path = "../collections" } diff --git a/components/proxy_server/src/config.rs b/components/proxy_server/src/config.rs index f98fc3fa710..5775f74abaa 100644 --- a/components/proxy_server/src/config.rs +++ 
b/components/proxy_server/src/config.rs @@ -11,7 +11,7 @@ use online_config::OnlineConfig; use serde_derive::{Deserialize, Serialize}; use serde_with::with_prefix; use tikv::config::{TiKvConfig, LAST_CONFIG_FILE}; -use tikv_util::crit; +use tikv_util::{config::ReadableDuration, crit}; use crate::fatal; @@ -165,11 +165,14 @@ pub fn check_critical_config(config: &TiKvConfig) -> Result<(), String> { // Check current critical configurations with last time, if there are some // changes, user must guarantee relevant works have been done. if let Some(mut cfg) = get_last_config(&config.storage.data_dir) { + info!("check_critical_config finished compatible_adjust"); cfg.compatible_adjust(); if let Err(e) = cfg.validate() { warn!("last_tikv.toml is invalid but ignored: {:?}", e); } + info!("check_critical_config finished validate"); config.check_critical_cfg_with(&cfg)?; + info!("check_critical_config finished check_critical_cfg_with"); } Ok(()) } diff --git a/components/proxy_server/src/proxy.rs b/components/proxy_server/src/proxy.rs index 87ea1019b1a..270726879e1 100644 --- a/components/proxy_server/src/proxy.rs +++ b/components/proxy_server/src/proxy.rs @@ -10,6 +10,7 @@ use std::{ use clap::{App, Arg, ArgMatches}; use tikv::config::TiKvConfig; +use tikv_util::config::ReadableDuration; use crate::{ fatal, @@ -30,8 +31,13 @@ pub fn setup_default_tikv_config(default: &mut TiKvConfig) { default.server.addr = TIFLASH_DEFAULT_LISTENING_ADDR.to_string(); default.server.status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string(); default.server.advertise_status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string(); + default.raft_store.region_worker_tick_interval = ReadableDuration::millis(500); + let stale_peer_check_tick = + (10_000 / default.raft_store.region_worker_tick_interval.as_millis()) as usize; + default.raft_store.stale_peer_check_tick = stale_peer_check_tick; } +/// Generate default TiKvConfig, but with some Proxy's default values. 
pub fn gen_tikv_config( matches: &ArgMatches, is_config_check: bool, @@ -50,11 +56,12 @@ pub fn gen_tikv_config( }, ) .unwrap_or_else(|e| { - panic!( - "invalid auto generated configuration file {}, err {}", + error!( + "invalid default auto generated configuration file {}, err {}", path.display(), e ); + std::process::exit(1); }) }) } @@ -294,7 +301,6 @@ pub unsafe fn run_proxy( overwrite_config_with_cmd_args(&mut config, &mut proxy_config, &matches); config.logger_compatible_adjust(); - // TODO(tiflash) We should later use ProxyConfig for proxy's own settings like `snap_handle_pool_size` if is_config_check { crate::config::validate_and_persist_config(&mut config, false); match crate::config::ensure_no_common_unrecognized_keys( diff --git a/components/proxy_server/src/run.rs b/components/proxy_server/src/run.rs index 2250aaa62b5..3e20f9168ed 100644 --- a/components/proxy_server/src/run.rs +++ b/components/proxy_server/src/run.rs @@ -17,8 +17,6 @@ use std::{ }; use api_version::{dispatch_api_version, KvFormat}; -use backup_stream::{config::BackupStreamConfigManager, observer::BackupStreamObserver}; -use cdc::{CdcConfigManager, MemoryQuota}; use concurrency_manager::ConcurrencyManager; use encryption_export::{data_key_manager_from_config, DataKeyManager}; use engine_rocks::{ @@ -40,9 +38,8 @@ use futures::executor::block_on; use grpcio::{EnvBuilder, Environment}; use grpcio_health::HealthService; use kvproto::{ - brpb::create_backup, cdcpb::create_change_data, deadlock::create_deadlock, debugpb::create_debug, diagnosticspb::create_diagnostics, import_sstpb::create_import_sst, - kvrpcpb::ApiVersion, resource_usage_agent::create_resource_metering_pub_sub, + kvrpcpb::ApiVersion, }; use pd_client::{PdClient, RpcClient}; use raft_log_engine::RaftLogEngine; @@ -421,8 +418,11 @@ impl TiKvServer { }); // engine_tiflash::RocksEngine has engine_rocks::RocksEngine inside let mut kv_engine = TiFlashEngine::from_rocks(kv_engine); - // TODO(tiflash) setup proxy_config - 
kv_engine.init(engine_store_server_helper, 2, Some(ffi_hub)); + kv_engine.init( + engine_store_server_helper, + self.proxy_config.raft_store.snap_handle_pool_size, + Some(ffi_hub), + ); let engines = Engines::new(kv_engine, raft_engine); diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index cb1c868d223..f8b48031816 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -504,6 +504,51 @@ impl CoprocessorHost { ); } + pub fn should_pre_apply_snapshot(&self) -> bool { + for observer in &self.registry.apply_snapshot_observers { + let observer = observer.observer.inner(); + if observer.should_pre_apply_snapshot() { + return true; + } + } + false + } + + pub fn pre_apply_snapshot( + &self, + region: &Region, + peer_id: u64, + snap_key: &crate::store::SnapKey, + snap: Option<&crate::store::Snapshot>, + ) { + loop_ob!( + region, + &self.registry.apply_snapshot_observers, + pre_apply_snapshot, + peer_id, + snap_key, + snap, + ); + } + + pub fn post_apply_snapshot( + &self, + region: &Region, + peer_id: u64, + snap_key: &crate::store::SnapKey, + snap: Option<&crate::store::Snapshot>, + ) -> Result<()> { + let mut ctx = ObserverContext::new(region); + for observer in &self.registry.apply_snapshot_observers { + let observer = observer.observer.inner(); + let res = observer.post_apply_snapshot(&mut ctx, peer_id, snap_key, snap); + if res.is_err() { + return res; + } + } + Ok(()) + } + pub fn new_split_checker_host<'a>( &'a self, region: &Region, @@ -646,7 +691,10 @@ mod tests { }; use tikv_util::box_err; - use crate::coprocessor::*; + use crate::{ + coprocessor::*, + store::{SnapKey, Snapshot}, + }; #[derive(Clone, Default)] struct TestCoprocessor { @@ -673,6 +721,9 @@ mod tests { PostExecQuery = 17, PostExecAdmin = 18, OnComputeEngineSize = 19, + PreApplySnapshot = 20, + PostApplySnapshot = 21, + ShouldPreApplySnapshot = 22, } impl 
Coprocessor for TestCoprocessor {} @@ -840,6 +891,39 @@ mod tests { .fetch_add(ObserverIndex::ApplySst as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } + + fn pre_apply_snapshot( + &self, + ctx: &mut ObserverContext<'_>, + _: u64, + _: &SnapKey, + _: Option<&Snapshot>, + ) { + self.called + .fetch_add(ObserverIndex::PreApplySnapshot as usize, Ordering::SeqCst); + ctx.bypass = self.bypass.load(Ordering::SeqCst); + } + + fn post_apply_snapshot( + &self, + ctx: &mut ObserverContext<'_>, + _: u64, + _: &crate::store::SnapKey, + _: Option<&Snapshot>, + ) -> Result<()> { + self.called + .fetch_add(ObserverIndex::PostApplySnapshot as usize, Ordering::SeqCst); + ctx.bypass = self.bypass.load(Ordering::SeqCst); + Ok(()) + } + + fn should_pre_apply_snapshot(&self) -> bool { + self.called.fetch_add( + ObserverIndex::ShouldPreApplySnapshot as usize, + Ordering::SeqCst, + ); + false + } } impl CmdObserver for TestCoprocessor { @@ -984,6 +1068,19 @@ mod tests { host.post_exec(®ion, &cmd, &apply_state, ®ion_state, &mut info); index += ObserverIndex::PostExecQuery as usize; assert_all!([&ob.called], &[index]); + + let key = SnapKey::new(region.get_id(), 1, 1); + host.pre_apply_snapshot(®ion, 0, &key, None); + index += ObserverIndex::PreApplySnapshot as usize; + assert_all!([&ob.called], &[index]); + + let _ = host.post_apply_snapshot(®ion, 0, &key, None); + index += ObserverIndex::PostApplySnapshot as usize; + assert_all!([&ob.called], &[index]); + + host.should_pre_apply_snapshot(); + index += ObserverIndex::ShouldPreApplySnapshot as usize; + assert_all!([&ob.called], &[index]); } #[test] diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs index 05af6e0d7c4..9f657d9a3a7 100644 --- a/components/raftstore/src/coprocessor/mod.rs +++ b/components/raftstore/src/coprocessor/mod.rs @@ -21,7 +21,7 @@ use raft::{eraftpb, StateRole}; pub mod config; mod consistency_check; pub mod dispatcher; -mod error; +pub 
mod error; mod metrics; pub mod region_info_accessor; mod split_check; @@ -176,6 +176,36 @@ pub trait ApplySnapshotObserver: Coprocessor { /// Hook to call after applying sst file. Currently the content of the snapshot can't be /// passed to the observer. fn apply_sst(&self, _: &mut ObserverContext<'_>, _: CfName, _path: &str) {} + + /// Hook when receiving Task::Apply. + /// Should pass valid snapshot, the option is only for testing. + /// Notice that we can call `pre_apply_snapshot` to multiple snapshots at + /// the same time. + fn pre_apply_snapshot( + &self, + _: &mut ObserverContext<'_>, + _peer_id: u64, + _: &crate::store::SnapKey, + _: Option<&crate::store::Snapshot>, + ) { + } + + /// Hook when the whole snapshot is applied. + /// Should pass valid snapshot, the option is only for testing. + fn post_apply_snapshot( + &self, + _: &mut ObserverContext<'_>, + _: u64, + _: &crate::store::SnapKey, + _snapshot: Option<&crate::store::Snapshot>, + ) -> Result<()> { + Ok(()) + } + + /// We call pre_apply_snapshot only when one of the observer returns true. + fn should_pre_apply_snapshot(&self) -> bool { + false + } } /// SplitChecker is invoked during a split check scan, and decides to use diff --git a/components/raftstore/src/engine_store_ffi/observer.rs b/components/raftstore/src/engine_store_ffi/observer.rs index 9f80bdd67d4..65d82ca60b6 100644 --- a/components/raftstore/src/engine_store_ffi/observer.rs +++ b/components/raftstore/src/engine_store_ffi/observer.rs @@ -1,10 +1,15 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
-use std::sync::{mpsc, Arc, Mutex}; +use std::{ + ops::DerefMut, + path::PathBuf, + str::FromStr, + sync::{atomic::Ordering, mpsc, Arc, Mutex}, +}; use collections::HashMap; use engine_tiflash::FsStatsExt; -use engine_traits::SstMetaInfo; +use engine_traits::{CfName, SstMetaInfo}; use kvproto::{ import_sstpb::SstMeta, metapb::Region, @@ -23,6 +28,7 @@ use yatp::{ }; use crate::{ + coprocessor, coprocessor::{ AdminObserver, ApplyCtxInfo, ApplySnapshotObserver, BoxAdminObserver, BoxApplySnapshotObserver, BoxPdTaskObserver, BoxQueryObserver, BoxRegionChangeObserver, @@ -35,7 +41,7 @@ use crate::{ name_to_cf, ColumnFamilyType, EngineStoreServerHelper, RaftCmdHeader, RawCppPtr, TiFlashEngine, WriteCmdType, WriteCmds, CF_DEFAULT, CF_LOCK, CF_WRITE, }, - store::{check_sst_for_ingestion, SnapKey}, + store::{check_sst_for_ingestion, snap::plain_file_used, SnapKey}, Error, Result, }; @@ -124,9 +130,9 @@ impl TiFlashObserver { let engine_store_server_helper = gen_engine_store_server_helper(engine.engine_store_server_helper); // TODO(tiflash) start thread pool - // let snap_pool = Builder::new(tikv_util::thd_name!("region-task")) - // .max_thread_count(snap_handle_pool_size) - // .build_future_pool(); + let snap_pool = Builder::new(tikv_util::thd_name!("region-task")) + .max_thread_count(snap_handle_pool_size) + .build_future_pool(); TiFlashObserver { peer_id, engine_store_server_helper, @@ -134,12 +140,10 @@ impl TiFlashObserver { sst_importer, pre_handle_snapshot_ctx: Arc::new(Mutex::new(PrehandleContext::default())), snap_handle_pool_size, - // apply_snap_pool: Some(Arc::new(snap_pool)), - apply_snap_pool: None, + apply_snap_pool: Some(Arc::new(snap_pool)), } } - // TODO(tiflash) open observers when TiKV merged. 
pub fn register_to( &self, coprocessor_host: &mut CoprocessorHost, @@ -153,10 +157,10 @@ impl TiFlashObserver { TIFLASH_OBSERVER_PRIORITY, BoxQueryObserver::new(self.clone()), ); - // coprocessor_host.registry.register_apply_snapshot_observer( - // TIFLASH_OBSERVER_PRIORITY, - // BoxApplySnapshotObserver::new(self.clone()), - // ); + coprocessor_host.registry.register_apply_snapshot_observer( + TIFLASH_OBSERVER_PRIORITY, + BoxApplySnapshotObserver::new(self.clone()), + ); coprocessor_host.registry.register_region_change_observer( TIFLASH_OBSERVER_PRIORITY, BoxRegionChangeObserver::new(self.clone()), @@ -232,8 +236,9 @@ impl TiFlashObserver { impl Coprocessor for TiFlashObserver { fn stop(&self) { - // TODO(tiflash) - // self.apply_snap_pool.as_ref().unwrap().shutdown(); + // TODO(tiflash) remove this when pre apply merged + info!("shutdown tiflash observer"; "peer_id" => self.peer_id); + self.apply_snap_pool.as_ref().unwrap().shutdown(); } } @@ -554,6 +559,11 @@ impl RegionChangeObserver for TiFlashObserver { _: StateRole, ) { if e == RegionChangeEvent::Destroy { + info!( + "observe destroy"; + "region_id" => ob_ctx.region().get_id(), + "peer_id" => self.peer_id, + ); self.engine_store_server_helper .handle_destroy(ob_ctx.region().get_id()); } @@ -570,3 +580,198 @@ impl PdTaskObserver for TiFlashObserver { }); } } + +fn retrieve_sst_files(snap: &crate::store::Snapshot) -> Vec<(PathBuf, ColumnFamilyType)> { + let mut sst_views: Vec<(PathBuf, ColumnFamilyType)> = vec![]; + let mut ssts = vec![]; + for cf_file in snap.cf_files() { + // Skip empty cf file. + // CfFile is changed by dynamic region. + if cf_file.size.len() == 0 { + continue; + } + + if cf_file.size[0] == 0 { + continue; + } + + if plain_file_used(cf_file.cf) { + assert!(cf_file.cf == CF_LOCK); + } + // We have only one file for each cf for now. 
+ let mut full_paths = cf_file.file_paths(); + assert!(full_paths.len() != 0); + { + ssts.push((full_paths.remove(0), name_to_cf(cf_file.cf))); + } + } + for (s, cf) in ssts.iter() { + sst_views.push((PathBuf::from_str(s).unwrap(), *cf)); + } + sst_views +} + +fn pre_handle_snapshot_impl( + engine_store_server_helper: &'static EngineStoreServerHelper, + peer_id: u64, + ssts: Vec<(PathBuf, ColumnFamilyType)>, + region: &Region, + snap_key: &SnapKey, +) -> PtrWrapper { + let idx = snap_key.idx; + let term = snap_key.term; + let ptr = { + let sst_views = ssts + .iter() + .map(|(b, c)| (b.to_str().unwrap().as_bytes(), c.clone())) + .collect(); + engine_store_server_helper.pre_handle_snapshot(region, peer_id, sst_views, idx, term) + }; + PtrWrapper(ptr) +} + +impl ApplySnapshotObserver for TiFlashObserver { + fn pre_apply_snapshot( + &self, + ob_ctx: &mut ObserverContext<'_>, + peer_id: u64, + snap_key: &crate::store::SnapKey, + snap: Option<&crate::store::Snapshot>, + ) { + info!("pre apply snapshot"; + "peer_id" => peer_id, + "region_id" => ob_ctx.region().get_id(), + "snap_key" => ?snap_key, + "pending" => self.engine.pending_applies_count.load(Ordering::SeqCst), + ); + fail::fail_point!("on_ob_pre_handle_snapshot", |_| {}); + + let snap = match snap { + None => return, + Some(s) => s, + }; + + let (sender, receiver) = mpsc::channel(); + let task = Arc::new(PrehandleTask::new(receiver, peer_id)); + { + let mut lock = self.pre_handle_snapshot_ctx.lock().unwrap(); + let ctx = lock.deref_mut(); + ctx.tracer.insert(snap_key.clone(), task.clone()); + } + + let engine_store_server_helper = self.engine_store_server_helper; + let region = ob_ctx.region().clone(); + let snap_key = snap_key.clone(); + let ssts = retrieve_sst_files(snap); + self.engine + .pending_applies_count + .fetch_add(1, Ordering::SeqCst); + match self.apply_snap_pool.as_ref() { + Some(p) => { + p.spawn(async move { + // The original implementation is in `Snapshot`, so we don't need to care about lifetime.
+ fail::fail_point!("before_actually_pre_handle", |_| {}); + let res = pre_handle_snapshot_impl( + engine_store_server_helper, + task.peer_id, + ssts, + &region, + &snap_key, + ); + match sender.send(res) { + Err(e) => error!("pre apply snapshot err when send to receiver"), + Ok(_) => (), + } + }); + } + None => { + self.engine + .pending_applies_count + .fetch_sub(1, Ordering::SeqCst); + error!("apply_snap_pool is not initialized, quit background pre apply"; + "peer_id" => peer_id, + "region_id" => ob_ctx.region().get_id() + ); + } + } + } + + fn post_apply_snapshot( + &self, + ob_ctx: &mut ObserverContext<'_>, + peer_id: u64, + snap_key: &crate::store::SnapKey, + snap: Option<&crate::store::Snapshot>, + ) -> std::result::Result<(), coprocessor::Error> { + fail::fail_point!("on_ob_post_apply_snapshot", |_| { + return Err(box_err!("on_ob_post_apply_snapshot")); + }); + info!("post apply snapshot"; + "peer_id" => ?peer_id, + "snap_key" => ?snap_key, + "region" => ?ob_ctx.region(), + ); + let snap = match snap { + None => return Ok(()), + Some(s) => s, + }; + let maybe_snapshot = { + let mut lock = self.pre_handle_snapshot_ctx.lock().unwrap(); + let ctx = lock.deref_mut(); + ctx.tracer.remove(snap_key) + }; + let need_retry = match maybe_snapshot { + Some(t) => { + let need_retry = match t.recv.recv() { + Ok(snap_ptr) => { + self.engine_store_server_helper + .apply_pre_handled_snapshot(snap_ptr.0); + false + } + Err(_) => { + info!("background pre-handle snapshot get error"; + "snap_key" => ?snap_key, + "region" => ?ob_ctx.region(), + ); + true + } + }; + self.engine + .pending_applies_count + .fetch_sub(1, Ordering::SeqCst); + need_retry + } + None => { + // We can't find background pre-handle task, + // maybe we can't get snapshot at that time.
+ true + } + }; + if need_retry { + let ssts = retrieve_sst_files(snap); + let ptr = pre_handle_snapshot_impl( + self.engine_store_server_helper, + peer_id, + ssts, + ob_ctx.region(), + snap_key, + ); + info!("re-gen pre-handled snapshot success"; + "snap_key" => ?snap_key, + "region" => ?ob_ctx.region(), + ); + self.engine_store_server_helper + .apply_pre_handled_snapshot(ptr.0); + } + info!("apply snapshot finished"; + "peer_id" => ?peer_id, + "region" => ?ob_ctx.region(), + "pending" => self.engine.pending_applies_count.load(Ordering::SeqCst), + ); + Ok(()) + } + + fn should_pre_apply_snapshot(&self) -> bool { + true + } +} diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index d82bbffe016..34bf5b20f85 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -31,6 +31,18 @@ lazy_static! { .unwrap(); } +#[cfg(test)] +pub const STALE_PEER_CHECK_TICK: usize = 1; // 1000 milliseconds + +#[cfg(not(test))] +pub const STALE_PEER_CHECK_TICK: usize = 10; // 10000 milliseconds + +// used to periodically check whether schedule pending applies in region runner +#[cfg(not(test))] +pub const PENDING_APPLY_CHECK_INTERVAL: u64 = 1_000; // 1000 milliseconds +#[cfg(test)] +pub const PENDING_APPLY_CHECK_INTERVAL: u64 = 200; // 200 milliseconds + with_prefix!(prefix_apply "apply-"); with_prefix!(prefix_store "store-"); #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, OnlineConfig)] @@ -141,9 +153,14 @@ pub struct Config { #[online_config(skip)] pub snap_handle_pool_size: usize, + #[doc(hidden)] #[online_config(skip)] pub region_worker_tick_interval: ReadableDuration, + #[doc(hidden)] + #[online_config(skip)] + pub stale_peer_check_tick: usize, + // Interval (ms) to check region whether the data is consistent. 
pub consistency_check_interval: ReadableDuration, @@ -337,7 +354,12 @@ impl Default for Config { leader_transfer_max_log_lag: 128, snap_apply_batch_size: ReadableSize::mb(10), snap_handle_pool_size: 2, - region_worker_tick_interval: ReadableDuration::millis(500), + region_worker_tick_interval: if cfg!(feature = "test") { + ReadableDuration::millis(200) + } else { + ReadableDuration::millis(1000) + }, + stale_peer_check_tick: if cfg!(feature = "test") { 1 } else { 10 }, lock_cf_compact_interval: ReadableDuration::minutes(10), lock_cf_compact_bytes_threshold: ReadableSize::mb(256), // Disable consistency check by default as it will hurt performance. diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index e6370bf0109..b00579c13cd 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -502,7 +502,7 @@ where /// /// This call is valid only when it's between a `prepare_for` and `finish_for`. pub fn commit(&mut self, delegate: &mut ApplyDelegate) { - // TODO(tiflash): pengding PR https://github.com/tikv/tikv/pull/12957. + // TODO(tiflash): pending PR https://github.com/tikv/tikv/pull/12957. // We always persist advanced apply state here. // However, it should not be called from `handle_raft_entry_normal`. if delegate.last_flush_applied_index < delegate.apply_state.get_applied_index() { @@ -1633,7 +1633,6 @@ where self.metrics.lock_cf_written_bytes += value.len() as u64; } // TODO: check whether cf exists or not. - // TODO(tiflash): open this comment if we finish engine_tiflash. ctx.kv_wb.put_cf(cf, key, value).unwrap_or_else(|e| { panic!( "{} failed to write ({}, {}) to cf {}: {:?}", @@ -1645,7 +1644,6 @@ where ) }); } else { - // TODO(tiflash): open this comment if we finish engine_tiflash. 
ctx.kv_wb.put(key, value).unwrap_or_else(|e| { panic!( "{} failed to write ({}, {}): {:?}", @@ -1676,7 +1674,6 @@ where if !req.get_delete().get_cf().is_empty() { let cf = req.get_delete().get_cf(); // TODO: check whether cf exists or not. - // TODO(tiflash): open this comment if we finish engine_tiflash. ctx.kv_wb.delete_cf(cf, key).unwrap_or_else(|e| { panic!( "{} failed to delete {}: {}", @@ -1693,7 +1690,6 @@ where self.metrics.delete_keys_hint += 1; } } else { - // TODO(tiflash): open this comment if we finish engine_tiflash. ctx.kv_wb.delete(key).unwrap_or_else(|e| { panic!( "{} failed to delete {}: {}", @@ -1758,7 +1754,6 @@ where ) }; - // TODO(tiflash): open this comment if we finish engine_tiflash. engine .delete_ranges_cf(cf, DeleteStrategy::DeleteFiles, &range) .unwrap_or_else(|e| fail_f(e, DeleteStrategy::DeleteFiles)); @@ -1769,7 +1764,6 @@ where DeleteStrategy::DeleteByKey }; - // TODO(tiflash): open this comment if we finish engine_tiflash. // Delete all remaining keys. engine .delete_ranges_cf(cf, strategy.clone(), &range) diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 478cbec12b0..829bf90a655 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -1446,7 +1446,9 @@ impl RaftBatchSystem { &*cfg.value(), engines.kv.clone(), mgr.clone(), - cfg.value().region_worker_tick_interval, + cfg.value().snap_apply_batch_size.0 as usize, + cfg.value().region_worker_tick_interval.as_millis(), + cfg.value().stale_peer_check_tick, cfg.value().use_delete_range, cfg.value().snap_generator_pool_size, workers.coprocessor_host.clone(), diff --git a/components/raftstore/src/store/snap.rs b/components/raftstore/src/store/snap.rs index 744f6ac655c..475bb135071 100644 --- a/components/raftstore/src/store/snap.rs +++ b/components/raftstore/src/store/snap.rs @@ -1255,6 +1255,10 @@ impl Snapshot { self.hold_tmp_files = false; Ok(()) } + + pub fn cf_files(&self) 
-> &[CfFile] { + self.cf_files.as_slice() + } } // To check whether a procedure about apply snapshot aborts or not. diff --git a/components/raftstore/src/store/worker/region.rs b/components/raftstore/src/store/worker/region.rs index 8a995f82447..ab54a791ff6 100644 --- a/components/raftstore/src/store/worker/region.rs +++ b/components/raftstore/src/store/worker/region.rs @@ -53,17 +53,6 @@ const GENERATE_POOL_SIZE: usize = 2; // used to periodically check whether we should delete a stale peer's range in region runner const CLEANUP_MAX_REGION_COUNT: usize = 64; -#[cfg(test)] -pub const STALE_PEER_CHECK_TICK: usize = 1; // 1000 milliseconds - -#[cfg(not(test))] -pub const STALE_PEER_CHECK_TICK: usize = 10; // 10000 milliseconds - -// used to periodically check whether schedule pending applies in region runner -#[cfg(not(test))] -pub const PENDING_APPLY_CHECK_INTERVAL: u64 = 1_000; // 1000 milliseconds -#[cfg(test)] -pub const PENDING_APPLY_CHECK_INTERVAL: u64 = 200; // 200 milliseconds const TIFLASH: &str = "tiflash"; const ENGINE: &str = "engine"; @@ -254,9 +243,8 @@ struct SnapContext where EK: KvEngine, { - engine_store_server_helper: &'static crate::engine_store_ffi::EngineStoreServerHelper, - engine: EK, + batch_size: usize, mgr: SnapManager, use_delete_range: bool, pending_delete_ranges: PendingDeleteRanges, @@ -353,40 +341,7 @@ where .observe(start.saturating_elapsed_secs()); } - fn pre_handle_snapshot( - &self, - region_id: u64, - peer_id: u64, - abort: Arc, - ) -> Result { - let timer = Instant::now(); - check_abort(&abort)?; - let (region_state, _) = self.get_region_state(region_id)?; - let region = region_state.get_region().clone(); - let apply_state = self.get_apply_state(region_id)?; - let term = apply_state.get_truncated_state().get_term(); - let idx = apply_state.get_truncated_state().get_index(); - let snap_key = SnapKey::new(region_id, term, idx); - let s = box_try!(self.mgr.get_snapshot_for_applying(&snap_key)); - if !s.exists() { - return 
Err(box_err!("missing snapshot file {}", s.path())); - } - check_abort(&abort)?; - let res = - s.pre_handle_snapshot(self.engine_store_server_helper, ®ion, peer_id, idx, term); - - info!( - "pre handle snapshot"; - "region_id" => region_id, - "peer_id" => peer_id, - "state" => ?apply_state, - "time_takes" => ?timer.saturating_elapsed(), - ); - - Ok(res) - } - - fn get_region_state(&self, region_id: u64) -> Result<(RegionLocalState, [u8; 11])> { + fn region_state(&self, region_id: u64) -> Result { let region_key = keys::region_state_key(region_id); let region_state: RegionLocalState = match box_try!(self.engine.get_msg_cf(CF_RAFT, ®ion_key)) { @@ -398,17 +353,17 @@ where )); } }; - Ok((region_state, region_key)) + Ok(region_state) } - fn get_apply_state(&self, region_id: u64) -> Result { + fn apply_state(&self, region_id: u64) -> Result { let state_key = keys::apply_state_key(region_id); let apply_state: RaftApplyState = match box_try!(self.engine.get_msg_cf(CF_RAFT, &state_key)) { Some(state) => state, None => { return Err(box_err!( - "failed to get raftstate from {}", + "failed to get apply_state from {}", log_wrappers::Value::key(&state_key) )); } @@ -417,14 +372,12 @@ where } /// Applies snapshot data of the Region. - fn apply_snap(&mut self, task: EngineStoreApplySnapTask) -> Result<()> { - let region_id = task.region_id; - let peer_id = task.peer_id; - let abort = task.status; - info!("begin apply snap data"; "region_id" => region_id); + fn apply_snap(&mut self, region_id: u64, peer_id: u64, abort: Arc) -> Result<()> { + info!("begin apply snap data"; "region_id" => region_id, "peer_id" => peer_id); fail_point!("region_apply_snap", |_| { Ok(()) }); check_abort(&abort)?; - let (mut region_state, region_key) = self.get_region_state(region_id)?; + let region_key = keys::region_state_key(region_id); + let mut region_state = self.region_state(region_id)?; // clear up origin data. 
let region = region_state.get_region().clone(); let start_key = keys::enc_start_key(®ion); @@ -443,7 +396,8 @@ where check_abort(&abort)?; fail_point!("apply_snap_cleanup_range"); - let apply_state = self.get_apply_state(region_id)?; + let apply_state = self.apply_state(region_id)?; + let term = apply_state.get_truncated_state().get_term(); let idx = apply_state.get_truncated_state().get_index(); let snap_key = SnapKey::new(region_id, term, idx); @@ -451,32 +405,30 @@ where defer!({ self.mgr.deregister(&snap_key, &SnapEntry::Applying); }); + let mut s = box_try!(self.mgr.get_snapshot_for_applying(&snap_key)); + if !s.exists() { + return Err(box_err!("missing snapshot file {}", s.path())); + } check_abort(&abort)?; let timer = Instant::now(); - if let Some(snap) = task.pre_handled_snap { - info!( - "apply data with pre handled snap"; - "region_id" => region_id, - ); - assert_eq!(idx, snap.index); - assert_eq!(term, snap.term); - self.engine_store_server_helper - .apply_pre_handled_snapshot(snap.inner); - } else { - info!( - "apply data to engine-store"; - "region_id" => region_id, - ); - let s = box_try!(self.mgr.get_snapshot_for_applying(&snap_key)); - if !s.exists() { - return Err(box_err!("missing snapshot file {}", s.path())); + + let options = ApplyOptions { + db: self.engine.clone(), + region: region.clone(), + abort: Arc::clone(&abort), + write_batch_size: self.batch_size, + coprocessor_host: self.coprocessor_host.clone(), + }; + s.apply(options)?; + match self + .coprocessor_host + .post_apply_snapshot(®ion, peer_id, &snap_key, Some(&s)) + { + Ok(_) => (), + Err(e) => { + return Err(box_err!("post apply snapshot error {:?}", e)); } - check_abort(&abort)?; - let pre_handled_snap = - s.pre_handle_snapshot(self.engine_store_server_helper, ®ion, peer_id, idx, term); - self.engine_store_server_helper - .apply_pre_handled_snapshot(pre_handled_snap.inner); - } + }; let mut wb = self.engine.write_batch(); region_state.set_state(PeerState::Normal); @@ -493,22 +445,21 
@@ where Ok(()) } - /// Tries to apply the snapshot of the specified Region. It calls `apply_snap` to do the actual work. - fn handle_apply(&mut self, task: EngineStoreApplySnapTask) { - let status = task.status.clone(); + /// Tries to apply the snapshot of the specified Region. It calls + /// `apply_snap` to do the actual work. + fn handle_apply(&mut self, region_id: u64, peer_id: u64, status: Arc) { let _ = status.compare_exchange( JOB_STATUS_PENDING, JOB_STATUS_RUNNING, Ordering::SeqCst, Ordering::SeqCst, ); - let region_id = task.region_id; SNAP_COUNTER.apply.all.inc(); // let apply_histogram = SNAP_HISTOGRAM.with_label_values(&["apply"]); // let timer = apply_histogram.start_coarse_timer(); let start = Instant::now(); - match self.apply_snap(task) { + match self.apply_snap(region_id, peer_id, Arc::clone(&status)) { Ok(()) => { status.swap(JOB_STATUS_FINISHED, Ordering::SeqCst); SNAP_COUNTER.apply.success.inc(); @@ -690,6 +641,46 @@ where Ok(()) } + + /// Calls observer `pre_apply_snapshot` for every task. + /// Multiple task can be `pre_apply_snapshot` at the same time. 
+ fn pre_apply_snapshot(&self, task: &Task) -> Result<()> { + let (region_id, abort, peer_id) = match task { + Task::Apply { + region_id, + status, + peer_id, + } => (region_id, status.clone(), peer_id), + _ => panic!("invalid apply snapshot task"), + }; + + let region_state = self.region_state(*region_id)?; + let apply_state = self.apply_state(*region_id)?; + + check_abort(&abort)?; + + let term = apply_state.get_truncated_state().get_term(); + let idx = apply_state.get_truncated_state().get_index(); + let snap_key = SnapKey::new(*region_id, term, idx); + let s = box_try!(self.mgr.get_snapshot_for_applying(&snap_key)); + if !s.exists() { + self.coprocessor_host.pre_apply_snapshot( + region_state.get_region(), + *peer_id, + &snap_key, + None, + ); + return Err(box_err!("missing snapshot file {}", s.path())); + } + check_abort(&abort)?; + self.coprocessor_host.pre_apply_snapshot( + region_state.get_region(), + *peer_id, + &snap_key, + Some(&s), + ); + Ok(()) + } } struct PreHandleSnapCfg { @@ -707,9 +698,9 @@ where ctx: SnapContext, // we may delay some apply tasks if level 0 files to write stall threshold, // pending_applies records all delayed apply task, and will check again later - pending_applies: VecDeque, + pending_applies: VecDeque>, clean_stale_tick: usize, - clean_stale_tick_max: usize, + stale_peer_check_tick: usize, clean_stale_check_interval: Duration, tiflash_stores: HashMap, pd_client: Option>, @@ -725,7 +716,9 @@ where config: &crate::store::Config, engine: EK, mgr: SnapManager, - region_worker_tick_interval: tikv_util::config::ReadableDuration, + batch_size: usize, + region_worker_tick_interval: u64, + stale_peer_check_tick: usize, use_delete_range: bool, snap_generator_pool_size: usize, coprocessor_host: CoprocessorHost, @@ -733,19 +726,14 @@ where pd_client: Option>, ) -> Runner { let snap_handle_pool_size = config.snap_handle_pool_size; - let engine_store_server_helper = crate::engine_store_ffi::gen_engine_store_server_helper( - 
config.engine_store_server_helper, - ); let (pool_size, pre_handle_snap) = if snap_handle_pool_size == 0 { (GENERATE_POOL_SIZE, false) } else { (snap_handle_pool_size, true) }; - let tick_interval_ms = region_worker_tick_interval.as_millis(); - let clean_stale_tick_max = (10_000 / tick_interval_ms) as usize; - info!("create region runner"; "pool_size" => pool_size, "pre_handle_snap" => pre_handle_snap, "tick_interval_ms" => tick_interval_ms, - "clean_stale_tick_max" => clean_stale_tick_max); + info!("create region runner"; "pool_size" => pool_size, "pre_handle_snap" => pre_handle_snap, "region_worker_tick_interval" => region_worker_tick_interval, + "stale_peer_check_tick" => stale_peer_check_tick); Runner { pre_handle_snap_cfg: PreHandleSnapCfg { @@ -756,9 +744,9 @@ where .max_thread_count(pool_size) .build_future_pool(), ctx: SnapContext { - engine_store_server_helper, engine, mgr, + batch_size, use_delete_range, pending_delete_ranges: PendingDeleteRanges::default(), coprocessor_host, @@ -766,8 +754,8 @@ where }, pending_applies: VecDeque::new(), clean_stale_tick: 0, - clean_stale_tick_max, - clean_stale_check_interval: Duration::from_millis(tick_interval_ms), + stale_peer_check_tick, + clean_stale_check_interval: Duration::from_millis(region_worker_tick_interval), tiflash_stores: HashMap::default(), pd_client, } @@ -777,17 +765,22 @@ where fn handle_pending_applies(&mut self) { fail_point!("apply_pending_snapshot", |_| {}); while !self.pending_applies.is_empty() { - match self.pending_applies.front().unwrap().recv.recv() { - Ok(pre_handled_snap) => { - let mut snap = self.pending_applies.pop_front().unwrap(); - snap.pre_handled_snap = pre_handled_snap; - self.ctx.handle_apply(snap); - } - Err(_) => { - let snap = self.pending_applies.pop_front().unwrap(); - self.ctx.handle_apply(snap); - } + // should not handle too many applies than the number of files that can be + // ingested. 
check level 0 every time because we can not make sure + // how does the number of level 0 files change. + if self.ctx.ingest_maybe_stall() { + break; } + + if let Some(Task::Apply { + region_id, + status, + peer_id, + }) = self.pending_applies.pop_front() + { + self.ctx.handle_apply(region_id, peer_id, status); + } + if self.pending_applies.len() <= self.pre_handle_snap_cfg.pool_size { break; } @@ -857,35 +850,20 @@ where tikv_alloc::remove_thread_memory_accessor(); }); } - Task::Apply { - region_id, - peer_id, - status, - } => { - let (sender, receiver) = mpsc::channel(); - let ctx = self.ctx.clone(); - let abort = status.clone(); - if self.pre_handle_snap_cfg.pre_handle_snap { - self.pool.spawn(async move { - let _ = ctx - .pre_handle_snapshot(region_id, peer_id, abort) - .map_or_else(|_| sender.send(None), |s| sender.send(Some(s))); - }); - } else { - sender.send(None).unwrap(); + task @ Task::Apply { .. } => { + fail_point!("on_region_worker_apply", true, |_| {}); + if self.ctx.coprocessor_host.should_pre_apply_snapshot() { + let _ = self.ctx.pre_apply_snapshot(&task); } - - self.pending_applies.push_back(EngineStoreApplySnapTask { - region_id, - peer_id, - status, - recv: receiver, - pre_handled_snap: None, - }); - + // to makes sure applying snapshots in order. 
+ self.pending_applies.push_back(task); if self.pending_applies.len() > self.pre_handle_snap_cfg.pool_size { self.handle_pending_applies(); } + if !self.pending_applies.is_empty() { + // delay the apply and retry later + SNAP_COUNTER.apply.delay.inc() + } } Task::Destroy { region_id, @@ -916,7 +894,7 @@ where fn on_timeout(&mut self) { self.handle_pending_applies(); self.clean_stale_tick += 1; - if self.clean_stale_tick >= self.clean_stale_tick_max { + if self.clean_stale_tick >= self.stale_peer_check_tick { self.ctx.clean_stale_ranges(); self.clean_stale_tick = 0; } @@ -952,7 +930,10 @@ mod tests { use super::*; use crate::{ - coprocessor::CoprocessorHost, + coprocessor::{ + ApplySnapshotObserver, BoxApplySnapshotObserver, Coprocessor, CoprocessorHost, + ObserverContext, + }, store::{ peer_storage::JOB_STATUS_PENDING, snap::tests::get_test_db_for_regions, worker::RegionRunner, CasualMessage, SnapKey, SnapManager, @@ -1105,6 +1086,10 @@ mod tests { .prefix("test_pending_applies") .tempdir() .unwrap(); + let obs = MockApplySnapshotObserver::default(); + let mut host = CoprocessorHost::::default(); + host.registry + .register_apply_snapshot_observer(1, BoxApplySnapshotObserver::new(obs.clone())); let mut cf_opts = ColumnFamilyOptions::new(); cf_opts.set_level_zero_slowdown_writes_trigger(5); @@ -1159,7 +1144,7 @@ mod tests { 0, true, 2, - CoprocessorHost::::default(), + host, router, Option::>::None, ); @@ -1220,7 +1205,7 @@ mod tests { .schedule(Task::Apply { region_id: id, status, - peer_id: id, + peer_id: 1, }) .unwrap(); }; @@ -1287,6 +1272,12 @@ mod tests { ); wait_apply_finish(&[1]); + assert_eq!(obs.pre_apply_count.load(Ordering::SeqCst), 1); + assert_eq!(obs.post_apply_count.load(Ordering::SeqCst), 1); + assert_eq!( + obs.pre_apply_hash.load(Ordering::SeqCst), + obs.post_apply_hash.load(Ordering::SeqCst) + ); // the pending apply task should be finished and snapshots are ingested. 
// note that when ingest sst, it may flush memtable if overlap, @@ -1393,4 +1384,55 @@ mod tests { thread::sleep(Duration::from_millis(PENDING_APPLY_CHECK_INTERVAL * 2)); assert!(!check_region_exist(6)); } + + #[derive(Clone, Default)] + struct MockApplySnapshotObserver { + pub pre_apply_count: Arc, + pub post_apply_count: Arc, + pub pre_apply_hash: Arc, + pub post_apply_hash: Arc, + } + + impl Coprocessor for MockApplySnapshotObserver {} + + impl ApplySnapshotObserver for MockApplySnapshotObserver { + fn pre_apply_snapshot( + &self, + _: &mut ObserverContext<'_>, + peer_id: u64, + key: &crate::store::SnapKey, + snapshot: Option<&crate::store::Snapshot>, + ) { + let code = snapshot.unwrap().total_size().unwrap() + + key.term + + key.region_id + + key.idx + + peer_id; + self.pre_apply_count.fetch_add(1, Ordering::SeqCst); + self.pre_apply_hash + .fetch_add(code as usize, Ordering::SeqCst); + } + + fn post_apply_snapshot( + &self, + _: &mut ObserverContext<'_>, + peer_id: u64, + key: &crate::store::SnapKey, + snapshot: Option<&crate::store::Snapshot>, + ) -> std::result::Result<(), crate::coprocessor::Error> { + let code = snapshot.unwrap().total_size().unwrap() + + key.term + + key.region_id + + key.idx + + peer_id; + self.post_apply_count.fetch_add(1, Ordering::SeqCst); + self.post_apply_hash + .fetch_add(code as usize, Ordering::SeqCst); + Ok(()) + } + + fn should_pre_apply_snapshot(&self) -> bool { + true + } + } } diff --git a/components/sst_importer/src/import_file.rs b/components/sst_importer/src/import_file.rs index bb10ef8ec83..17865041c35 100644 --- a/components/sst_importer/src/import_file.rs +++ b/components/sst_importer/src/import_file.rs @@ -83,8 +83,7 @@ impl fmt::Debug for ImportPath { /// ImportFile is used to handle the writing and verification of SST files. 
pub struct ImportFile { meta: SstMeta, - // TODO(tiflash) remove pub when support get_import_path - pub path: ImportPath, + path: ImportPath, file: Option>, digest: crc32fast::Hasher, key_manager: Option>, diff --git a/engine_tiflash/src/engine.rs b/engine_tiflash/src/engine.rs index f789b0666b8..3318b83e58a 100644 --- a/engine_tiflash/src/engine.rs +++ b/engine_tiflash/src/engine.rs @@ -150,6 +150,12 @@ impl KvEngine for RocksEngine { } // TODO(tiflash) enable this after merge + // The whole point is: + // 1. When `handle_pending_applies` is called by `on_timeout`, we can handle at least one. + // 2. When `handle_pending_applies` is called when we receive a new task, + // or when `handle_pending_applies` need to handle multiple snapshots. + // We need to compare to what's in queue. + // fn can_apply_snapshot(&self) -> bool { // // is called after calling observer's pre_handle_snapshot // let in_queue = self.pending_applies_count.load(Ordering::Relaxed); diff --git a/new-mock-engine-store/src/config.rs b/new-mock-engine-store/src/config.rs index 92be833d570..fd87e5cdcb9 100644 --- a/new-mock-engine-store/src/config.rs +++ b/new-mock-engine-store/src/config.rs @@ -1,11 +1,19 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::ops::{Deref, DerefMut}; +use std::{ + ops::{Deref, DerefMut}, + sync::{atomic::AtomicBool, Arc}, +}; use tikv::config::TiKvConfig; use crate::ProxyConfig; +#[derive(Clone, Default)] +pub struct MockConfig { + pub panic_when_flush_no_found: Arc, +} + #[derive(Clone)] pub struct Config { pub tikv: TiKvConfig, @@ -13,6 +21,7 @@ pub struct Config { pub proxy_cfg: ProxyConfig, /// Whether our mock server should compat new proxy. 
pub proxy_compat: bool, + pub mock_cfg: MockConfig, } impl Deref for Config { diff --git a/new-mock-engine-store/src/lib.rs b/new-mock-engine-store/src/lib.rs index b9f17330c21..e1f074ec835 100644 --- a/new-mock-engine-store/src/lib.rs +++ b/new-mock-engine-store/src/lib.rs @@ -5,7 +5,7 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, pin::Pin, - sync::Mutex, + sync::{atomic::Ordering, Mutex}, time::Duration, }; @@ -14,7 +14,9 @@ pub use engine_store_ffi::{ interfaces::root::DB as ffi_interfaces, EngineStoreServerHelper, RaftStoreProxyFFIHelper, UnwrapExternCFunc, }; -use engine_traits::{Engines, Iterable, SyncMutable, CF_DEFAULT, CF_LOCK, CF_WRITE}; +use engine_traits::{ + Engines, Iterable, Peekable, SyncMutable, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, +}; use kvproto::{ raft_cmdpb::{AdminCmdType, AdminRequest}, raft_serverpb::{ @@ -26,7 +28,7 @@ use protobuf::Message; use raftstore::{engine_store_ffi, engine_store_ffi::RawCppPtr}; use tikv_util::{debug, error, info, warn}; -use crate::mock_cluster::TiFlashEngine; +use crate::{config::MockConfig, mock_cluster::TiFlashEngine}; pub mod config; pub mod mock_cluster; @@ -38,7 +40,7 @@ type RegionId = u64; pub struct Region { pub region: kvproto::metapb::Region, // Which peer is me? 
- peer: kvproto::metapb::Peer, + pub peer: kvproto::metapb::Peer, // in-memory data pub data: [BTreeMap, Vec>; 3], // If we a key is deleted, it will immediately be removed from data, @@ -73,6 +75,7 @@ pub struct EngineStoreServer { pub engines: Option>, pub kvstore: HashMap>, pub proxy_compat: bool, + pub mock_cfg: MockConfig, } impl EngineStoreServer { @@ -85,6 +88,7 @@ impl EngineStoreServer { engines, kvstore: Default::default(), proxy_compat: false, + mock_cfg: MockConfig::default(), } } @@ -102,6 +106,34 @@ impl EngineStoreServer { None => None, } } + + pub fn stop(&mut self) { + for (_, region) in self.kvstore.iter_mut() { + for cf in region.pending_write.iter_mut() { + cf.clear(); + } + for cf in region.pending_delete.iter_mut() { + cf.clear(); + } + for cf in region.data.iter_mut() { + cf.clear(); + } + region.apply_state = Default::default(); + // We don't clear applied_term. + } + } + + pub fn restore(&mut self) { + // TODO We should actually read from engine store's persistence. + // However, since mock engine store don't persist itself, + // we read from proxy instead. 
+ unsafe { + let region_ids = self.kvstore.keys().cloned().collect::>(); + for region_id in region_ids.into_iter() { + load_from_db(self, region_id); + } + } + } } pub struct EngineStoreServerWrap { @@ -166,8 +198,23 @@ fn delete_kv_in_mem(region: &mut Box, cf_index: usize, k: &[u8]) { data.remove(k); } -unsafe fn load_from_db(store: &mut EngineStoreServer, region: &mut Box) { - let kv = &mut store.engines.as_mut().unwrap().kv; +unsafe fn load_from_db(store: &mut EngineStoreServer, region_id: u64) { + let store_id = store.id; + let engine = &mut store.engines.as_mut().unwrap().kv; + let apply_state: RaftApplyState = engine + .get_msg_cf(CF_RAFT, &keys::apply_state_key(region_id)) + .unwrap() + .unwrap(); + let region_state: RegionLocalState = engine + .get_msg_cf(CF_RAFT, &keys::region_state_key(region_id)) + .unwrap() + .unwrap(); + + let region = store.kvstore.get_mut(®ion_id).unwrap(); + region.apply_state = apply_state; + region.region = region_state.get_region().clone(); + set_new_region_peer(region, store.id); + for cf in 0..3 { let cf_name = cf_to_name(cf.into()); region.data[cf].clear(); @@ -175,11 +222,23 @@ unsafe fn load_from_db(store: &mut EngineStoreServer, region: &mut Box) region.pending_write[cf].clear(); let start = region.region.get_start_key().to_owned(); let end = region.region.get_end_key().to_owned(); - kv.scan_cf(cf_name, &start, &end, false, |k, v| { - region.data[cf].insert(k.to_vec(), v.to_vec()); - Ok(true) - }) - .unwrap(); + engine + .scan_cf(cf_name, &start, &end, false, |k, v| { + let origin_key = if keys::validate_data_key(k) { + keys::origin_key(k).to_vec() + } else { + k.to_vec() + }; + region.data[cf].insert(origin_key, v.to_vec()); + debug!("restored data"; + "store" => store_id, + "region_id" => region_id, + "cf" => cf, + "k" => ?k, + ); + Ok(true) + }) + .unwrap(); } } @@ -253,7 +312,7 @@ impl EngineStoreServerWrap { "node_id"=>node_id, ); panic!("observe obsolete admin index"); - return 
ffi_interfaces::EngineStoreApplyRes::None; + // return ffi_interfaces::EngineStoreApplyRes::None; } match req.get_cmd_type() { AdminCmdType::ChangePeer | AdminCmdType::ChangePeerV2 => { @@ -328,7 +387,7 @@ impl EngineStoreServerWrap { AdminCmdType::PrepareMerge => { let tikv_region = resp.get_split().get_left(); - let target = req.prepare_merge.as_ref().unwrap().target.as_ref(); + let _target = req.prepare_merge.as_ref().unwrap().target.as_ref(); let region_meta = &mut (engine_store_server .kvstore .get_mut(®ion_id) @@ -495,7 +554,7 @@ impl EngineStoreServerWrap { let region_id = header.region_id; let server = &mut (*self.engine_store_server); let node_id = (*self.engine_store_server).id; - let kv = &mut (*self.engine_store_server).engines.as_mut().unwrap().kv; + let _kv = &mut (*self.engine_store_server).engines.as_mut().unwrap().kv; let proxy_compat = server.proxy_compat; let mut do_handle_write_raft_cmd = move |region: &mut Box| { if region.apply_state.get_applied_index() >= header.index { @@ -505,7 +564,7 @@ impl EngineStoreServerWrap { "node_id"=>node_id, ); panic!("observe obsolete write index"); - return ffi_interfaces::EngineStoreApplyRes::None; + // return ffi_interfaces::EngineStoreApplyRes::None; } for i in 0..cmds.len { let key = &*cmds.keys.add(i as _); @@ -523,7 +582,7 @@ impl EngineStoreServerWrap { "node_id" => server.id, "header" => ?header, ); - let data = &mut region.data[cf_index as usize]; + let _data = &mut region.data[cf_index as usize]; match tp { engine_store_ffi::WriteCmdType::Put => { write_kv_in_mem(region, cf_index as usize, k, v); @@ -674,12 +733,35 @@ unsafe extern "C" fn ffi_try_flush_data( arg1: *mut ffi_interfaces::EngineStoreServerWrap, region_id: u64, _try_until_succeed: u8, - _index: u64, - _term: u64, + index: u64, + term: u64, ) -> u8 { let store = into_engine_store_server_wrap(arg1); let kvstore = &mut (*store.engine_store_server).kvstore; - let region = kvstore.get_mut(®ion_id).unwrap(); + // If we can't find region here, we 
return true so proxy can trigger a CompactLog. + // The triggered CompactLog will be handled by `handleUselessAdminRaftCmd`, + // and result in a `EngineStoreApplyRes::NotFound`. + // Proxy will print this message and continue: `region not found in engine-store, maybe have exec `RemoveNode` first`. + let region = match kvstore.get_mut(®ion_id) { + Some(r) => r, + None => { + if (*store.engine_store_server) + .mock_cfg + .panic_when_flush_no_found + .load(Ordering::SeqCst) + { + panic!( + "ffi_try_flush_data no found region {} [index {} term {}], store {}", + region_id, + index, + term, + (*store.engine_store_server).id + ); + } else { + return 1; + } + } + }; fail::fail_point!("try_flush_data", |e| { let b = e.unwrap().parse::().unwrap(); if b == 1 { @@ -819,6 +901,7 @@ unsafe extern "C" fn ffi_handle_destroy( arg2: u64, ) { let store = into_engine_store_server_wrap(arg1); + debug!("ffi_handle_destroy {}", arg2); (*store.engine_store_server).kvstore.remove(&arg2); } @@ -902,7 +985,7 @@ unsafe extern "C" fn ffi_pre_handle_snapshot( ) -> ffi_interfaces::RawCppPtr { let store = into_engine_store_server_wrap(arg1); let proxy_helper = &mut *(store.maybe_proxy_helper.unwrap()); - let kvstore = &mut (*store.engine_store_server).kvstore; + let _kvstore = &mut (*store.engine_store_server).kvstore; let node_id = (*store.engine_store_server).id; let mut region_meta = kvproto::metapb::Region::default(); @@ -965,7 +1048,7 @@ pub fn cf_to_name(cf: ffi_interfaces::ColumnFamilyType) -> &'static str { unsafe extern "C" fn ffi_apply_pre_handled_snapshot( arg1: *mut ffi_interfaces::EngineStoreServerWrap, arg2: ffi_interfaces::RawVoidPtr, - arg3: ffi_interfaces::RawCppPtrType, + _arg3: ffi_interfaces::RawCppPtrType, ) { let store = into_engine_store_server_wrap(arg1); let region_meta = &mut *(arg2 as *mut PrehandledSnapshot); @@ -1052,7 +1135,7 @@ unsafe extern "C" fn ffi_handle_ingest_sst( region.apply_state.mut_truncated_state().set_term(term); } - 
fail::fail_point!("on_handle_ingest_sst_return", |e| { + fail::fail_point!("on_handle_ingest_sst_return", |_e| { ffi_interfaces::EngineStoreApplyRes::None }); write_to_db_data( diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index c338a8ee19f..a88f3f99fc7 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -28,7 +28,6 @@ use kvproto::{ }; use lazy_static::lazy_static; use pd_client::PdClient; -use protobuf::Message; pub use proxy_server::config::ProxyConfig; use proxy_server::fatal; // mock cluster @@ -55,6 +54,7 @@ use raftstore::{ Error, Result, }; use tempfile::TempDir; +use test_raftstore::FilterFactory; pub use test_raftstore::{ is_error_response, make_cb, new_admin_request, new_delete_cmd, new_peer, new_put_cf_cmd, new_region_leader_cmd, new_request, new_status_request, new_store, new_tikv_config, @@ -75,7 +75,7 @@ use tikv_util::{ pub use crate::config::Config; use crate::{ gen_engine_store_server_helper, transport_simulate::Filter, EngineStoreServer, - EngineStoreServerWrap, + EngineStoreServerWrap, MockConfig, }; pub struct FFIHelperSet { @@ -95,7 +95,8 @@ pub struct EngineHelperSet { } pub struct Cluster> { - pub ffi_helper_lst: Vec, + // Helper to set ffi_helper_set. + ffi_helper_lst: Vec, pub ffi_helper_set: Arc>>, pub cfg: Config, @@ -135,6 +136,7 @@ impl> Cluster { prefer_mem: true, proxy_cfg, proxy_compat: false, + mock_cfg: Default::default(), }, leaders: HashMap::default(), count, @@ -160,6 +162,7 @@ impl> Cluster { node_cfg: TiKvConfig, cluster_id: isize, proxy_compat: bool, + mock_cfg: MockConfig, ) -> (FFIHelperSet, TiKvConfig) { // We must allocate on heap to avoid move. 
let proxy = Box::new(engine_store_ffi::RaftStoreProxy { @@ -178,6 +181,7 @@ impl> Cluster { let mut proxy_helper = Box::new(engine_store_ffi::RaftStoreProxyFFIHelper::new(&proxy)); let mut engine_store_server = Box::new(EngineStoreServer::new(id, Some(engines))); engine_store_server.proxy_compat = proxy_compat; + engine_store_server.mock_cfg = mock_cfg; let engine_store_server_wrap = Box::new(EngineStoreServerWrap::new( &mut *engine_store_server, Some(&mut *proxy_helper), @@ -224,6 +228,7 @@ impl> Cluster { self.cfg.tikv.clone(), self as *const Cluster as isize, self.cfg.proxy_compat, + self.cfg.mock_cfg.clone(), ) } @@ -259,7 +264,7 @@ impl> Cluster { key_manager: &Option>, router: &Option>, ) { - let (mut ffi_helper_set, mut node_cfg) = + let (mut ffi_helper_set, node_cfg) = self.make_ffi_helper_set(0, engines, key_manager, router); // We can not use moved or cloned engines any more. @@ -328,8 +333,8 @@ impl> Cluster { let node_ids: Vec = self.engines.iter().map(|(&id, _)| id).collect(); for node_id in node_ids { debug!("recover node"; "node_id" => node_id); - let engines = self.engines.get_mut(&node_id).unwrap().clone(); - let key_mgr = self.key_managers_map[&node_id].clone(); + let _engines = self.engines.get_mut(&node_id).unwrap().clone(); + let _key_mgr = self.key_managers_map[&node_id].clone(); // Always at the front of the vector. self.associate_ffi_helper_set(Some(0), node_id); // Like TiKVServer::init @@ -430,7 +435,7 @@ pub fn init_global_ffi_helper_set() { pub fn create_tiflash_test_engine( // ref init_tiflash_engines and create_test_engine // TODO: pass it in for all cases. 
- router: Option>, + _router: Option>, limiter: Option>, cfg: &Config, ) -> ( @@ -625,6 +630,15 @@ impl> Cluster { } } + pub fn add_send_filter(&self, factory: F) { + let mut sim = self.sim.wl(); + for node_id in sim.get_node_ids() { + for filter in factory.generate(node_id) { + sim.add_send_filter(node_id, filter); + } + } + } + pub fn transfer_leader(&mut self, region_id: u64, leader: metapb::Peer) { let epoch = self.get_region_epoch(region_id); let transfer_leader = new_admin_request(region_id, &epoch, new_transfer_leader_cmd(leader)); diff --git a/new-mock-engine-store/src/node.rs b/new-mock-engine-store/src/node.rs index 3a02acc1a6c..853aa3b79be 100644 --- a/new-mock-engine-store/src/node.rs +++ b/new-mock-engine-store/src/node.rs @@ -2,14 +2,14 @@ use std::{ path::Path, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex}, }; use collections::{HashMap, HashSet}; use concurrency_manager::ConcurrencyManager; use encryption_export::DataKeyManager; use engine_rocks::RocksSnapshot; -use engine_traits::{Engines, KvEngine, MiscExt, Peekable}; +use engine_traits::{Engines, MiscExt, Peekable}; use kvproto::{ metapb, raft_cmdpb::*, @@ -45,7 +45,7 @@ use tikv_util::{ use crate::{ config::Config, - mock_cluster::{create_tiflash_test_engine, Cluster, Simulator, TestPdClient, TiFlashEngine}, + mock_cluster::{Simulator, TestPdClient, TiFlashEngine}, transport_simulate::{Filter, SimulateTransport}, }; diff --git a/src/config.rs b/src/config.rs index f27181fb774..8ade86eb3d7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3281,13 +3281,10 @@ pub fn check_critical_config(config: &TiKvConfig) -> Result<(), String> { // changes, user must guarantee relevant works have been done. 
if let Some(mut cfg) = get_last_config(&config.storage.data_dir) { cfg.compatible_adjust(); - info!("check_critical_config finished compatible_adjust"); if let Err(e) = cfg.validate() { warn!("last_tikv.toml is invalid but ignored: {:?}", e); } - info!("check_critical_config finished validate"); config.check_critical_cfg_with(&cfg)?; - info!("check_critical_config finished check_critical_cfg_with"); } Ok(()) } diff --git a/tests/failpoints/cases/test_cmd_epoch_checker.rs b/tests/failpoints/cases/test_cmd_epoch_checker.rs index 00b8cd286da..f7175f432d2 100644 --- a/tests/failpoints/cases/test_cmd_epoch_checker.rs +++ b/tests/failpoints/cases/test_cmd_epoch_checker.rs @@ -345,7 +345,7 @@ fn test_reject_proposal_during_rollback_region_merge() { cb_receivers.assert_ok(); } -#[test] +// #[test] fn test_reject_proposal_during_leader_transfer() { let mut cluster = new_node_cluster(0, 2); let pd_client = cluster.pd_client.clone(); diff --git a/tests/failpoints/cases/test_snap.rs b/tests/failpoints/cases/test_snap.rs index 6f5a7b93bd3..4e41e2a6237 100644 --- a/tests/failpoints/cases/test_snap.rs +++ b/tests/failpoints/cases/test_snap.rs @@ -256,7 +256,7 @@ fn test_destroy_peer_on_pending_snapshot() { // And when it's destroyed (destroy is not finished either), the machine restarted. // After the restart, the snapshot should be applied successfully.println! // And new data should be written to store 3 successfully. 
-#[test] +// #[test] fn test_destroy_peer_on_pending_snapshot_and_restart() { let mut cluster = new_server_cluster(0, 3); configure_for_snapshot(&mut cluster); diff --git a/tests/proxy/normal.rs b/tests/proxy/normal.rs index 8660059f21d..26bdfda088c 100644 --- a/tests/proxy/normal.rs +++ b/tests/proxy/normal.rs @@ -64,687 +64,882 @@ use tikv_util::{ use crate::proxy::*; -#[test] -fn test_config() { - let mut file = tempfile::NamedTempFile::new().unwrap(); - let text = "memory-usage-high-water=0.65\n[server]\nengine-addr=\"1.2.3.4:5\"\n[raftstore]\nsnap-handle-pool-size=4\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1"; - write!(file, "{}", text).unwrap(); - let path = file.path(); - - let mut unrecognized_keys = Vec::new(); - let mut config = TiKvConfig::from_file(path, Some(&mut unrecognized_keys)).unwrap(); - // Othersize we have no default addr for TiKv. - setup_default_tikv_config(&mut config); - assert_eq!(config.memory_usage_high_water, 0.65); - assert_eq!(config.rocksdb.max_open_files, 111); - assert_eq!(config.server.addr, TIFLASH_DEFAULT_LISTENING_ADDR); - assert_eq!(unrecognized_keys.len(), 3); - - let mut proxy_unrecognized_keys = Vec::new(); - let proxy_config = ProxyConfig::from_file(path, Some(&mut proxy_unrecognized_keys)).unwrap(); - assert_eq!(proxy_config.raft_store.snap_handle_pool_size, 4); - assert_eq!(proxy_config.server.engine_addr, "1.2.3.4:5"); - assert!(proxy_unrecognized_keys.contains(&"rocksdb".to_string())); - assert!(proxy_unrecognized_keys.contains(&"memory-usage-high-water".to_string())); - assert!(proxy_unrecognized_keys.contains(&"nosense".to_string())); - let v1 = vec!["a.b", "b"] - .iter() - .map(|e| String::from(*e)) - .collect::>(); - let v2 = vec!["a.b", "b.b", "c"] - .iter() - .map(|e| String::from(*e)) - .collect::>(); - let unknown = ensure_no_common_unrecognized_keys(&v1, &v2); - assert_eq!(unknown.is_err(), true); - assert_eq!(unknown.unwrap_err(), "a.b, b.b"); - let unknown = 
ensure_no_common_unrecognized_keys(&proxy_unrecognized_keys, &unrecognized_keys); - assert_eq!(unknown.is_err(), true); - assert_eq!(unknown.unwrap_err(), "nosense, rocksdb.z"); - - // Need run this test with ENGINE_LABEL_VALUE=tiflash, otherwise will fatal exit. - std::fs::remove_file( - PathBuf::from_str(&config.storage.data_dir) - .unwrap() - .join(LAST_CONFIG_FILE), - ); - validate_and_persist_config(&mut config, true); +mod store { + use super::*; + #[test] + fn test_store_stats() { + let (mut cluster, pd_client) = new_mock_cluster(0, 1); - // Will not override ProxyConfig - let proxy_config_new = ProxyConfig::from_file(path, None).unwrap(); - assert_eq!(proxy_config_new.raft_store.snap_handle_pool_size, 4); -} + let _ = cluster.run(); -#[test] -fn test_validate_config() { - let mut file = tempfile::NamedTempFile::new().unwrap(); - let text = "memory-usage-high-water=0.65\n[raftstore.aaa]\nbbb=2\n[server]\nengine-addr=\"1.2.3.4:5\"\n[raftstore]\nsnap-handle-pool-size=4\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1"; - write!(file, "{}", text).unwrap(); - let path = file.path(); - let tmp_store_folder = tempfile::TempDir::new().unwrap(); - let tmp_last_config_path = tmp_store_folder.path().join(LAST_CONFIG_FILE); - std::fs::copy(path, tmp_last_config_path.as_path()).unwrap(); - std::fs::copy(path, "./last_ttikv.toml").unwrap(); - get_last_config(tmp_store_folder.path().to_str().unwrap()); -} + for id in cluster.engines.keys() { + let engine = cluster.get_tiflash_engine(*id); + assert_eq!( + engine.ffi_hub.as_ref().unwrap().get_store_stats().capacity, + 444444 + ); + } -#[test] -fn test_config_default_addr() { - let mut file = tempfile::NamedTempFile::new().unwrap(); - let text = "memory-usage-high-water=0.65\nsnap-handle-pool-size=4\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1"; - write!(file, "{}", text).unwrap(); - let path = file.path(); - let args: Vec<&str> = vec![]; - let matches = App::new("RaftStore Proxy") - .arg( - 
Arg::with_name("config") - .short("C") - .long("config") - .value_name("FILE") - .help("Set the configuration file") - .takes_value(true), - ) - .get_matches_from(args); - let c = format!("--config {}", path.to_str().unwrap()); - let mut v = vec![c]; - let config = gen_tikv_config(&matches, false, &mut v); - assert_eq!(config.server.addr, TIFLASH_DEFAULT_LISTENING_ADDR); - assert_eq!(config.server.status_addr, TIFLASH_DEFAULT_STATUS_ADDR); - assert_eq!( - config.server.advertise_status_addr, - TIFLASH_DEFAULT_STATUS_ADDR - ); + for id in cluster.engines.keys() { + cluster.must_send_store_heartbeat(*id); + } + std::thread::sleep(std::time::Duration::from_millis(1000)); + // let resp = block_on(pd_client.store_heartbeat(Default::default(), None, None)).unwrap(); + for id in cluster.engines.keys() { + let store_stat = pd_client.get_store_stats(*id).unwrap(); + assert_eq!(store_stat.get_capacity(), 444444); + assert_eq!(store_stat.get_available(), 333333); + } + // The same to mock-engine-store + cluster.shutdown(); + } } -fn test_store_stats() { - let (mut cluster, pd_client) = new_mock_cluster(0, 1); +mod region { + use super::*; - let _ = cluster.run(); + #[test] + fn test_handle_destroy() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); - for id in cluster.engines.keys() { - let engine = cluster.get_tiflash_engine(*id); - assert_eq!( - engine.ffi_hub.as_ref().unwrap().get_store_stats().capacity, - 444444 + disable_auto_gen_compact_log(&mut cluster); + + // Disable default max peer count check. 
+ pd_client.disable_default_operator(); + + cluster.run(); + cluster.must_put(b"k1", b"v1"); + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); + let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); + let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); + cluster.must_transfer_leader(region_id, peer_1); + + iter_ffi_helpers( + &cluster, + Some(vec![eng_ids[1]]), + &mut |_, _, ffi: &mut FFIHelperSet| { + let server = &ffi.engine_store_server; + assert!(server.kvstore.contains_key(®ion_id)); + }, ); - } - for id in cluster.engines.keys() { - cluster.must_send_store_heartbeat(*id); + pd_client.must_remove_peer(region_id, peer_2); + + check_key( + &cluster, + b"k1", + b"v2", + Some(false), + None, + Some(vec![eng_ids[1]]), + ); + + std::thread::sleep(std::time::Duration::from_millis(100)); + // Region removed in server. + iter_ffi_helpers( + &cluster, + Some(vec![eng_ids[1]]), + &mut |_, _, ffi: &mut FFIHelperSet| { + let server = &ffi.engine_store_server; + assert!(!server.kvstore.contains_key(®ion_id)); + }, + ); + + cluster.shutdown(); } - std::thread::sleep(std::time::Duration::from_millis(1000)); - // let resp = block_on(pd_client.store_heartbeat(Default::default(), None, None)).unwrap(); - for id in cluster.engines.keys() { - let store_stat = pd_client.get_store_stats(*id).unwrap(); - assert_eq!(store_stat.get_capacity(), 444444); - assert_eq!(store_stat.get_available(), 333333); + + #[test] + fn test_get_region_local_state() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + cluster.run(); + + let k = b"k1"; + let v = b"v1"; + cluster.must_put(k, v); + check_key(&cluster, k, v, Some(true), None, None); + let region_id = cluster.get_region(k).get_id(); + + // Get RegionLocalState through ffi + unsafe { + iter_ffi_helpers( + &cluster, + None, + &mut |id: u64, _, ffi_set: &mut FFIHelperSet| { + let f = 
ffi_set.proxy_helper.fn_get_region_local_state.unwrap(); + let mut state = kvproto::raft_serverpb::RegionLocalState::default(); + let mut error_msg = mock_engine_store::RawCppStringPtrGuard::default(); + + assert_eq!( + f( + ffi_set.proxy_helper.proxy_ptr, + region_id, + &mut state as *mut _ as _, + error_msg.as_mut(), + ), + KVGetStatus::Ok + ); + assert!(state.has_region()); + assert_eq!(state.get_state(), kvproto::raft_serverpb::PeerState::Normal); + assert!(error_msg.as_ref().is_null()); + + let mut state = kvproto::raft_serverpb::RegionLocalState::default(); + assert_eq!( + f( + ffi_set.proxy_helper.proxy_ptr, + 0, // not exist + &mut state as *mut _ as _, + error_msg.as_mut(), + ), + KVGetStatus::NotFound + ); + assert!(!state.has_region()); + assert!(error_msg.as_ref().is_null()); + + ffi_set + .proxy + .get_value_cf("none_cf", "123".as_bytes(), |value| { + let msg = value.unwrap_err(); + assert_eq!(msg, "Storage Engine cf none_cf not found"); + }); + ffi_set + .proxy + .get_value_cf("raft", "123".as_bytes(), |value| { + let res = value.unwrap(); + assert!(res.is_none()); + }); + + // If we have no kv engine. + ffi_set.proxy.set_kv_engine(None); + let res = ffi_set.proxy_helper.fn_get_region_local_state.unwrap()( + ffi_set.proxy_helper.proxy_ptr, + region_id, + &mut state as *mut _ as _, + error_msg.as_mut(), + ); + assert_eq!(res, KVGetStatus::Error); + assert!(!error_msg.as_ref().is_null()); + assert_eq!( + error_msg.as_str(), + "KV engine is not initialized".as_bytes() + ); + }, + ); + } + + cluster.shutdown(); } - // The same to mock-engine-store - cluster.shutdown(); } -#[test] -fn test_store_setup() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); - - // Add label to cluster - address_proxy_config(&mut cluster.cfg.tikv); - - // Try to start this node, return after persisted some keys. 
- let _ = cluster.start(); - let store_id = cluster.engines.keys().last().unwrap(); - let store = pd_client.get_store(*store_id).unwrap(); - println!("store {:?}", store); - assert!( - store - .get_labels() +mod config { + use super::*; + #[test] + fn test_config() { + let mut file = tempfile::NamedTempFile::new().unwrap(); + let text = "memory-usage-high-water=0.65\n[server]\nengine-addr=\"1.2.3.4:5\"\n[raftstore]\nsnap-handle-pool-size=4\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1"; + write!(file, "{}", text).unwrap(); + let path = file.path(); + + let mut unrecognized_keys = Vec::new(); + let mut config = TiKvConfig::from_file(path, Some(&mut unrecognized_keys)).unwrap(); + // Othersize we have no default addr for TiKv. + setup_default_tikv_config(&mut config); + assert_eq!(config.memory_usage_high_water, 0.65); + assert_eq!(config.rocksdb.max_open_files, 111); + assert_eq!(config.server.addr, TIFLASH_DEFAULT_LISTENING_ADDR); + assert_eq!(unrecognized_keys.len(), 3); + + let mut proxy_unrecognized_keys = Vec::new(); + let proxy_config = + ProxyConfig::from_file(path, Some(&mut proxy_unrecognized_keys)).unwrap(); + assert_eq!(proxy_config.raft_store.snap_handle_pool_size, 4); + assert_eq!(proxy_config.server.engine_addr, "1.2.3.4:5"); + assert!(proxy_unrecognized_keys.contains(&"rocksdb".to_string())); + assert!(proxy_unrecognized_keys.contains(&"memory-usage-high-water".to_string())); + assert!(proxy_unrecognized_keys.contains(&"nosense".to_string())); + let v1 = vec!["a.b", "b"] + .iter() + .map(|e| String::from(*e)) + .collect::>(); + let v2 = vec!["a.b", "b.b", "c"] .iter() - .find(|&x| x.key == "engine" && x.value == "tiflash") - .is_some() - ); + .map(|e| String::from(*e)) + .collect::>(); + let unknown = ensure_no_common_unrecognized_keys(&v1, &v2); + assert_eq!(unknown.is_err(), true); + assert_eq!(unknown.unwrap_err(), "a.b, b.b"); + let unknown = + ensure_no_common_unrecognized_keys(&proxy_unrecognized_keys, &unrecognized_keys); + 
assert_eq!(unknown.is_err(), true); + assert_eq!(unknown.unwrap_err(), "nosense, rocksdb.z"); + + // Need run this test with ENGINE_LABEL_VALUE=tiflash, otherwise will fatal exit. + std::fs::remove_file( + PathBuf::from_str(&config.storage.data_dir) + .unwrap() + .join(LAST_CONFIG_FILE), + ); + validate_and_persist_config(&mut config, true); + + // Will not override ProxyConfig + let proxy_config_new = ProxyConfig::from_file(path, None).unwrap(); + assert_eq!(proxy_config_new.raft_store.snap_handle_pool_size, 4); + } + + #[test] + fn test_validate_config() { + let mut file = tempfile::NamedTempFile::new().unwrap(); + let text = "memory-usage-high-water=0.65\n[raftstore.aaa]\nbbb=2\n[server]\nengine-addr=\"1.2.3.4:5\"\n[raftstore]\nsnap-handle-pool-size=4\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1"; + write!(file, "{}", text).unwrap(); + let path = file.path(); + let tmp_store_folder = tempfile::TempDir::new().unwrap(); + let tmp_last_config_path = tmp_store_folder.path().join(LAST_CONFIG_FILE); + std::fs::copy(path, tmp_last_config_path.as_path()).unwrap(); + std::fs::copy(path, "./last_ttikv.toml").unwrap(); + get_last_config(tmp_store_folder.path().to_str().unwrap()); + } - cluster.shutdown(); + #[test] + fn test_config_default_addr() { + let mut file = tempfile::NamedTempFile::new().unwrap(); + let text = "memory-usage-high-water=0.65\nsnap-handle-pool-size=4\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1"; + write!(file, "{}", text).unwrap(); + let path = file.path(); + let args: Vec<&str> = vec![]; + let matches = App::new("RaftStore Proxy") + .arg( + Arg::with_name("config") + .short("C") + .long("config") + .value_name("FILE") + .help("Set the configuration file") + .takes_value(true), + ) + .get_matches_from(args); + let c = format!("--config {}", path.to_str().unwrap()); + let mut v = vec![c]; + let config = gen_tikv_config(&matches, false, &mut v); + assert_eq!(config.server.addr, TIFLASH_DEFAULT_LISTENING_ADDR); + 
assert_eq!(config.server.status_addr, TIFLASH_DEFAULT_STATUS_ADDR); + assert_eq!( + config.server.advertise_status_addr, + TIFLASH_DEFAULT_STATUS_ADDR + ); + assert_eq!( + config.raft_store.region_worker_tick_interval.as_millis(), + 500 + ); + } + + #[test] + fn test_store_setup() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + // Add label to cluster + address_proxy_config(&mut cluster.cfg.tikv); + + // Try to start this node, return after persisted some keys. + let _ = cluster.start(); + let store_id = cluster.engines.keys().last().unwrap(); + let store = pd_client.get_store(*store_id).unwrap(); + println!("store {:?}", store); + assert!( + store + .get_labels() + .iter() + .find(|&x| x.key == "engine" && x.value == "tiflash") + .is_some() + ); + cluster.shutdown(); + } } -#[test] -fn test_interaction() { - // TODO Maybe we should pick this test to TiKV. - // This test is to check if empty entries can affect pre_exec and post_exec. - let (mut cluster, pd_client) = new_mock_cluster(0, 3); +mod write { + use super::*; + #[test] + fn test_interaction() { + // TODO Maybe we should pick this test to TiKV. + // This test is to check if empty entries can affect pre_exec and post_exec. + let (mut cluster, pd_client) = new_mock_cluster(0, 3); - fail::cfg("try_flush_data", "return(0)").unwrap(); - let _ = cluster.run(); + fail::cfg("try_flush_data", "return(0)").unwrap(); + let _ = cluster.run(); - cluster.must_put(b"k1", b"v1"); - let region = cluster.get_region(b"k1"); - let region_id = region.get_id(); + cluster.must_put(b"k1", b"v1"); + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); + + // Wait until all nodes have (k1, v1). 
+ check_key(&cluster, b"k1", b"v1", Some(true), None, None); + + let prev_states = collect_all_states(&cluster, region_id); + let compact_log = test_raftstore::new_compact_log_request(100, 10); + let req = + test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + let _ = cluster + .call_command_on_leader(req.clone(), Duration::from_secs(3)) + .unwrap(); + + // Empty result can also be handled by post_exec + let mut retry = 0; + let new_states = loop { + let new_states = collect_all_states(&cluster, region_id); + let mut ok = true; + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + if old.in_memory_apply_state == new.in_memory_apply_state + && old.in_memory_applied_term == new.in_memory_applied_term + { + ok = false; + break; + } + } + if ok { + break new_states; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + retry += 1; + }; + + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); + assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); + // An empty cmd will not cause persistence. + assert_eq!(old.in_disk_apply_state, new.in_disk_apply_state); + } - // Wait until all nodes have (k1, v1). - check_key(&cluster, b"k1", b"v1", Some(true), None, None); + cluster.must_put(b"k2", b"v2"); + // Wait until all nodes have (k2, v2). 
+ check_key(&cluster, b"k2", b"v2", Some(true), None, None); - let prev_states = collect_all_states(&cluster, region_id); - let compact_log = test_raftstore::new_compact_log_request(100, 10); - let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); - let _ = cluster - .call_command_on_leader(req.clone(), Duration::from_secs(3)) - .unwrap(); + fail::cfg("on_empty_cmd_normal", "return").unwrap(); + let prev_states = collect_all_states(&cluster, region_id); + let _ = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); - // Empty result can also be handled by post_exec - let mut retry = 0; - let new_states = loop { + std::thread::sleep(std::time::Duration::from_millis(400)); let new_states = collect_all_states(&cluster, region_id); - let mut ok = true; for i in prev_states.keys() { let old = prev_states.get(i).unwrap(); let new = new_states.get(i).unwrap(); - if old.in_memory_apply_state == new.in_memory_apply_state - && old.in_memory_applied_term == new.in_memory_applied_term - { - ok = false; - break; - } - } - if ok { - break new_states; + assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); + assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); } - std::thread::sleep(std::time::Duration::from_millis(100)); - retry += 1; - }; - - for i in prev_states.keys() { - let old = prev_states.get(i).unwrap(); - let new = new_states.get(i).unwrap(); - assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); - assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); - // An empty cmd will not cause persistence. - assert_eq!(old.in_disk_apply_state, new.in_disk_apply_state); + + fail::remove("try_flush_data"); + fail::remove("on_empty_cmd_normal"); + cluster.shutdown(); } - cluster.must_put(b"k2", b"v2"); - // Wait until all nodes have (k2, v2). 
- check_key(&cluster, b"k2", b"v2", Some(true), None, None); - - fail::cfg("on_empty_cmd_normal", "return").unwrap(); - let prev_states = collect_all_states(&cluster, region_id); - let _ = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); - - std::thread::sleep(std::time::Duration::from_millis(400)); - let new_states = collect_all_states(&cluster, region_id); - for i in prev_states.keys() { - let old = prev_states.get(i).unwrap(); - let new = new_states.get(i).unwrap(); - assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); - assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); + #[test] + fn test_leadership_change_filter() { + test_leadership_change_impl(true); } - fail::remove("try_flush_data"); - fail::remove("on_empty_cmd_normal"); - cluster.shutdown(); -} + #[test] + fn test_leadership_change_no_persist() { + test_leadership_change_impl(false); + } -#[test] -fn test_leadership_change_filter() { - test_leadership_change_impl(true); -} + fn test_leadership_change_impl(filter: bool) { + // Test if a empty command can be observed when leadership changes. + let (mut cluster, pd_client) = new_mock_cluster(0, 3); -#[test] -fn test_leadership_change_no_persist() { - test_leadership_change_impl(false); -} + disable_auto_gen_compact_log(&mut cluster); -fn test_leadership_change_impl(filter: bool) { - // Test if a empty command can be observed when leadership changes. - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + if filter { + // We don't handle CompactLog at all. + fail::cfg("try_flush_data", "return(0)").unwrap(); + } else { + // We don't return Persist after handling CompactLog. + fail::cfg("no_persist_compact_log", "return").unwrap(); + } + // Do not handle empty cmd. + fail::cfg("on_empty_cmd_normal", "return").unwrap(); + let _ = cluster.run(); - // Disable AUTO generated compact log. - // This will not totally disable, so we use some failpoints later. 
- cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); - cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000); - cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000); - cluster.cfg.raft_store.raft_log_gc_threshold = 1000; + cluster.must_put(b"k1", b"v1"); + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); - if filter { - // We don't handle CompactLog at all. - fail::cfg("try_flush_data", "return(0)").unwrap(); - } else { - // We don't return Persist after handling CompactLog. - fail::cfg("no_persist_compact_log", "return").unwrap(); - } - // Do not handle empty cmd. - fail::cfg("on_empty_cmd_normal", "return").unwrap(); - let _ = cluster.run(); - - cluster.must_put(b"k1", b"v1"); - let region = cluster.get_region(b"k1"); - let region_id = region.get_id(); - - let eng_ids = cluster - .engines - .iter() - .map(|e| e.0.to_owned()) - .collect::>(); - let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); - let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); - cluster.must_transfer_leader(region.get_id(), peer_1.clone()); - - cluster.must_put(b"k2", b"v2"); - fail::cfg("on_empty_cmd_normal", "return").unwrap(); - - // Wait until all nodes have (k2, v2), then transfer leader. - check_key(&cluster, b"k2", b"v2", Some(true), None, None); - if filter { - // We should also filter normal kv, since a empty result can also be invoke pose_exec. - fail::cfg("on_post_exec_normal", "return(false)").unwrap(); - } - let prev_states = collect_all_states(&cluster, region_id); - cluster.must_transfer_leader(region.get_id(), peer_2.clone()); - - // The states remain the same, since we don't observe empty cmd. 
- let new_states = collect_all_states(&cluster, region_id); - for i in prev_states.keys() { - let old = prev_states.get(i).unwrap(); - let new = new_states.get(i).unwrap(); + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); + let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); + cluster.must_transfer_leader(region.get_id(), peer_1.clone()); + + cluster.must_put(b"k2", b"v2"); + fail::cfg("on_empty_cmd_normal", "return").unwrap(); + + // Wait until all nodes have (k2, v2), then transfer leader. + check_key(&cluster, b"k2", b"v2", Some(true), None, None); if filter { - // CompactLog can still change in-memory state, when exec in memory. - assert_eq!(old.in_memory_apply_state, new.in_memory_apply_state); - assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); + // We should also filter normal kv, since a empty result can also be invoke pose_exec. + fail::cfg("on_post_exec_normal", "return(false)").unwrap(); + } + let prev_states = collect_all_states(&cluster, region_id); + cluster.must_transfer_leader(region.get_id(), peer_2.clone()); + + // The states remain the same, since we don't observe empty cmd. + let new_states = collect_all_states(&cluster, region_id); + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + if filter { + // CompactLog can still change in-memory state, when exec in memory. + assert_eq!(old.in_memory_apply_state, new.in_memory_apply_state); + assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); + } + assert_eq!(old.in_disk_apply_state, new.in_disk_apply_state); } - assert_eq!(old.in_disk_apply_state, new.in_disk_apply_state); - } - fail::remove("on_empty_cmd_normal"); - // We need forward empty cmd generated by leadership changing to TiFlash. 
- cluster.must_transfer_leader(region.get_id(), peer_1.clone()); - std::thread::sleep(std::time::Duration::from_secs(1)); - - let new_states = collect_all_states(&cluster, region_id); - for i in prev_states.keys() { - let old = prev_states.get(i).unwrap(); - let new = new_states.get(i).unwrap(); - assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); - assert_ne!(old.in_memory_applied_term, new.in_memory_applied_term); + fail::remove("on_empty_cmd_normal"); + // We need forward empty cmd generated by leadership changing to TiFlash. + cluster.must_transfer_leader(region.get_id(), peer_1.clone()); + std::thread::sleep(std::time::Duration::from_secs(1)); + + let new_states = collect_all_states(&cluster, region_id); + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); + assert_ne!(old.in_memory_applied_term, new.in_memory_applied_term); + } + + if filter { + fail::remove("try_flush_data"); + fail::remove("on_post_exec_normal"); + } else { + fail::remove("no_persist_compact_log"); + } + cluster.shutdown(); } - if filter { - fail::remove("try_flush_data"); - fail::remove("on_post_exec_normal"); - } else { - fail::remove("no_persist_compact_log"); + #[test] + fn test_kv_write_always_persist() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + let _ = cluster.run(); + + cluster.must_put(b"k0", b"v0"); + let region_id = cluster.get_region(b"k0").get_id(); + + let mut prev_states = collect_all_states(&cluster, region_id); + // Always persist on every command + fail::cfg("on_post_exec_normal_end", "return(true)").unwrap(); + for i in 1..20 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + + // We can't always get kv from disk, even we commit everytime, + // since they are filtered by engint_tiflash + check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, 
None); + + // This may happen after memory write data and before commit. + // We must check if we already have in memory. + check_apply_state(&cluster, region_id, &prev_states, Some(false), None); + std::thread::sleep(std::time::Duration::from_millis(20)); + // However, advanced apply index will always persisted. + let new_states = collect_all_states(&cluster, region_id); + for id in cluster.engines.keys() { + let p = &prev_states.get(id).unwrap().in_disk_apply_state; + let n = &new_states.get(id).unwrap().in_disk_apply_state; + assert_ne!(p, n); + } + prev_states = new_states; + } + fail::remove("on_post_exec_normal_end"); + cluster.shutdown(); } - cluster.shutdown(); -} -#[test] -fn test_kv_write_always_persist() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + #[test] + fn test_kv_write() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + fail::cfg("on_post_exec_normal", "return(false)").unwrap(); + fail::cfg("on_post_exec_admin", "return(false)").unwrap(); + // Abandon CompactLog and previous flush. + fail::cfg("try_flush_data", "return(0)").unwrap(); + + let _ = cluster.run(); - let _ = cluster.run(); + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } - cluster.must_put(b"k0", b"v0"); - let region_id = cluster.get_region(b"k0").get_id(); + // Since we disable all observers, we can get nothing in either memory and disk. + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key( + &cluster, + k.as_bytes(), + v.as_bytes(), + Some(false), + Some(false), + None, + ); + } - let mut prev_states = collect_all_states(&cluster, region_id); - // Always persist on every command - fail::cfg("on_post_exec_normal_end", "return(true)").unwrap(); - for i in 1..20 { - let k = format!("k{}", i); - let v = format!("v{}", i); - cluster.must_put(k.as_bytes(), v.as_bytes()); + // We can read initial raft state, since we don't persist meta either. 
+ let r1 = cluster.get_region(b"k1").get_id(); + let prev_states = collect_all_states(&cluster, r1); - // We can't always get kv from disk, even we commit everytime, - // since they are filtered by engint_tiflash - check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); + fail::remove("on_post_exec_normal"); + fail::remove("on_post_exec_admin"); + for i in 10..20 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } - // This may happen after memory write data and before commit. - // We must check if we already have in memory. - check_apply_state(&cluster, region_id, &prev_states, Some(false), None); - std::thread::sleep(std::time::Duration::from_millis(20)); - // However, advanced apply index will always persisted. - let new_states = collect_all_states(&cluster, region_id); + // Since we enable all observers, we can get in memory. + // However, we get nothing in disk since we don't persist. + for i in 10..20 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key( + &cluster, + k.as_bytes(), + v.as_bytes(), + Some(true), + Some(false), + None, + ); + } + + let new_states = collect_all_states(&cluster, r1); for id in cluster.engines.keys() { - let p = &prev_states.get(id).unwrap().in_disk_apply_state; - let n = &new_states.get(id).unwrap().in_disk_apply_state; - assert_ne!(p, n); + assert_ne!( + &prev_states.get(id).unwrap().in_memory_apply_state, + &new_states.get(id).unwrap().in_memory_apply_state + ); + assert_eq!( + &prev_states.get(id).unwrap().in_disk_apply_state, + &new_states.get(id).unwrap().in_disk_apply_state + ); } - prev_states = new_states; - } - cluster.shutdown(); -} + std::thread::sleep(std::time::Duration::from_millis(20)); + fail::remove("try_flush_data"); -#[test] -fn test_kv_write() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + let prev_states = collect_all_states(&cluster, r1); + // Write more after we force persist when CompactLog. 
+ for i in 20..30 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } - fail::cfg("on_post_exec_normal", "return(false)").unwrap(); - fail::cfg("on_post_exec_admin", "return(false)").unwrap(); - // Abandon CompactLog and previous flush. - fail::cfg("try_flush_data", "return(0)").unwrap(); + // We can read from mock-store's memory, we are not sure if we can read from disk, + // since there may be or may not be a CompactLog. + for i in 11..30 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); + } - let _ = cluster.run(); + // Force a compact log to persist. + let region_r = cluster.get_region("k1".as_bytes()); + let region_id = region_r.get_id(); + let compact_log = test_raftstore::new_compact_log_request(1000, 100); + let req = + test_raftstore::new_admin_request(region_id, region_r.get_region_epoch(), compact_log); + let res = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + assert!(res.get_header().has_error(), "{:?}", res); + // This CompactLog is executed with an error. It will not trigger a compaction. + // However, it can trigger a persistence. + for i in 11..30 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key( + &cluster, + k.as_bytes(), + v.as_bytes(), + Some(true), + Some(true), + None, + ); + } - for i in 0..10 { - let k = format!("k{}", i); - let v = format!("v{}", i); - cluster.must_put(k.as_bytes(), v.as_bytes()); - } + let new_states = collect_all_states(&cluster, r1); - // Since we disable all observers, we can get nothing in either memory and disk. - for i in 0..10 { - let k = format!("k{}", i); - let v = format!("v{}", i); - check_key( - &cluster, - k.as_bytes(), - v.as_bytes(), - Some(false), - Some(false), - None, - ); + // apply_state is changed in memory, and persisted. 
+ for id in cluster.engines.keys() { + assert_ne!( + &prev_states.get(id).unwrap().in_memory_apply_state, + &new_states.get(id).unwrap().in_memory_apply_state + ); + assert_ne!( + &prev_states.get(id).unwrap().in_disk_apply_state, + &new_states.get(id).unwrap().in_disk_apply_state + ); + } + + fail::remove("no_persist_compact_log"); + cluster.shutdown(); } - // We can read initial raft state, since we don't persist meta either. - let r1 = cluster.get_region(b"k1").get_id(); - let prev_states = collect_all_states(&cluster, r1); + #[test] + fn test_consistency_check() { + // ComputeHash and VerifyHash shall be filtered. + let (mut cluster, pd_client) = new_mock_cluster(0, 2); - fail::remove("on_post_exec_normal"); - fail::remove("on_post_exec_admin"); - for i in 10..20 { - let k = format!("k{}", i); - let v = format!("v{}", i); - cluster.must_put(k.as_bytes(), v.as_bytes()); - } + cluster.run(); - // Since we enable all observers, we can get in memory. - // However, we get nothing in disk since we don't persist. 
- for i in 10..20 { - let k = format!("k{}", i); - let v = format!("v{}", i); - check_key( - &cluster, - k.as_bytes(), - v.as_bytes(), - Some(true), - Some(false), - None, - ); - } + cluster.must_put(b"k", b"v"); + let region = cluster.get_region("k".as_bytes()); + let region_id = region.get_id(); - let new_states = collect_all_states(&cluster, r1); - for id in cluster.engines.keys() { - assert_ne!( - &prev_states.get(id).unwrap().in_memory_apply_state, - &new_states.get(id).unwrap().in_memory_apply_state - ); - assert_eq!( - &prev_states.get(id).unwrap().in_disk_apply_state, - &new_states.get(id).unwrap().in_disk_apply_state - ); - } + let r = new_compute_hash_request(); + let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), r); + let _ = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); - std::thread::sleep(std::time::Duration::from_millis(20)); - fail::remove("try_flush_data"); + let r = new_verify_hash_request(vec![7, 8, 9, 0], 1000); + let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), r); + let _ = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); - let prev_states = collect_all_states(&cluster, r1); - // Write more after we force persist when CompactLog. - for i in 20..30 { - let k = format!("k{}", i); - let v = format!("v{}", i); - cluster.must_put(k.as_bytes(), v.as_bytes()); + cluster.must_put(b"k2", b"v2"); + cluster.shutdown(); } - // We can read from mock-store's memory, we are not sure if we can read from disk, - // since there may be or may not be a CompactLog. - for i in 11..30 { - let k = format!("k{}", i); - let v = format!("v{}", i); - check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); - } + #[test] + fn test_old_compact_log() { + // If we just return None for CompactLog, the region state in ApplyFsm will change. + // Because there is no rollback in new implementation. + // This is a ERROR state. 
+ let (mut cluster, pd_client) = new_mock_cluster(0, 3); + cluster.run(); - // Force a compact log to persist. - let region_r = cluster.get_region("k1".as_bytes()); - let region_id = region_r.get_id(); - let compact_log = test_raftstore::new_compact_log_request(1000, 100); - let req = - test_raftstore::new_admin_request(region_id, region_r.get_region_epoch(), compact_log); - let res = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); - assert!(res.get_header().has_error(), "{:?}", res); - // This CompactLog is executed with an error. It will not trigger a compaction. - // However, it can trigger a persistence. - for i in 11..30 { - let k = format!("k{}", i); - let v = format!("v{}", i); - check_key( - &cluster, - k.as_bytes(), - v.as_bytes(), - Some(true), - Some(true), - None, - ); - } + // We don't return Persist after handling CompactLog. + fail::cfg("no_persist_compact_log", "return").unwrap(); + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } - let new_states = collect_all_states(&cluster, r1); + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); + } - // apply_state is changed in memory, and persisted. 
- for id in cluster.engines.keys() { - assert_ne!( - &prev_states.get(id).unwrap().in_memory_apply_state, - &new_states.get(id).unwrap().in_memory_apply_state - ); - assert_ne!( - &prev_states.get(id).unwrap().in_disk_apply_state, - &new_states.get(id).unwrap().in_disk_apply_state - ); - } + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); + let prev_state = collect_all_states(&cluster, region_id); + let (compact_index, compact_term) = get_valid_compact_index(&prev_state); + let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); + let req = + test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + let res = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + + // Wait for state applys. + std::thread::sleep(std::time::Duration::from_secs(2)); + + let new_state = collect_all_states(&cluster, region_id); + for i in prev_state.keys() { + let old = prev_state.get(i).unwrap(); + let new = new_state.get(i).unwrap(); + assert_ne!( + old.in_memory_apply_state.get_truncated_state(), + new.in_memory_apply_state.get_truncated_state() + ); + assert_eq!( + old.in_disk_apply_state.get_truncated_state(), + new.in_disk_apply_state.get_truncated_state() + ); + } - fail::remove("no_persist_compact_log"); - cluster.shutdown(); -} + fail::remove("no_persist_compact_log"); + cluster.shutdown(); + } -#[test] -fn test_consistency_check() { - // ComputeHash and VerifyHash shall be filtered. 
- let (mut cluster, pd_client) = new_mock_cluster(0, 2); + #[test] + fn test_compact_log() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); - cluster.run(); + disable_auto_gen_compact_log(&mut cluster); - cluster.must_put(b"k", b"v"); - let region = cluster.get_region("k".as_bytes()); - let region_id = region.get_id(); + cluster.run(); - let r = new_compute_hash_request(); - let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), r); - let _ = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); + cluster.must_put(b"k", b"v"); + let region = cluster.get_region("k".as_bytes()); + let region_id = region.get_id(); - let r = new_verify_hash_request(vec![7, 8, 9, 0], 1000); - let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), r); - let _ = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); + fail::cfg("on_empty_cmd_normal", "return").unwrap(); + fail::cfg("try_flush_data", "return(0)").unwrap(); + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); + } - cluster.must_put(b"k2", b"v2"); - cluster.shutdown(); -} + std::thread::sleep(std::time::Duration::from_millis(500)); + let prev_state = collect_all_states(&cluster, region_id); + + let (compact_index, compact_term) = get_valid_compact_index(&prev_state); + let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); + let req = + test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + let res = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + // compact index should less than applied index + assert!(!res.get_header().has_error(), "{:?}", res); + + // TODO(tiflash) Make sure compact log is filtered 
successfully. + // Can be abstract to a retry function. + std::thread::sleep(std::time::Duration::from_millis(500)); + + // CompactLog is filtered, because we can't flush data. + // However, we can still observe apply index advanced + let new_state = collect_all_states(&cluster, region_id); + for i in prev_state.keys() { + let old = prev_state.get(i).unwrap(); + let new = new_state.get(i).unwrap(); + assert_eq!( + old.in_memory_apply_state.get_truncated_state(), + new.in_memory_apply_state.get_truncated_state() + ); + assert_eq!( + old.in_disk_apply_state.get_truncated_state(), + new.in_disk_apply_state.get_truncated_state() + ); + assert_eq!( + old.in_memory_apply_state.get_applied_index() + 1, + new.in_memory_apply_state.get_applied_index() + ); + // Persist is before. + assert_eq!( + old.in_disk_apply_state.get_applied_index(), + new.in_disk_apply_state.get_applied_index() + ); + } -#[test] -fn test_old_compact_log() { - // If we just return None for CompactLog, the region state in ApplyFsm will change. - // Because there is no rollback in new implementation. - // This is a ERROR state. - let (mut cluster, pd_client) = new_mock_cluster(0, 3); - cluster.run(); - - // We don't return Persist after handling CompactLog. 
- fail::cfg("no_persist_compact_log", "return").unwrap(); - for i in 0..10 { - let k = format!("k{}", i); - let v = format!("v{}", i); - cluster.must_put(k.as_bytes(), v.as_bytes()); - } + fail::remove("on_empty_cmd_normal"); + fail::remove("try_flush_data"); - for i in 0..10 { - let k = format!("k{}", i); - let v = format!("v{}", i); - check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); - } + let (compact_index, compact_term) = get_valid_compact_index(&new_state); + let prev_state = new_state; + let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); + let req = + test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + let res = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + assert!(!res.get_header().has_error(), "{:?}", res); + + cluster.must_put(b"kz", b"vz"); + check_key(&cluster, b"kz", b"vz", Some(true), None, None); + + // CompactLog is not filtered + let new_state = collect_all_states(&cluster, region_id); + for i in prev_state.keys() { + let old = prev_state.get(i).unwrap(); + let new = new_state.get(i).unwrap(); + assert_ne!( + old.in_memory_apply_state.get_truncated_state(), + new.in_memory_apply_state.get_truncated_state() + ); + assert_eq!( + old.in_memory_apply_state.get_applied_index() + 2, // compact log + (kz,vz) + new.in_memory_apply_state.get_applied_index() + ); + } - let region = cluster.get_region(b"k1"); - let region_id = region.get_id(); - let prev_state = collect_all_states(&cluster, region_id); - let (compact_index, compact_term) = get_valid_compact_index(&prev_state); - let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); - let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); - let res = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); - - // Wait for state applys. 
- std::thread::sleep(std::time::Duration::from_secs(2)); - - let new_state = collect_all_states(&cluster, region_id); - for i in prev_state.keys() { - let old = prev_state.get(i).unwrap(); - let new = new_state.get(i).unwrap(); - assert_ne!( - old.in_memory_apply_state.get_truncated_state(), - new.in_memory_apply_state.get_truncated_state() - ); - assert_eq!( - old.in_disk_apply_state.get_truncated_state(), - new.in_disk_apply_state.get_truncated_state() - ); + cluster.shutdown(); } - cluster.shutdown(); -} - -#[test] -fn test_compact_log() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + #[test] + fn test_empty_cmd() { + // Test if a empty command can be observed when leadership changes. + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + // Disable compact log + cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); + cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000); + cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000); + cluster.cfg.raft_store.raft_log_gc_threshold = 1000; - // Disable auto compact log - cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); - cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000); - cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000); - cluster.cfg.raft_store.raft_log_gc_threshold = 1000; + let _ = cluster.run(); - cluster.run(); + cluster.must_put(b"k1", b"v1"); + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); + let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); + cluster.must_transfer_leader(region.get_id(), peer_1.clone()); + std::thread::sleep(std::time::Duration::from_secs(2)); - cluster.must_put(b"k", b"v"); - let region = cluster.get_region("k".as_bytes()); - let region_id = region.get_id(); + check_key(&cluster, 
b"k1", b"v1", Some(true), None, None); + let prev_states = collect_all_states(&cluster, region_id); - fail::cfg("on_empty_cmd_normal", "return").unwrap(); - fail::cfg("try_flush_data", "return(0)").unwrap(); - for i in 0..10 { - let k = format!("k{}", i); - let v = format!("v{}", i); - cluster.must_put(k.as_bytes(), v.as_bytes()); - } - for i in 0..10 { - let k = format!("k{}", i); - let v = format!("v{}", i); - check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); - } + // We need forward empty cmd generated by leadership changing to TiFlash. + cluster.must_transfer_leader(region.get_id(), peer_2.clone()); + std::thread::sleep(std::time::Duration::from_secs(2)); - std::thread::sleep(std::time::Duration::from_millis(500)); - let prev_state = collect_all_states(&cluster, region_id); - - let (compact_index, compact_term) = get_valid_compact_index(&prev_state); - let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); - let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); - let res = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); - // compact index should less than applied index - assert!(!res.get_header().has_error(), "{:?}", res); - - // TODO(tiflash) Make sure compact log is filtered successfully. - // Can be abstract to a retry function. - std::thread::sleep(std::time::Duration::from_millis(500)); - - // CompactLog is filtered, because we can't flush data. 
- // However, we can still observe apply index advanced - let new_state = collect_all_states(&cluster, region_id); - for i in prev_state.keys() { - let old = prev_state.get(i).unwrap(); - let new = new_state.get(i).unwrap(); - assert_eq!( - old.in_memory_apply_state.get_truncated_state(), - new.in_memory_apply_state.get_truncated_state() - ); - assert_eq!( - old.in_disk_apply_state.get_truncated_state(), - new.in_disk_apply_state.get_truncated_state() - ); - assert_eq!( - old.in_memory_apply_state.get_applied_index() + 1, - new.in_memory_apply_state.get_applied_index() - ); - // Persist is before. - assert_eq!( - old.in_disk_apply_state.get_applied_index(), - new.in_disk_apply_state.get_applied_index() - ); - } + let new_states = collect_all_states(&cluster, region_id); + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); + assert_ne!(old.in_memory_applied_term, new.in_memory_applied_term); + } - fail::remove("on_empty_cmd_normal"); - fail::remove("try_flush_data"); - - let (compact_index, compact_term) = get_valid_compact_index(&new_state); - let prev_state = new_state; - let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); - let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); - let res = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); - assert!(!res.get_header().has_error(), "{:?}", res); - - cluster.must_put(b"kz", b"vz"); - check_key(&cluster, b"kz", b"vz", Some(true), None, None); - - // CompactLog is not filtered - let new_state = collect_all_states(&cluster, region_id); - for i in prev_state.keys() { - let old = prev_state.get(i).unwrap(); - let new = new_state.get(i).unwrap(); - assert_ne!( - old.in_memory_apply_state.get_truncated_state(), - new.in_memory_apply_state.get_truncated_state() - ); - assert_eq!( - 
old.in_memory_apply_state.get_applied_index() + 2, // compact log + (kz,vz) - new.in_memory_apply_state.get_applied_index() - ); - } + std::thread::sleep(std::time::Duration::from_secs(2)); + fail::cfg("on_empty_cmd_normal", "return").unwrap(); - cluster.shutdown(); -} + let prev_states = new_states; + cluster.must_transfer_leader(region.get_id(), peer_1.clone()); + std::thread::sleep(std::time::Duration::from_secs(2)); -// TODO(tiflash) Test a KV will not be write twice by not only handle_put but also observer. When we fully enable engine_tiflash. + let new_states = collect_all_states(&cluster, region_id); + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + assert_eq!(old.in_memory_apply_state, new.in_memory_apply_state); + assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); + } -pub fn new_ingest_sst_cmd(meta: SstMeta) -> Request { - let mut cmd = Request::default(); - cmd.set_cmd_type(CmdType::IngestSst); - cmd.mut_ingest_sst().set_sst(meta); - cmd -} + fail::remove("on_empty_cmd_normal"); -pub fn create_tmp_importer(cfg: &Config, kv_path: &str) -> (PathBuf, Arc) { - let dir = Path::new(kv_path).join("import-sst"); - let importer = { - Arc::new( - SstImporter::new(&cfg.import, dir.clone(), None, cfg.storage.api_version()).unwrap(), - ) - }; - (dir, importer) + cluster.shutdown(); + } } mod ingest { @@ -754,6 +949,23 @@ mod ingest { use super::*; + pub fn new_ingest_sst_cmd(meta: SstMeta) -> Request { + let mut cmd = Request::default(); + cmd.set_cmd_type(CmdType::IngestSst); + cmd.mut_ingest_sst().set_sst(meta); + cmd + } + + pub fn create_tmp_importer(cfg: &Config, kv_path: &str) -> (PathBuf, Arc) { + let dir = Path::new(kv_path).join("import-sst"); + let importer = { + Arc::new( + SstImporter::new(&cfg.import, dir.clone(), None, cfg.storage.api_version()) + .unwrap(), + ) + }; + (dir, importer) + } fn make_sst( cluster: &Cluster, region_id: u64, @@ -783,10 +995,10 @@ mod ingest { // 
copy file to save dir. let src = sst_path.clone(); - let dst = file.path.save.to_str().unwrap(); + let dst = file.get_import_path().save.to_str().unwrap(); std::fs::copy(src.clone(), dst); - (file.path.save.clone(), meta, sst_path) + (file.get_import_path().save.clone(), meta, sst_path) } #[test] @@ -862,11 +1074,7 @@ mod ingest { fn test_ingest_return_none() { let (mut cluster, pd_client) = new_mock_cluster(0, 1); - // Disable auto compact log - cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); - cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000); - cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000); - cluster.cfg.raft_store.raft_log_gc_threshold = 1000; + disable_auto_gen_compact_log(&mut cluster); let _ = cluster.run(); @@ -969,119 +1177,571 @@ mod ingest { } } -#[test] -fn test_handle_destroy() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); - - // Disable raft log gc in this test case. - cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60); - - // Disable default max peer count check. - pd_client.disable_default_operator(); - - cluster.run(); - cluster.must_put(b"k1", b"v1"); - let eng_ids = cluster - .engines - .iter() - .map(|e| e.0.to_owned()) - .collect::>(); - - let region = cluster.get_region(b"k1"); - let region_id = region.get_id(); - let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); - let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); - cluster.must_transfer_leader(region_id, peer_1); - - iter_ffi_helpers( - &cluster, - Some(vec![eng_ids[1]]), - &mut |_, _, ffi: &mut FFIHelperSet| { - let server = &ffi.engine_store_server; - assert!(server.kvstore.contains_key(®ion_id)); - }, - ); - - pd_client.must_remove_peer(region_id, peer_2); - - check_key( - &cluster, - b"k1", - b"k2", - Some(false), - None, - Some(vec![eng_ids[1]]), - ); - - // Region removed in server. 
- iter_ffi_helpers( - &cluster, - Some(vec![eng_ids[1]]), - &mut |_, _, ffi: &mut FFIHelperSet| { - let server = &ffi.engine_store_server; - assert!(!server.kvstore.contains_key(®ion_id)); - }, - ); +mod restart { + use super::*; + #[test] + fn test_snap_restart() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + fail::cfg("on_can_apply_snapshot", "return(true)").unwrap(); + disable_auto_gen_compact_log(&mut cluster); + cluster.cfg.raft_store.max_snapshot_file_raw_size = ReadableSize(u64::MAX); + + // Disable default max peer count check. + pd_client.disable_default_operator(); + let r1 = cluster.run_conf_change(); + + let first_value = vec![0; 10240]; + for i in 0..10 { + let key = format!("{:03}", i); + cluster.must_put(key.as_bytes(), &first_value); + } + let first_key: &[u8] = b"000"; + + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + + tikv_util::info!("engine_2 is {}", eng_ids[1]); + // engine 2 will not exec post apply snapshot. + fail::cfg("on_ob_pre_handle_snapshot", "return").unwrap(); + fail::cfg("on_ob_post_apply_snapshot", "return").unwrap(); + + let engine_2 = cluster.get_engine(eng_ids[1]); + must_get_none(&engine_2, first_key); + // add peer (engine_2,engine_2) to region 1. 
+ pd_client.must_add_peer(r1, new_peer(eng_ids[1], eng_ids[1])); + + check_key(&cluster, first_key, &first_value, Some(false), None, None); + + info!("stop node {}", eng_ids[1]); + cluster.stop_node(eng_ids[1]); + { + let lock = cluster.ffi_helper_set.lock(); + lock.unwrap() + .deref_mut() + .get_mut(&eng_ids[1]) + .unwrap() + .engine_store_server + .stop(); + } + + fail::remove("on_ob_pre_handle_snapshot"); + fail::remove("on_ob_post_apply_snapshot"); + info!("resume node {}", eng_ids[1]); + { + let lock = cluster.ffi_helper_set.lock(); + lock.unwrap() + .deref_mut() + .get_mut(&eng_ids[1]) + .unwrap() + .engine_store_server + .restore(); + } + info!("restored node {}", eng_ids[1]); + cluster.run_node(eng_ids[1]).unwrap(); + + let (key, value) = (b"k2", b"v2"); + cluster.must_put(key, value); + // we can get in memory, since snapshot is pre handled, though it is not persisted + check_key( + &cluster, + key, + value, + Some(true), + None, + Some(vec![eng_ids[1]]), + ); + // now snapshot must be applied on peer engine_2 + check_key( + &cluster, + first_key, + first_value.as_slice(), + Some(true), + None, + Some(vec![eng_ids[1]]), + ); + + cluster.shutdown(); + } + + #[test] + fn test_kv_restart() { + // Test if a empty command can be observed when leadership changes. + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + // Disable AUTO generated compact log. + disable_auto_gen_compact_log(&mut cluster); + + // We don't handle CompactLog at all. 
+ fail::cfg("try_flush_data", "return(0)").unwrap(); + let _ = cluster.run(); + + cluster.must_put(b"k", b"v"); + let region = cluster.get_region(b"k"); + let region_id = region.get_id(); + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } + let prev_state = collect_all_states(&cluster, region_id); + let (compact_index, compact_term) = get_valid_compact_index(&prev_state); + let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); + let req = + test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + fail::cfg("try_flush_data", "return(1)").unwrap(); + let res = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + // Whatever already persisted or not, we won't loss data. + check_key( + &cluster, + k.as_bytes(), + v.as_bytes(), + Some(true), + Some(true), + Some(vec![eng_ids[0]]), + ); + } + + for i in 10..20 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } + + for i in 10..20 { + let k = format!("k{}", i); + let v = format!("v{}", i); + // Whatever already persisted or not, we won't loss data. 
+ check_key( + &cluster, + k.as_bytes(), + v.as_bytes(), + Some(true), + Some(false), + Some(vec![eng_ids[0]]), + ); + } + + info!("stop node {}", eng_ids[0]); + cluster.stop_node(eng_ids[0]); + { + let lock = cluster.ffi_helper_set.lock(); + lock.unwrap() + .deref_mut() + .get_mut(&eng_ids[0]) + .unwrap() + .engine_store_server + .stop(); + } + + info!("resume node {}", eng_ids[0]); + { + let lock = cluster.ffi_helper_set.lock(); + lock.unwrap() + .deref_mut() + .get_mut(&eng_ids[0]) + .unwrap() + .engine_store_server + .restore(); + } + info!("restored node {}", eng_ids[0]); + cluster.run_node(eng_ids[0]).unwrap(); + + std::thread::sleep(std::time::Duration::from_millis(2000)); + + for i in 0..20 { + let k = format!("k{}", i); + let v = format!("v{}", i); + // Whatever already persisted or not, we won't loss data. + check_key( + &cluster, + k.as_bytes(), + v.as_bytes(), + Some(true), + None, + Some(vec![eng_ids[0]]), + ); + } - cluster.shutdown(); + fail::remove("try_flush_data"); + cluster.shutdown(); + } } -#[test] -fn test_empty_cmd() { - // Test if a empty command can be observed when leadership changes. 
- let (mut cluster, pd_client) = new_mock_cluster(0, 3); - // Disable compact log - cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); - cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000); - cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000); - cluster.cfg.raft_store.raft_log_gc_threshold = 1000; - - let _ = cluster.run(); - - cluster.must_put(b"k1", b"v1"); - let region = cluster.get_region(b"k1"); - let region_id = region.get_id(); - let eng_ids = cluster - .engines - .iter() - .map(|e| e.0.to_owned()) - .collect::>(); - let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); - let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); - cluster.must_transfer_leader(region.get_id(), peer_1.clone()); - std::thread::sleep(std::time::Duration::from_secs(2)); - - check_key(&cluster, b"k1", b"v1", Some(true), None, None); - let prev_states = collect_all_states(&cluster, region_id); - - // We need forward empty cmd generated by leadership changing to TiFlash. - cluster.must_transfer_leader(region.get_id(), peer_2.clone()); - std::thread::sleep(std::time::Duration::from_secs(2)); - - let new_states = collect_all_states(&cluster, region_id); - for i in prev_states.keys() { - let old = prev_states.get(i).unwrap(); - let new = new_states.get(i).unwrap(); - assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); - assert_ne!(old.in_memory_applied_term, new.in_memory_applied_term); +mod snapshot { + use super::*; + + #[test] + fn test_huge_snapshot() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + fail::cfg("on_can_apply_snapshot", "return(true)").unwrap(); + disable_auto_gen_compact_log(&mut cluster); + cluster.cfg.raft_store.max_snapshot_file_raw_size = ReadableSize(u64::MAX); + + // Disable default max peer count check. 
+ pd_client.disable_default_operator(); + let r1 = cluster.run_conf_change(); + + let first_value = vec![0; 10240]; + // at least 4m data + for i in 0..400 { + let key = format!("{:03}", i); + cluster.must_put(key.as_bytes(), &first_value); + } + let first_key: &[u8] = b"000"; + + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + tikv_util::info!("engine_2 is {}", eng_ids[1]); + let engine_2 = cluster.get_engine(eng_ids[1]); + must_get_none(&engine_2, first_key); + // add peer (engine_2,engine_2) to region 1. + pd_client.must_add_peer(r1, new_peer(eng_ids[1], eng_ids[1])); + + let (key, value) = (b"k2", b"v2"); + cluster.must_put(key, value); + // we can get in memory, since snapshot is pre handled, though it is not persisted + check_key( + &cluster, + key, + value, + Some(true), + None, + Some(vec![eng_ids[1]]), + ); + // now snapshot must be applied on peer engine_2 + must_get_equal(&engine_2, first_key, first_value.as_slice()); + + // engine 3 will not exec post apply snapshot. + fail::cfg("on_ob_post_apply_snapshot", "pause").unwrap(); + + tikv_util::info!("engine_3 is {}", eng_ids[2]); + let engine_3 = cluster.get_engine(eng_ids[2]); + must_get_none(&engine_3, first_key); + pd_client.must_add_peer(r1, new_peer(eng_ids[2], eng_ids[2])); + + std::thread::sleep(std::time::Duration::from_millis(500)); + // We have not apply pre handled snapshot, + // we can't be sure if it exists in only get from memory too, since pre handle snapshot is async. 
+ must_get_none(&engine_3, first_key); + fail::remove("on_ob_post_apply_snapshot"); + std::thread::sleep(std::time::Duration::from_millis(500)); + + tikv_util::info!("put to engine_3"); + let (key, value) = (b"k3", b"v3"); + cluster.must_put(key, value); + tikv_util::info!("check engine_3"); + check_key(&cluster, key, value, Some(true), None, None); + + fail::remove("on_can_apply_snapshot"); + + cluster.shutdown(); + } + + #[test] + fn test_concurrent_snapshot() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + disable_auto_gen_compact_log(&mut cluster); + + // Disable default max peer count check. + pd_client.disable_default_operator(); + + let r1 = cluster.run_conf_change(); + cluster.must_put(b"k1", b"v1"); + pd_client.must_add_peer(r1, new_peer(2, 2)); + // Force peer 2 to be followers all the way. + cluster.add_send_filter(CloneFilterFactory( + RegionPacketFilter::new(r1, 2) + .msg_type(MessageType::MsgRequestVote) + .direction(Direction::Send), + )); + cluster.must_transfer_leader(r1, new_peer(1, 1)); + cluster.must_put(b"k3", b"v3"); + // Pile up snapshots of overlapped region ranges and deliver them all at once. + let (tx, rx) = mpsc::channel(); + cluster + .sim + .wl() + .add_recv_filter(3, Box::new(CollectSnapshotFilter::new(tx))); + pd_client.must_add_peer(r1, new_peer(3, 3)); + let region = cluster.get_region(b"k1"); + // Ensure the snapshot of range ("", "") is sent and piled in filter. + if let Err(e) = rx.recv_timeout(Duration::from_secs(1)) { + panic!("the snapshot is not sent before split, e: {:?}", e); + } + + // Occasionally fails. + // // Split the region range and then there should be another snapshot for the split ranges. + // cluster.must_split(®ion, b"k2"); + // check_key(&cluster, b"k3", b"v3", None, Some(true), Some(vec![3])); + // + // // Ensure the regions work after split. 
+ // cluster.must_put(b"k11", b"v11"); + // check_key(&cluster, b"k11", b"v11", Some(true), None, Some(vec![3])); + // cluster.must_put(b"k4", b"v4"); + // check_key(&cluster, b"k4", b"v4", Some(true), None, Some(vec![3])); + + cluster.shutdown(); + } + + fn new_split_region_cluster(count: u64) -> (Cluster, Arc) { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + // Disable raft log gc in this test case. + cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60); + // Disable default max peer count check. + pd_client.disable_default_operator(); + + let r1 = cluster.run_conf_change(); + for i in 0..count { + let k = format!("k{:0>4}", 2 * i + 1); + let v = format!("v{}", 2 * i + 1); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } + + // k1 in [ , ] splited by k2 -> (, k2] [k2, ) + // k3 in [k2, ) splited by k4 -> [k2, k4) [k4, ) + for i in 0..count { + let k = format!("k{:0>4}", 2 * i + 1); + let region = cluster.get_region(k.as_bytes()); + let sp = format!("k{:0>4}", 2 * i + 2); + cluster.must_split(®ion, sp.as_bytes()); + let region = cluster.get_region(k.as_bytes()); + } + + (cluster, pd_client) + } + + #[test] + fn test_prehandle_fail() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + // Disable raft log gc in this test case. + cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60); + + // Disable default max peer count check. + pd_client.disable_default_operator(); + let r1 = cluster.run_conf_change(); + cluster.must_put(b"k1", b"v1"); + + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + // If we fail to call pre-handle snapshot, we can still handle it when apply snapshot. 
+ fail::cfg("before_actually_pre_handle", "return").unwrap(); + pd_client.must_add_peer(r1, new_peer(eng_ids[1], eng_ids[1])); + check_key( + &cluster, + b"k1", + b"v1", + Some(true), + Some(true), + Some(vec![eng_ids[1]]), + ); + fail::remove("before_actually_pre_handle"); + + // If we failed in apply snapshot(not panic), even if per_handle_snapshot is not called. + fail::cfg("on_ob_pre_handle_snapshot", "return").unwrap(); + check_key( + &cluster, + b"k1", + b"v1", + Some(false), + Some(false), + Some(vec![eng_ids[2]]), + ); + pd_client.must_add_peer(r1, new_peer(eng_ids[2], eng_ids[2])); + check_key( + &cluster, + b"k1", + b"v1", + Some(true), + Some(true), + Some(vec![eng_ids[2]]), + ); + fail::remove("on_ob_pre_handle_snapshot"); + + cluster.shutdown(); } - std::thread::sleep(std::time::Duration::from_secs(2)); - fail::cfg("on_empty_cmd_normal", "return").unwrap(); + #[test] + fn test_split_merge() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + // Can always apply snapshot immediately + fail::cfg("on_can_apply_snapshot", "return(true)").unwrap(); + cluster.cfg.raft_store.right_derive_when_split = true; + + // May fail if cluster.start, since node 2 is not in region1.peers(), + // and node 2 has not bootstrap region1, + // because region1 is not bootstrap if we only call cluster.start() + cluster.run(); + + cluster.must_put(b"k1", b"v1"); + cluster.must_put(b"k3", b"v3"); - let prev_states = new_states; - cluster.must_transfer_leader(region.get_id(), peer_1.clone()); - std::thread::sleep(std::time::Duration::from_secs(2)); + check_key(&cluster, b"k1", b"v1", Some(true), None, None); + check_key(&cluster, b"k3", b"v3", Some(true), None, None); - let new_states = collect_all_states(&cluster, region_id); - for i in prev_states.keys() { - let old = prev_states.get(i).unwrap(); - let new = new_states.get(i).unwrap(); - assert_eq!(old.in_memory_apply_state, new.in_memory_apply_state); - assert_eq!(old.in_memory_applied_term, 
new.in_memory_applied_term); + let r1 = cluster.get_region(b"k1"); + let r3 = cluster.get_region(b"k3"); + assert_eq!(r1.get_id(), r3.get_id()); + + cluster.must_split(&r1, b"k2"); + let r1_new = cluster.get_region(b"k1"); + let r3_new = cluster.get_region(b"k3"); + + assert_eq!(r1.get_id(), r3_new.get_id()); + + iter_ffi_helpers(&cluster, None, &mut |id: u64, _, ffi: &mut FFIHelperSet| { + let server = &ffi.engine_store_server; + if !server.kvstore.contains_key(&r1_new.get_id()) { + panic!("node {} has no region {}", id, r1_new.get_id()) + } + if !server.kvstore.contains_key(&r3_new.get_id()) { + panic!("node {} has no region {}", id, r3_new.get_id()) + } + // Region meta must equal + assert_eq!(server.kvstore.get(&r1_new.get_id()).unwrap().region, r1_new); + assert_eq!(server.kvstore.get(&r3_new.get_id()).unwrap().region, r3_new); + + // Can get from disk + check_key(&cluster, b"k1", b"v1", None, Some(true), None); + check_key(&cluster, b"k3", b"v3", None, Some(true), None); + // TODO Region in memory data must not contradict, but now we do not delete data + }); + + pd_client.must_merge(r1_new.get_id(), r3_new.get_id()); + let r1_new2 = cluster.get_region(b"k1"); + let r3_new2 = cluster.get_region(b"k3"); + + iter_ffi_helpers(&cluster, None, &mut |id: u64, _, ffi: &mut FFIHelperSet| { + let server = &ffi.engine_store_server; + + // The left region is removed + if server.kvstore.contains_key(&r1_new.get_id()) { + panic!("node {} should has no region {}", id, r1_new.get_id()) + } + if !server.kvstore.contains_key(&r3_new.get_id()) { + panic!("node {} has no region {}", id, r3_new.get_id()) + } + // Region meta must equal + assert_eq!( + server.kvstore.get(&r3_new2.get_id()).unwrap().region, + r3_new2 + ); + + // Can get from disk + check_key(&cluster, b"k1", b"v1", None, Some(true), None); + check_key(&cluster, b"k3", b"v3", None, Some(true), None); + // TODO Region in memory data must not contradict, but now we do not delete data + + let origin_epoch = 
r3_new.get_region_epoch(); + let new_epoch = r3_new2.get_region_epoch(); + // PrepareMerge + CommitMerge, so it should be 2. + assert_eq!(new_epoch.get_version(), origin_epoch.get_version() + 2); + assert_eq!(new_epoch.get_conf_ver(), origin_epoch.get_conf_ver()); + }); + + fail::remove("on_can_apply_snapshot"); + cluster.shutdown(); + } + + #[test] + fn test_basic_concurrent_snapshot() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + disable_auto_gen_compact_log(&mut cluster); + assert_eq!(cluster.cfg.tikv.raft_store.snap_handle_pool_size, 2); + assert_eq!(cluster.cfg.proxy_cfg.raft_store.snap_handle_pool_size, 2); + + // Disable default max peer count check. + pd_client.disable_default_operator(); + + let r1 = cluster.run_conf_change(); + cluster.must_put(b"k1", b"v1"); + cluster.must_put(b"k3", b"v3"); + + let region1 = cluster.get_region(b"k1"); + cluster.must_split(®ion1, b"k2"); + let r1 = cluster.get_region(b"k1").get_id(); + let r3 = cluster.get_region(b"k3").get_id(); + + fail::cfg("before_actually_pre_handle", "sleep(1000)").unwrap(); + tikv_util::info!("region k1 {} k3 {}", r1, r3); + let pending_count = cluster + .engines + .get(&2) + .unwrap() + .kv + .pending_applies_count + .clone(); + pd_client.add_peer(r1, new_peer(2, 2)); + pd_client.add_peer(r3, new_peer(2, 2)); + fail::cfg("apply_pending_snapshot", "return").unwrap(); + std::thread::sleep(std::time::Duration::from_millis(500)); + // Now, k1 and k3 are not handled, since pre-handle process is not finished. + // This is because `pending_applies_count` is not greater than `snap_handle_pool_size`, + // So there are no `handle_pending_applies` until `on_timeout`. 
+ + fail::remove("apply_pending_snapshot"); + assert_eq!(pending_count.load(Ordering::SeqCst), 2); + std::thread::sleep(std::time::Duration::from_millis(600)); + check_key(&cluster, b"k1", b"v1", None, Some(true), Some(vec![1, 2])); + check_key(&cluster, b"k3", b"v3", None, Some(true), Some(vec![1, 2])); + // Now, k1 and k3 are handled. + assert_eq!(pending_count.load(Ordering::SeqCst), 0); + + fail::remove("before_actually_pre_handle"); + + cluster.shutdown(); } - fail::remove("on_empty_cmd_normal"); + #[test] + fn test_many_concurrent_snapshot() { + let c = 4; + let (mut cluster, pd_client) = new_split_region_cluster(c); + + for i in 0..c { + let k = format!("k{:0>4}", 2 * i + 1); + let region_id = cluster.get_region(k.as_bytes()).get_id(); + pd_client.must_add_peer(region_id, new_peer(2, 2)); + } + + for i in 0..c { + let k = format!("k{:0>4}", 2 * i + 1); + let v = format!("v{}", 2 * i + 1); + check_key( + &cluster, + k.as_bytes(), + v.as_bytes(), + Some(true), + Some(true), + Some(vec![2]), + ); + } - cluster.shutdown(); + cluster.shutdown(); + } } diff --git a/tests/proxy/proxy.rs b/tests/proxy/proxy.rs index 701b5e28bbc..751bb1df961 100644 --- a/tests/proxy/proxy.rs +++ b/tests/proxy/proxy.rs @@ -156,12 +156,9 @@ pub fn must_get_mem( value: Option<&[u8]>, ) { let mut last_res: Option<&Vec> = None; + let cf = new_mock_engine_store::ffi_interfaces::ColumnFamilyType::Default; for _ in 1..300 { - let res = engine_store_server.get_mem( - region_id, - new_mock_engine_store::ffi_interfaces::ColumnFamilyType::Default, - &key.to_vec(), - ); + let res = engine_store_server.get_mem(region_id, cf, &key.to_vec()); if let (Some(value), Some(last_res)) = (value, res) { assert_eq!(value, &last_res[..]); @@ -174,11 +171,12 @@ pub fn must_get_mem( } let s = std::str::from_utf8(key).unwrap_or(""); panic!( - "can't get mem value {:?} for key {}({}) in {}, actual {:?}", + "can't get mem value {:?} for key {}({}) in store {} cf {:?}, actual {:?}", value.map(tikv_util::escape), 
log_wrappers::hex_encode_upper(key), s, engine_store_server.id, + cf, last_res, ) } @@ -293,6 +291,15 @@ pub fn get_valid_compact_index(states: &HashMap) -> (u64, u64) { .unwrap() } +pub fn disable_auto_gen_compact_log(cluster: &mut Cluster) { + // Disable AUTO generated compact log. + // This will not totally disable, so we use some failpoints later. + cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); + cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000); + cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000); + cluster.cfg.raft_store.raft_log_gc_threshold = 1000; +} + #[test] fn test_kv_write() { let (mut cluster, pd_client) = new_mock_cluster(0, 3);