From 91c2de300089313b7bb26435ad42c2e74058893f Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 15 Sep 2022 19:04:56 +0800 Subject: [PATCH 1/6] adapt Signed-off-by: CalvinNeo --- Cargo.lock | 2 - components/test_raftstore/Cargo.toml | 3 - components/test_raftstore/src/cluster.rs | 167 ++--------------------- components/test_raftstore/src/node.rs | 3 - components/test_raftstore/src/pd.rs | 15 +- components/test_raftstore/src/server.rs | 34 +++-- components/test_raftstore/src/util.rs | 9 +- tests/failpoints/cases/test_bootstrap.rs | 3 - tests/failpoints/cases/test_normal.rs | 64 --------- 9 files changed, 39 insertions(+), 261 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 31aa0808a99..35e96749f04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5778,7 +5778,6 @@ dependencies = [ "encryption_export", "engine_rocks", "engine_rocks_helper", - "engine_store_ffi", "engine_test", "engine_traits", "fail", @@ -5790,7 +5789,6 @@ dependencies = [ "kvproto", "lazy_static", "log_wrappers", - "mock-engine-store", "pd_client", "protobuf", "raft", diff --git a/components/test_raftstore/Cargo.toml b/components/test_raftstore/Cargo.toml index 77829e6ab24..cd9df2e3c05 100644 --- a/components/test_raftstore/Cargo.toml +++ b/components/test_raftstore/Cargo.toml @@ -21,7 +21,6 @@ test-engines-rocksdb = [ test-engines-panic = [ "raftstore/test-engines-panic", ] -test-raftstore-proxy = ["raftstore/test-raftstore-proxy"] [dependencies] api_version = { path = "../api_version" } @@ -33,7 +32,6 @@ crossbeam = "0.8" encryption_export = { path = "../encryption/export", default-features = false } engine_rocks = { path = "../engine_rocks", default-features = false } engine_rocks_helper = { path = "../engine_rocks_helper" } -engine_store_ffi = { path = "../engine_store_ffi", default-features = false, features = ["testexport"] } engine_test = { path = "../engine_test", default-features = false } engine_traits = { path = "../engine_traits", default-features = false } fail = "0.5" @@ -45,7 +43,6 @@ keys = { path = "../keys", default-features = false } kvproto = { git = "https://github.com/pingcap/kvproto.git" } lazy_static = "1.3" log_wrappers = { path = "../log_wrappers" } -mock-engine-store = { path = "../../mock-engine-store", default-features = false } pd_client = { path = "../pd_client", default-features = false } protobuf = { version = "2.8", features = ["bytes"] } raft = { version = "0.7.0", default-features = false, features = ["protobuf-codec"] } diff --git a/components/test_raftstore/src/cluster.rs b/components/test_raftstore/src/cluster.rs index c32b069f062..63c7e3023c3 100644 --- a/components/test_raftstore/src/cluster.rs +++ b/components/test_raftstore/src/cluster.rs @@ -4,10 +4,7 @@ use std::{ collections::hash_map::Entry as MapEntry, error::Error as StdError, result, - sync::{ - atomic::{AtomicBool, AtomicU8}, - mpsc, Arc, Mutex, RwLock, - }, + sync::{mpsc, Arc, Mutex, RwLock}, thread, time::Duration, }; @@ -16,7 +13,6 @@ use collections::{HashMap, HashSet}; use crossbeam::channel::TrySendError; use encryption_export::DataKeyManager; use engine_rocks::{raw::DB, Compat, RocksEngine, RocksSnapshot}; -use engine_store_ffi::TiFlashEngine; use engine_test::raft::RaftTestEngine; use engine_traits::{ CompactExt, Engines, Iterable, MiscExt, Mutable, Peekable, RaftEngineReadOnly, WriteBatch, @@ -35,7 +31,6 @@ use kvproto::{ RegionLocalState, }, }; -use mock_engine_store::EngineStoreServerWrap; use pd_client::{BucketStat, PdClient}; use raft::eraftpb::ConfChangeType; use raftstore::{ @@ -53,7 +48,6 @@ use 
raftstore::{ use tempfile::TempDir; use tikv::server::Result as ServerResult; use tikv_util::{ - sys::SysQuota, thread_group::GroupProperties, time::{Instant, ThreadReadId}, worker::LazyWorker, @@ -157,23 +151,7 @@ pub trait Simulator { } } -pub struct FFIHelperSet { - pub proxy: Box, - pub proxy_helper: Box, - pub engine_store_server: Box, - pub engine_store_server_wrap: Box, - pub engine_store_server_helper: Box, -} - -pub struct EngineHelperSet { - pub engine_store_server: Box, - pub engine_store_server_wrap: Box, - pub engine_store_server_helper: Box, -} - pub struct Cluster { - pub ffi_helper_set: HashMap, - pub global_engine_helper_set: Option, pub cfg: Config, leaders: HashMap, pub count: usize, @@ -191,7 +169,6 @@ pub struct Cluster { pub sst_workers_map: HashMap, pub sim: Arc>, pub pd_client: Arc, - pub proxy_compat: bool, } impl Cluster { @@ -205,8 +182,6 @@ impl Cluster { ) -> Cluster { // TODO: In the future, maybe it's better to test both case where `use_delete_range` is true and false Cluster { - ffi_helper_set: HashMap::default(), - global_engine_helper_set: None, cfg: Config { tikv: new_tikv_config_with_api_ver(id, api_version), prefer_mem: true, @@ -226,7 +201,6 @@ impl Cluster { pd_client, sst_workers: vec![], sst_workers_map: HashMap::default(), - proxy_compat: false, } } @@ -265,7 +239,6 @@ impl Cluster { create_test_engine(router, self.io_rate_limiter.clone(), &self.cfg); self.dbs.push(engines); self.key_managers.push(key_manager); - debug!("create_engine path is {}", dir.as_ref().to_str().unwrap()); self.paths.push(dir); self.sst_workers.push(sst_worker); } @@ -282,81 +255,7 @@ impl Cluster { } } - pub fn make_global_ffi_helper_set(&mut self) { - let mut engine_store_server = - Box::new(mock_engine_store::EngineStoreServer::new(99999, None)); - engine_store_server.proxy_compat = self.proxy_compat; - let engine_store_server_wrap = Box::new(mock_engine_store::EngineStoreServerWrap::new( - &mut *engine_store_server, - None, - self as *const Cluster as isize, - )); - let engine_store_server_helper = - Box::new(mock_engine_store::gen_engine_store_server_helper( - std::pin::Pin::new(&*engine_store_server_wrap), - )); - - unsafe { - engine_store_ffi::init_engine_store_server_helper( - &*engine_store_server_helper as *const engine_store_ffi::EngineStoreServerHelper - as *mut u8, - ); - } - - self.global_engine_helper_set = Some(EngineHelperSet { - engine_store_server, - engine_store_server_wrap, - engine_store_server_helper, - }); - } - - pub fn make_ffi_helper_set( - &mut self, - id: u64, - engines: Engines, - key_mgr: &Option>, - router: &RaftRouter, - ) -> (FFIHelperSet, Config) { - // This TiFlash engine is a dummy TiFlash engine. 
- let engine = TiFlashEngine::from_rocks(engines.kv.clone()); - let proxy = Box::new(engine_store_ffi::RaftStoreProxy::new( - AtomicU8::new(engine_store_ffi::RaftProxyStatus::Idle as u8), - key_mgr.clone(), - Some(Box::new(engine_store_ffi::ReadIndexClient::new( - router.clone(), - SysQuota::cpu_cores_quota() as usize * 2, - ))), - std::sync::RwLock::new(Some(engine)), - )); - - let mut proxy_helper = Box::new(engine_store_ffi::RaftStoreProxyFFIHelper::new(&proxy)); - let mut engine_store_server = - Box::new(mock_engine_store::EngineStoreServer::new(id, Some(engines))); - engine_store_server.proxy_compat = self.proxy_compat; - let engine_store_server_wrap = Box::new(mock_engine_store::EngineStoreServerWrap::new( - &mut *engine_store_server, - Some(&mut *proxy_helper), - self as *const Cluster as isize, - )); - let engine_store_server_helper = - Box::new(mock_engine_store::gen_engine_store_server_helper( - std::pin::Pin::new(&*engine_store_server_wrap), - )); - - let node_cfg = self.cfg.clone(); - let ffi_helper_set = FFIHelperSet { - proxy, - proxy_helper, - engine_store_server, - engine_store_server_wrap, - engine_store_server_helper, - }; - (ffi_helper_set, node_cfg) - } - pub fn start(&mut self) -> ServerResult<()> { - self.make_global_ffi_helper_set(); - // Try recover from last shutdown. let node_ids: Vec = self.engines.iter().map(|(&id, _)| id).collect(); for node_id in node_ids { @@ -375,22 +274,16 @@ impl Cluster { let props = GroupProperties::default(); tikv_util::thread_group::set_properties(Some(props.clone())); - let (mut ffi_helper_set, mut node_cfg) = - self.make_ffi_helper_set(0, self.dbs.last().unwrap().clone(), &key_mgr, &router); - let mut sim = self.sim.wl(); let node_id = sim.run_node( 0, - node_cfg, + self.cfg.clone(), engines.clone(), store_meta.clone(), key_mgr.clone(), router, system, )?; - debug!("start new node {}", node_id); - ffi_helper_set.engine_store_server.id = node_id; - self.ffi_helper_set.insert(node_id, ffi_helper_set); self.group_props.insert(node_id, props); self.engines.insert(node_id, engines); self.store_metas.insert(node_id, store_meta); @@ -441,6 +334,10 @@ impl Cluster { let engines = self.engines[&node_id].clone(); let key_mgr = self.key_managers_map[&node_id].clone(); let (router, system) = create_raft_batch_system(&self.cfg.raft_store); + let mut cfg = self.cfg.clone(); + if let Some(labels) = self.labels.get(&node_id) { + cfg.server.labels = labels.to_owned(); + } let store_meta = match self.store_metas.entry(node_id) { MapEntry::Occupied(o) => { let mut meta = o.get().lock().unwrap(); @@ -455,29 +352,10 @@ impl Cluster { self.group_props.insert(node_id, props.clone()); tikv_util::thread_group::set_properties(Some(props)); debug!("calling run node"; "node_id" => node_id); - - let mut node_cfg = if self.ffi_helper_set.contains_key(&node_id) { - let node_cfg = self.cfg.clone(); - node_cfg - } else { - let (ffi_helper_set, node_cfg) = self.make_ffi_helper_set( - node_id, - self.engines[&node_id].clone(), - &key_mgr, - &router, - ); - self.ffi_helper_set.insert(node_id, ffi_helper_set); - node_cfg - }; - - if let Some(labels) = self.labels.get(&node_id) { - node_cfg.server.labels = labels.to_owned(); - } - // FIXME: rocksdb event listeners may not work, because we change the router. 
- self.sim.wl().run_node( - node_id, node_cfg, engines, store_meta, key_mgr, router, system, - )?; + self.sim + .wl() + .run_node(node_id, cfg, engines, store_meta, key_mgr, router, system)?; debug!("node {} started", node_id); Ok(()) } @@ -1132,16 +1010,8 @@ impl Cluster { } pub fn must_put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) { - match self.batch_put(key, vec![new_put_cf_cmd(cf, key, value)]) { - Ok(resp) => { - if cfg!(feature = "test-raftstore-proxy") { - assert_eq!(resp.get_responses().len(), 1); - assert_eq!(resp.get_responses()[0].get_cmd_type(), CmdType::Put); - } - } - Err(e) => { - panic!("has error: {:?}", e); - } + if let Err(e) = self.batch_put(key, vec![new_put_cf_cmd(cf, key, value)]) { + panic!("has error: {:?}", e); } } @@ -1549,7 +1419,6 @@ impl Cluster { debug!("asking split"; "region" => ?region, "key" => ?split_key); // In case ask split message is ignored, we should retry. if try_cnt % 50 == 0 { - debug!("must_split try once, count {}", try_cnt); self.reset_leader_of_region(region.get_id()); let key = split_key.to_vec(); let check = Box::new(move |write_resp: WriteResponse| { @@ -1908,17 +1777,3 @@ impl Drop for Cluster { self.shutdown(); } } - -pub fn gen_cluster(cluster_ptr: isize) -> Option<&'static Cluster> { - unsafe { - if cluster_ptr == 0 { - None - } else { - Some(&(*(cluster_ptr as *const Cluster))) - } - } -} - -pub unsafe fn init_cluster_ptr(cluster_ptr: &Cluster) -> isize { - cluster_ptr as *const Cluster as isize -} diff --git a/components/test_raftstore/src/node.rs b/components/test_raftstore/src/node.rs index 3c973b92f66..05afdcc7b77 100644 --- a/components/test_raftstore/src/node.rs +++ b/components/test_raftstore/src/node.rs @@ -229,7 +229,6 @@ impl Simulator for NodeCluster { let pd_worker = LazyWorker::new("test-pd-worker"); let simulate_trans = SimulateTransport::new(self.trans.clone()); - let mut raft_store = cfg.raft_store.clone(); raft_store .validate(cfg.coprocessor.region_split_size) @@ -262,7 +261,6 @@ impl Simulator for NodeCluster { .max_total_size(cfg.server.snap_max_total_size.0) .encryption_key_manager(key_manager) .max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0) - .enable_multi_snapshot_files(true) .build(tmp.path().to_str().unwrap()); (snap_mgr, Some(tmp)) } else { @@ -314,7 +312,6 @@ impl Simulator for NodeCluster { cm, CollectorRegHandle::new_for_test(), )?; - assert!( engines .kv diff --git a/components/test_raftstore/src/pd.rs b/components/test_raftstore/src/pd.rs index 0e612527f86..66823a29708 100644 --- a/components/test_raftstore/src/pd.rs +++ b/components/test_raftstore/src/pd.rs @@ -408,7 +408,7 @@ impl PdCluster { Ok(self.base_id.fetch_add(1, Ordering::Relaxed) as u64) } - pub fn put_store(&mut self, store: metapb::Store) -> Result<()> { + fn put_store(&mut self, store: metapb::Store) -> Result<()> { let store_id = store.get_id(); // There is a race between put_store and handle_region_heartbeat_response. If store id is // 0, it means it's a placeholder created by latter, we just need to update the meta. 
@@ -459,7 +459,7 @@ impl PdCluster { .map(|(_, region)| region.clone()) } - pub fn get_region_by_id(&self, region_id: u64) -> Result> { + fn get_region_by_id(&self, region_id: u64) -> Result> { Ok(self .region_id_keys .get(®ion_id) @@ -1371,8 +1371,7 @@ impl PdClient for TestPdClient { } fn alloc_id(&self) -> Result { - let result = self.cluster.rl().alloc_id(); - result + self.cluster.rl().alloc_id() } fn put_store(&self, store: metapb::Store) -> Result> { @@ -1584,13 +1583,11 @@ impl PdClient for TestPdClient { } let mut resp = pdpb::AskBatchSplitResponse::default(); - for c in 0..count { + for _ in 0..count { let mut id = pdpb::SplitId::default(); id.set_new_region_id(self.alloc_id().unwrap()); - - for peer in region.get_peers() { - let rid = self.alloc_id().unwrap(); - id.mut_new_peer_ids().push(rid); + for _ in region.get_peers() { + id.mut_new_peer_ids().push(self.alloc_id().unwrap()); } resp.mut_ids().push(id); } diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index cc75cb27c8e..2930035076d 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -2,10 +2,7 @@ use std::{ path::Path, - sync::{ - atomic::{AtomicBool, AtomicUsize}, - Arc, Mutex, RwLock, - }, + sync::{Arc, Mutex, RwLock}, thread, time::Duration, usize, @@ -370,26 +367,23 @@ impl ServerCluster { let (res_tag_factory, collector_reg_handle, rsmeter_cleanup) = self.init_resource_metering(&cfg.resource_metering); - let check_leader_runner = - CheckLeaderRunner::new(store_meta.clone(), coprocessor_host.clone()); + let check_leader_runner = CheckLeaderRunner::new(store_meta.clone(), coprocessor_host.clone()); let check_leader_scheduler = bg_worker.start("check-leader", check_leader_runner); - let lock_mgr = LockManager::new(&cfg.pessimistic_txn); + let mut lock_mgr = LockManager::new(&cfg.pessimistic_txn); let quota_limiter = Arc::new(QuotaLimiter::new( cfg.quota.foreground_cpu_time, cfg.quota.foreground_write_bandwidth, cfg.quota.foreground_read_bandwidth, cfg.quota.max_delay_duration, )); - - let dynamic_config = lock_mgr.get_storage_dynamic_configs(); let store = create_raft_storage::<_, _, _, F, _>( engine, &cfg.storage, storage_read_pool.handle(), - lock_mgr, + lock_mgr.clone(), concurrency_manager.clone(), - dynamic_config, + lock_mgr.get_storage_dynamic_configs(), Arc::new(FlowController::empty()), pd_sender, res_tag_factory.clone(), @@ -421,6 +415,9 @@ impl ServerCluster { Arc::clone(&importer), ); + // Create deadlock service. + let deadlock_service = lock_mgr.deadlock_service(); + // Create pd client, snapshot manager, server. 
let (resolver, state) = resolve::new_resolver(Arc::clone(&self.pd_client), &bg_worker, router.clone()); @@ -429,7 +426,6 @@ impl ServerCluster { .max_total_size(cfg.server.snap_max_total_size.0) .encryption_key_manager(key_manager) .max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0) - .enable_multi_snapshot_files(true) .build(tmp_str); self.snap_mgrs.insert(node_id, snap_mgr.clone()); let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone())); @@ -505,6 +501,7 @@ impl ServerCluster { .unwrap(); svr.register_service(create_import_sst(import_service.clone())); svr.register_service(create_debug(debug_service.clone())); + svr.register_service(create_deadlock(deadlock_service.clone())); if let Some(svcs) = self.pending_services.get(&node_id) { for fact in svcs { svr.register_service(fact()); @@ -532,8 +529,7 @@ impl ServerCluster { let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone())); // Register the role change observer of the lock manager. - // We have hacked lock manager - // lock_mgr.register_detector_role_change_observer(&mut coprocessor_host); + lock_mgr.register_detector_role_change_observer(&mut coprocessor_host); let pessimistic_txn_cfg = cfg.tikv.pessimistic_txn; @@ -566,6 +562,16 @@ impl ServerCluster { self.importers.insert(node_id, importer); self.health_services.insert(node_id, health_service); + lock_mgr + .start( + node.id(), + Arc::clone(&self.pd_client), + resolver, + Arc::clone(&security_mgr), + &pessimistic_txn_cfg, + ) + .unwrap(); + server.start(server_cfg, security_mgr).unwrap(); self.metas.insert( diff --git a/components/test_raftstore/src/util.rs b/components/test_raftstore/src/util.rs index 97c5e00cdf6..96082bc6fbb 100644 --- a/components/test_raftstore/src/util.rs +++ b/components/test_raftstore/src/util.rs @@ -67,11 +67,7 @@ pub fn must_get(engine: &Arc, cf: &str, key: &[u8], value: Option<&[u8]>) { } thread::sleep(Duration::from_millis(20)); } - debug!( - "last try to get {} cf {}", - log_wrappers::hex_encode_upper(key), - cf - ); + debug!("last try to get {}", log_wrappers::hex_encode_upper(key)); let res = engine.c().get_value_cf(cf, &keys::data_key(key)).unwrap(); if value.is_none() && res.is_none() || value.is_some() && res.is_some() && value.unwrap() == &*res.unwrap() @@ -79,9 +75,8 @@ pub fn must_get(engine: &Arc, cf: &str, key: &[u8], value: Option<&[u8]>) { return; } panic!( - "can't get value {:?} for key {:?} hex {}", + "can't get value {:?} for key {}", value.map(escape), - key, log_wrappers::hex_encode_upper(key) ) } diff --git a/tests/failpoints/cases/test_bootstrap.rs b/tests/failpoints/cases/test_bootstrap.rs index b819c6ae942..3923f4e77f2 100644 --- a/tests/failpoints/cases/test_bootstrap.rs +++ b/tests/failpoints/cases/test_bootstrap.rs @@ -10,9 +10,6 @@ fn test_bootstrap_half_way_failure(fp: &str) { let pd_client = Arc::new(TestPdClient::new(0, false)); let sim = Arc::new(RwLock::new(NodeCluster::new(pd_client.clone()))); let mut cluster = Cluster::new(0, 5, sim, pd_client, ApiVersion::V1); - unsafe { - test_raftstore::init_cluster_ptr(&cluster); - } // Try to start this node, return after persisted some keys. 
fail::cfg(fp, "return").unwrap(); diff --git a/tests/failpoints/cases/test_normal.rs b/tests/failpoints/cases/test_normal.rs index 9e1e6bdecd1..1885c1c8220 100644 --- a/tests/failpoints/cases/test_normal.rs +++ b/tests/failpoints/cases/test_normal.rs @@ -12,9 +12,6 @@ fn test_normal() { let pd_client = Arc::new(TestPdClient::new(0, false)); let sim = Arc::new(RwLock::new(NodeCluster::new(pd_client.clone()))); let mut cluster = Cluster::new(0, 3, sim, pd_client, ApiVersion::V1); - unsafe { - test_raftstore::init_cluster_ptr(&cluster); - } // Try to start this node, return after persisted some keys. let _ = cluster.start(); @@ -26,67 +23,6 @@ fn test_normal() { must_get_equal(&cluster.get_engine(*id), k, v); // must_get_equal(db, k, v); } - let region_id = cluster.get_region(k).get_id(); - unsafe { - for (_, ffi_set) in cluster.ffi_helper_set.iter_mut() { - let f = ffi_set.proxy_helper.fn_get_region_local_state.unwrap(); - let mut state = kvproto::raft_serverpb::RegionLocalState::default(); - let mut error_msg = mock_engine_store::RawCppStringPtrGuard::default(); - - assert_eq!( - f( - ffi_set.proxy_helper.proxy_ptr, - region_id, - &mut state as *mut _ as _, - error_msg.as_mut(), - ), - KVGetStatus::Ok - ); - assert!(state.has_region()); - assert_eq!(state.get_state(), kvproto::raft_serverpb::PeerState::Normal); - assert!(error_msg.as_ref().is_null()); - - let mut state = kvproto::raft_serverpb::RegionLocalState::default(); - assert_eq!( - f( - ffi_set.proxy_helper.proxy_ptr, - 0, // not exist - &mut state as *mut _ as _, - error_msg.as_mut(), - ), - KVGetStatus::NotFound - ); - assert!(!state.has_region()); - assert!(error_msg.as_ref().is_null()); - - ffi_set - .proxy - .get_value_cf("none_cf", "123".as_bytes(), |value| { - let msg = value.unwrap_err(); - assert_eq!(msg, "Storage Engine cf none_cf not found"); - }); - ffi_set - .proxy - .get_value_cf("raft", "123".as_bytes(), |value| { - let res = value.unwrap(); - assert!(res.is_none()); - }); - - ffi_set.proxy.set_kv_engine(None); - let res = ffi_set.proxy_helper.fn_get_region_local_state.unwrap()( - ffi_set.proxy_helper.proxy_ptr, - region_id, - &mut state as *mut _ as _, - error_msg.as_mut(), - ); - assert_eq!(res, KVGetStatus::Error); - assert!(!error_msg.as_ref().is_null()); - assert_eq!( - error_msg.as_str(), - "KV engine is not initialized".as_bytes() - ); - } - } cluster.shutdown(); } From 08b32e3dc1c4807fa02bf739d4772188a7c90a47 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 15 Sep 2022 19:07:49 +0800 Subject: [PATCH 2/6] adapt Signed-off-by: CalvinNeo --- components/test_raftstore/src/server.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index 2930035076d..fa82e8026d5 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -367,7 +367,8 @@ impl ServerCluster { let (res_tag_factory, collector_reg_handle, rsmeter_cleanup) = self.init_resource_metering(&cfg.resource_metering); - let check_leader_runner = CheckLeaderRunner::new(store_meta.clone(), coprocessor_host.clone()); + let check_leader_runner = + CheckLeaderRunner::new(store_meta.clone(), coprocessor_host.clone()); let check_leader_scheduler = bg_worker.start("check-leader", check_leader_runner); let mut lock_mgr = LockManager::new(&cfg.pessimistic_txn); From a494ff1b982791931d8e974d8bb3263e0f37e5ae Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 15 Sep 2022 21:26:47 +0800 Subject: [PATCH 3/6] remove mock-engine-store 
Signed-off-by: CalvinNeo --- Cargo.lock | 21 - Cargo.toml | 1 - mock-engine-store/Cargo.toml | 32 - mock-engine-store/src/lib.rs | 1006 ------------------------- tests/Cargo.toml | 1 - tests/failpoints/cases/test_normal.rs | 2 - 6 files changed, 1063 deletions(-) delete mode 100644 mock-engine-store/Cargo.toml delete mode 100644 mock-engine-store/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 35e96749f04..0ad1b7c0a8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3117,26 +3117,6 @@ dependencies = [ "tempdir", ] -[[package]] -name = "mock-engine-store" -version = "0.0.1" -dependencies = [ - "engine_rocks", - "engine_store_ffi", - "engine_test", - "engine_traits", - "fail", - "keys", - "kvproto", - "protobuf", - "proxy_server", - "raftstore", - "server", - "slog", - "slog-global", - "tikv_util", -] - [[package]] name = "more-asserts" version = "0.2.1" @@ -5893,7 +5873,6 @@ dependencies = [ "kvproto", "libc 0.2.125", "log_wrappers", - "mock-engine-store", "more-asserts", "new-mock-engine-store", "online_config", diff --git a/Cargo.toml b/Cargo.toml index 8f0d8e81647..a3e60d1f8c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -277,7 +277,6 @@ members = [ "fuzz/fuzzer-honggfuzz", "fuzz/fuzzer-libfuzzer", "gen-proxy-ffi", - "mock-engine-store", "raftstore-proxy", "tests", ] diff --git a/mock-engine-store/Cargo.toml b/mock-engine-store/Cargo.toml deleted file mode 100644 index 09b54d91e03..00000000000 --- a/mock-engine-store/Cargo.toml +++ /dev/null @@ -1,32 +0,0 @@ -[package] -name = "mock-engine-store" -version = "0.0.1" -license = "Apache-2.0" -edition = "2018" -publish = false - -[lib] -name = "mock_engine_store" - -[features] -default = ["protobuf-codec"] -protobuf-codec = [ - "protobuf/bytes", - "kvproto/protobuf-codec", -] - -[dependencies] -engine_rocks = { path = "../components/engine_rocks", default-features = false } -engine_store_ffi = { path = "../components/engine_store_ffi", default-features = false } -engine_test = { path = "../components/engine_test", default-features = false } -engine_traits = { path = "../components/engine_traits", default-features = false } -fail = "0.5" -keys = { path = "../components/keys", default-features = false } -kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false } -protobuf = "2.8.0" -proxy_server = { path = "../components/proxy_server" } -raftstore = { path = "../components/raftstore", default-features = false } -server = { path = "../components/server" } -slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] } -slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" } -tikv_util = { path = "../components/tikv_util", default-features = false } diff --git a/mock-engine-store/src/lib.rs b/mock-engine-store/src/lib.rs deleted file mode 100644 index 923aeafd505..00000000000 --- a/mock-engine-store/src/lib.rs +++ /dev/null @@ -1,1006 +0,0 @@ -// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
- -#![feature(slice_take)] - -use std::{ - collections::{BTreeMap, HashMap, HashSet}, - pin::Pin, - sync::Mutex, - time::Duration, -}; - -use engine_rocks::RocksEngine; -use engine_store_ffi::RawCppPtr; -pub use engine_store_ffi::{ - interfaces::root::DB as ffi_interfaces, EngineStoreServerHelper, RaftStoreProxyFFIHelper, - UnwrapExternCFunc, -}; -use engine_test::raft::RaftTestEngine; -use engine_traits::{Engines, Iterable, SyncMutable, CF_DEFAULT, CF_LOCK, CF_WRITE}; -use kvproto::{ - raft_cmdpb::{AdminCmdType, AdminRequest}, - raft_serverpb::{ - MergeState, PeerState, RaftApplyState, RaftLocalState, RaftSnapshotData, RegionLocalState, - }, -}; -use protobuf::Message; -use tikv_util::{debug, info, warn}; - -type RegionId = u64; -#[derive(Default, Clone)] -pub struct Region { - pub region: kvproto::metapb::Region, - // Which peer is me? - peer: kvproto::metapb::Peer, - // in-memory data - pub data: [BTreeMap, Vec>; 3], - // If we a key is deleted, it will immediately be removed from data, - // We will record the key in pending_delete, so we can delete it from disk when flushing. - pub pending_delete: [HashSet>; 3], - pub pending_write: [BTreeMap, Vec>; 3], - pub apply_state: kvproto::raft_serverpb::RaftApplyState, - pub applied_term: u64, -} - -impl Region { - fn set_applied(&mut self, index: u64, term: u64) { - self.apply_state.set_applied_index(index); - self.applied_term = term; - } - - fn new(meta: kvproto::metapb::Region) -> Self { - Region { - region: meta, - peer: Default::default(), - data: Default::default(), - pending_delete: Default::default(), - pending_write: Default::default(), - apply_state: Default::default(), - applied_term: 0, - } - } -} - -pub struct EngineStoreServer { - pub id: u64, - pub engines: Option>, - pub kvstore: HashMap>, - pub proxy_compat: bool, -} - -impl EngineStoreServer { - pub fn new(id: u64, engines: Option>) -> Self { - EngineStoreServer { - id, - engines, - kvstore: Default::default(), - proxy_compat: false, - } - } - - pub fn get_mem( - &self, - region_id: u64, - cf: ffi_interfaces::ColumnFamilyType, - key: &Vec, - ) -> Option<&Vec> { - match self.kvstore.get(®ion_id) { - Some(region) => { - let bmap = ®ion.data[cf as usize]; - bmap.get(key) - } - None => None, - } - } -} - -pub struct EngineStoreServerWrap { - pub engine_store_server: *mut EngineStoreServer, - pub maybe_proxy_helper: std::option::Option<*mut RaftStoreProxyFFIHelper>, - // Call `gen_cluster(cluster_ptr)`, and get which cluster this Server belong to. - pub cluster_ptr: isize, -} - -fn set_new_region_peer(new_region: &mut Region, store_id: u64) { - if let Some(peer) = new_region - .region - .get_peers() - .iter() - .find(|&peer| peer.get_store_id() == store_id) - { - new_region.peer = peer.clone(); - } else { - // This happens when region is not found. 
- } -} - -pub fn make_new_region( - maybe_from_region: Option, - maybe_store_id: Option, -) -> Region { - let mut region = Region { - region: maybe_from_region.unwrap_or(Default::default()), - ..Default::default() - }; - if let Some(store_id) = maybe_store_id { - set_new_region_peer(&mut region, store_id); - } - region - .apply_state - .mut_truncated_state() - .set_index(raftstore::store::RAFT_INIT_LOG_INDEX); - region - .apply_state - .mut_truncated_state() - .set_term(raftstore::store::RAFT_INIT_LOG_TERM); - region.set_applied( - raftstore::store::RAFT_INIT_LOG_INDEX, - raftstore::store::RAFT_INIT_LOG_TERM, - ); - region -} - -fn write_kv_in_mem(region: &mut Box, cf_index: usize, k: &[u8], v: &[u8]) { - let data = &mut region.data[cf_index]; - let pending_delete = &mut region.pending_delete[cf_index]; - let pending_write = &mut region.pending_write[cf_index]; - pending_delete.remove(k); - data.insert(k.to_vec(), v.to_vec()); - pending_write.insert(k.to_vec(), v.to_vec()); -} - -fn delete_kv_in_mem(region: &mut Box, cf_index: usize, k: &[u8]) { - let data = &mut region.data[cf_index]; - let pending_delete = &mut region.pending_delete[cf_index]; - pending_delete.insert(k.to_vec()); - data.remove(k); -} - -unsafe fn load_from_db(store: &mut EngineStoreServer, region: &mut Box) { - let kv = &mut store.engines.as_mut().unwrap().kv; - for cf in 0..3 { - let cf_name = cf_to_name(cf.into()); - region.data[cf].clear(); - region.pending_delete[cf].clear(); - region.pending_write[cf].clear(); - let start = region.region.get_start_key().to_owned(); - let end = region.region.get_end_key().to_owned(); - kv.scan_cf(cf_name, &start, &end, false, |k, v| { - region.data[cf].insert(k.to_vec(), v.to_vec()); - Ok(true) - }); - } -} - -unsafe fn write_to_db_data(store: &mut EngineStoreServer, region: &mut Box) { - info!("mock flush to engine"; - "region" => ?region.region, - "store_id" => store.id, - ); - let kv = &mut store.engines.as_mut().unwrap().kv; - for cf in 0..3 { - let pending_write = std::mem::take(region.pending_write.as_mut().get_mut(cf).unwrap()); - let mut pending_remove = - std::mem::take(region.pending_delete.as_mut().get_mut(cf).unwrap()); - for (k, v) in pending_write.into_iter() { - let tikv_key = keys::data_key(k.as_slice()); - let cf_name = cf_to_name(cf.into()); - if !pending_remove.contains(&k) { - kv.put_cf(cf_name, &tikv_key.as_slice(), &v); - } else { - pending_remove.remove(&k); - } - } - let cf_name = cf_to_name(cf.into()); - for k in pending_remove.into_iter() { - let tikv_key = keys::data_key(k.as_slice()); - kv.delete_cf(cf_name, &tikv_key); - } - } -} - -impl EngineStoreServerWrap { - pub fn new( - engine_store_server: *mut EngineStoreServer, - maybe_proxy_helper: std::option::Option<*mut RaftStoreProxyFFIHelper>, - cluster_ptr: isize, - ) -> Self { - Self { - engine_store_server, - maybe_proxy_helper, - cluster_ptr, - } - } - - unsafe fn handle_admin_raft_cmd( - &mut self, - req: &kvproto::raft_cmdpb::AdminRequest, - resp: &kvproto::raft_cmdpb::AdminResponse, - header: ffi_interfaces::RaftCmdHeader, - ) -> ffi_interfaces::EngineStoreApplyRes { - let region_id = header.region_id; - let node_id = (*self.engine_store_server).id; - info!("handle_admin_raft_cmd"; - "request"=>?req, "response"=>?resp, "index"=>header.index, "region-id"=>header.region_id); - let do_handle_admin_raft_cmd = - move |region: &mut Box, engine_store_server: &mut EngineStoreServer| { - if region.apply_state.get_applied_index() >= header.index { - return ffi_interfaces::EngineStoreApplyRes::Persist; - } - 
match req.get_cmd_type() { - AdminCmdType::ChangePeer | AdminCmdType::ChangePeerV2 => { - let new_region_meta = resp.get_change_peer().get_region(); - let old_peer_id = { - let old_region = - engine_store_server.kvstore.get_mut(®ion_id).unwrap(); - old_region.region = new_region_meta.clone(); - region.set_applied(header.index, header.term); - old_region.peer.get_store_id() - }; - - let mut do_remove = true; - if old_peer_id != 0 { - for peer in new_region_meta.get_peers().iter() { - if peer.get_store_id() == old_peer_id { - // Should not remove region - do_remove = false; - } - } - } else { - // If old_peer_id is 0, seems old_region.peer is not set, just neglect for convenience. - do_remove = false; - } - if do_remove { - let removed = engine_store_server.kvstore.remove(®ion_id); - // We need to also remove apply state, thus we need to know peer_id - debug!( - "Remove region {:?} peer_id {} at node {}, for new meta {:?}", - removed.unwrap().region, - old_peer_id, - node_id, - new_region_meta - ); - } - } - AdminCmdType::BatchSplit => { - let regions = resp.get_splits().regions.as_ref(); - - for i in 0..regions.len() { - let region_meta = regions.get(i).unwrap(); - if region_meta.id == region_id { - // This is the derived region - debug!( - "region {} is derived by split at peer {} with meta {:?}", - region_meta.id, node_id, region_meta - ); - assert!(engine_store_server.kvstore.contains_key(®ion_meta.id)); - engine_store_server - .kvstore - .get_mut(®ion_meta.id) - .unwrap() - .region = region_meta.clone(); - } else { - // Should split data into new region - debug!( - "new region {} generated by split at peer {} with meta {:?}", - region_meta.id, node_id, region_meta - ); - let mut new_region = - make_new_region(Some(region_meta.clone()), Some(node_id)); - - // No need to split data because all KV are stored in the same RocksDB. - // TODO But we still need to clean all in-memory data. 
- // We can't assert `region_meta.id` is brand new here - engine_store_server - .kvstore - .insert(region_meta.id, Box::new(new_region)); - } - } - } - AdminCmdType::PrepareMerge => { - let tikv_region = resp.get_split().get_left(); - - let target = req.prepare_merge.as_ref().unwrap().target.as_ref(); - let region_meta = &mut (engine_store_server - .kvstore - .get_mut(®ion_id) - .unwrap() - .region); - - // Increase self region conf version and version - let region_epoch = region_meta.region_epoch.as_mut().unwrap(); - let new_version = region_epoch.version + 1; - region_epoch.set_version(new_version); - assert_eq!(tikv_region.get_region_epoch().get_version(), new_version); - let conf_version = region_epoch.conf_ver + 1; - region_epoch.set_conf_ver(conf_version); - assert_eq!(tikv_region.get_region_epoch().get_conf_ver(), conf_version); - - { - let region = engine_store_server.kvstore.get_mut(®ion_id).unwrap(); - region.set_applied(header.index, header.term); - } - // We don't handle MergeState and PeerState here - } - AdminCmdType::CommitMerge => { - { - let tikv_target_region_meta = resp.get_split().get_left(); - - let target_region = - &mut (engine_store_server.kvstore.get_mut(®ion_id).unwrap()); - let target_region_meta = &mut target_region.region; - let target_version = - target_region_meta.get_region_epoch().get_version(); - let source_region = req.get_commit_merge().get_source(); - let source_version = source_region.get_region_epoch().get_version(); - - let new_version = std::cmp::max(source_version, target_version) + 1; - target_region_meta - .mut_region_epoch() - .set_version(new_version); - assert_eq!( - target_region_meta.get_region_epoch().get_version(), - new_version - ); - - // No need to merge data - let source_at_left = if source_region.get_start_key().is_empty() { - true - } else if target_region_meta.get_start_key().is_empty() { - false - } else { - source_region - .get_end_key() - .cmp(target_region_meta.get_start_key()) - == std::cmp::Ordering::Equal - }; - - if source_at_left { - target_region_meta - .set_start_key(source_region.get_start_key().to_vec()); - assert_eq!( - tikv_target_region_meta.get_start_key(), - target_region_meta.get_start_key() - ); - } else { - target_region_meta - .set_end_key(source_region.get_end_key().to_vec()); - assert_eq!( - tikv_target_region_meta.get_end_key(), - target_region_meta.get_end_key() - ); - } - target_region.set_applied(header.index, header.term); - } - let to_remove = req.get_commit_merge().get_source().get_id(); - engine_store_server.kvstore.remove(&to_remove); - } - AdminCmdType::RollbackMerge => { - let region = engine_store_server.kvstore.get_mut(®ion_id).unwrap(); - let region_meta = &mut region.region; - let new_version = region_meta.get_region_epoch().get_version() + 1; - let region_epoch = region_meta.region_epoch.as_mut().unwrap(); - region_epoch.set_version(new_version); - - region.set_applied(header.index, header.term); - } - AdminCmdType::CompactLog => { - // We will modify truncated_state when returns Persist. 
- region.set_applied(header.index, header.term); - } - _ => { - region.set_applied(header.index, header.term); - } - } - // Do persist or not - let res = match req.get_cmd_type() { - AdminCmdType::CompactLog => { - fail::fail_point!("no_persist_compact_log", |_| { - ffi_interfaces::EngineStoreApplyRes::None - }); - ffi_interfaces::EngineStoreApplyRes::Persist - } - _ => ffi_interfaces::EngineStoreApplyRes::Persist, - }; - if req.get_cmd_type() == AdminCmdType::CompactLog - && res == ffi_interfaces::EngineStoreApplyRes::Persist - { - let region = engine_store_server.kvstore.get_mut(®ion_id).unwrap(); - let state = &mut region.apply_state; - let compact_index = req.get_compact_log().get_compact_index(); - let compact_term = req.get_compact_log().get_compact_term(); - state.mut_truncated_state().set_index(compact_index); - state.mut_truncated_state().set_term(compact_term); - } - res - }; - - let res = match (*self.engine_store_server).kvstore.entry(region_id) { - std::collections::hash_map::Entry::Occupied(mut o) => { - do_handle_admin_raft_cmd(o.get_mut(), &mut (*self.engine_store_server)) - } - std::collections::hash_map::Entry::Vacant(v) => { - // Currently in tests, we don't handle commands like BatchSplit, - // and sometimes we don't bootstrap region 1, - // so it is normal if we find no region. - warn!( - "region {} not found when admin, create for {}", - region_id, node_id - ); - let new_region = v.insert(Default::default()); - assert!((*self.engine_store_server).kvstore.contains_key(®ion_id)); - do_handle_admin_raft_cmd(new_region, &mut (*self.engine_store_server)) - } - }; - - let region = match (*self.engine_store_server).kvstore.get_mut(®ion_id) { - Some(r) => Some(r), - None => { - warn!( - "still can't find region {} for {}, may be remove due to confchange", - region_id, node_id - ); - None - } - }; - match res { - ffi_interfaces::EngineStoreApplyRes::Persist => { - if let Some(region) = region { - write_to_db_data(&mut (*self.engine_store_server), region); - } - } - _ => (), - }; - res - } - - unsafe fn handle_write_raft_cmd( - &mut self, - cmds: ffi_interfaces::WriteCmdsView, - header: ffi_interfaces::RaftCmdHeader, - ) -> ffi_interfaces::EngineStoreApplyRes { - let region_id = header.region_id; - let server = &mut (*self.engine_store_server); - let kv = &mut (*self.engine_store_server).engines.as_mut().unwrap().kv; - let proxy_compat = server.proxy_compat; - - let mut do_handle_write_raft_cmd = move |region: &mut Box| { - if region.apply_state.get_applied_index() >= header.index { - return ffi_interfaces::EngineStoreApplyRes::None; - } - for i in 0..cmds.len { - let key = &*cmds.keys.add(i as _); - let val = &*cmds.vals.add(i as _); - let k = &key.to_slice(); - let v = &val.to_slice(); - let tp = &*cmds.cmd_types.add(i as _); - let cf = &*cmds.cmd_cf.add(i as _); - let cf_index = (*cf) as u8; - debug!( - "handle_write_raft_cmd"; - "k" => ?&k[..std::cmp::min(4usize, k.len())], - "v" => ?&v[..std::cmp::min(4usize, v.len())], - "region_id" => region_id, - "node_id" => server.id, - "header" => ?header, - ); - let data = &mut region.data[cf_index as usize]; - match tp { - engine_store_ffi::WriteCmdType::Put => { - write_kv_in_mem(region, cf_index as usize, k, v); - } - engine_store_ffi::WriteCmdType::Del => { - delete_kv_in_mem(region, cf_index as usize, k); - } - } - } - // Advance apply index, but do not persist - region.set_applied(header.index, header.term); - if !proxy_compat { - write_to_db_data(server, region); - } - ffi_interfaces::EngineStoreApplyRes::None - }; - - match 
(*self.engine_store_server).kvstore.entry(region_id) { - std::collections::hash_map::Entry::Occupied(mut o) => { - do_handle_write_raft_cmd(o.get_mut()) - } - std::collections::hash_map::Entry::Vacant(v) => { - warn!("region {} not found when write", region_id); - do_handle_write_raft_cmd(v.insert(Default::default())) - } - } - } -} - -unsafe extern "C" fn ffi_set_pb_msg_by_bytes( - type_: ffi_interfaces::MsgPBType, - ptr: ffi_interfaces::RawVoidPtr, - buff: ffi_interfaces::BaseBuffView, -) { - match type_ { - ffi_interfaces::MsgPBType::ReadIndexResponse => { - let v = &mut *(ptr as *mut kvproto::kvrpcpb::ReadIndexResponse); - v.merge_from_bytes(buff.to_slice()).unwrap(); - } - ffi_interfaces::MsgPBType::ServerInfoResponse => { - let v = &mut *(ptr as *mut kvproto::diagnosticspb::ServerInfoResponse); - v.merge_from_bytes(buff.to_slice()).unwrap(); - } - ffi_interfaces::MsgPBType::RegionLocalState => { - let v = &mut *(ptr as *mut kvproto::raft_serverpb::RegionLocalState); - v.merge_from_bytes(buff.to_slice()).unwrap(); - } - } -} - -pub fn gen_engine_store_server_helper( - wrap: Pin<&EngineStoreServerWrap>, -) -> EngineStoreServerHelper { - EngineStoreServerHelper { - magic_number: ffi_interfaces::RAFT_STORE_PROXY_MAGIC_NUMBER, - version: ffi_interfaces::RAFT_STORE_PROXY_VERSION, - inner: &(*wrap) as *const EngineStoreServerWrap as *mut _, - fn_gen_cpp_string: Some(ffi_gen_cpp_string), - fn_handle_write_raft_cmd: Some(ffi_handle_write_raft_cmd), - fn_handle_admin_raft_cmd: Some(ffi_handle_admin_raft_cmd), - fn_need_flush_data: Some(ffi_need_flush_data), - fn_try_flush_data: Some(ffi_try_flush_data), - fn_atomic_update_proxy: Some(ffi_atomic_update_proxy), - fn_handle_destroy: Some(ffi_handle_destroy), - fn_handle_ingest_sst: Some(ffi_handle_ingest_sst), - fn_handle_compute_store_stats: Some(ffi_handle_compute_store_stats), - fn_handle_get_engine_store_server_status: None, - fn_pre_handle_snapshot: Some(ffi_pre_handle_snapshot), - fn_apply_pre_handled_snapshot: Some(ffi_apply_pre_handled_snapshot), - fn_handle_http_request: None, - fn_check_http_uri_available: None, - fn_gc_raw_cpp_ptr: Some(ffi_gc_raw_cpp_ptr), - fn_get_config: None, - fn_set_store: None, - fn_set_pb_msg_by_bytes: Some(ffi_set_pb_msg_by_bytes), - fn_handle_safe_ts_update: None, - } -} - -unsafe fn into_engine_store_server_wrap( - arg1: *const ffi_interfaces::EngineStoreServerWrap, -) -> &'static mut EngineStoreServerWrap { - &mut *(arg1 as *mut EngineStoreServerWrap) -} - -unsafe extern "C" fn ffi_handle_admin_raft_cmd( - arg1: *const ffi_interfaces::EngineStoreServerWrap, - arg2: ffi_interfaces::BaseBuffView, - arg3: ffi_interfaces::BaseBuffView, - arg4: ffi_interfaces::RaftCmdHeader, -) -> ffi_interfaces::EngineStoreApplyRes { - let store = into_engine_store_server_wrap(arg1); - let mut req = kvproto::raft_cmdpb::AdminRequest::default(); - let mut resp = kvproto::raft_cmdpb::AdminResponse::default(); - req.merge_from_bytes(arg2.to_slice()).unwrap(); - resp.merge_from_bytes(arg3.to_slice()).unwrap(); - store.handle_admin_raft_cmd(&req, &resp, arg4) -} - -unsafe extern "C" fn ffi_handle_write_raft_cmd( - arg1: *const ffi_interfaces::EngineStoreServerWrap, - arg2: ffi_interfaces::WriteCmdsView, - arg3: ffi_interfaces::RaftCmdHeader, -) -> ffi_interfaces::EngineStoreApplyRes { - let store = into_engine_store_server_wrap(arg1); - store.handle_write_raft_cmd(arg2, arg3) -} - -enum RawCppPtrTypeImpl { - None = 0, - String, - PreHandledSnapshotWithBlock, - WakerNotifier, -} - -impl From for RawCppPtrTypeImpl { - fn from(o: 
ffi_interfaces::RawCppPtrType) -> Self { - match o { - 0 => RawCppPtrTypeImpl::None, - 1 => RawCppPtrTypeImpl::String, - 2 => RawCppPtrTypeImpl::PreHandledSnapshotWithBlock, - 3 => RawCppPtrTypeImpl::WakerNotifier, - _ => unreachable!(), - } - } -} - -impl Into for RawCppPtrTypeImpl { - fn into(self) -> ffi_interfaces::RawCppPtrType { - match self { - RawCppPtrTypeImpl::None => 0, - RawCppPtrTypeImpl::String => 1, - RawCppPtrTypeImpl::PreHandledSnapshotWithBlock => 2, - RawCppPtrTypeImpl::WakerNotifier => 3, - } - } -} - -extern "C" fn ffi_need_flush_data( - _arg1: *mut ffi_interfaces::EngineStoreServerWrap, - _region_id: u64, -) -> u8 { - fail::fail_point!("need_flush_data", |e| e.unwrap().parse::().unwrap()); - true as u8 -} - -extern "C" fn ffi_try_flush_data( - _arg1: *mut ffi_interfaces::EngineStoreServerWrap, - _region_id: u64, - _try_until_succeed: u8, - _index: u64, - _term: u64, -) -> u8 { - fail::fail_point!("try_flush_data", |e| e.unwrap().parse::().unwrap()); - true as u8 -} - -extern "C" fn ffi_gen_cpp_string(s: ffi_interfaces::BaseBuffView) -> ffi_interfaces::RawCppPtr { - let str = Box::new(Vec::from(s.to_slice())); - let ptr = Box::into_raw(str); - ffi_interfaces::RawCppPtr { - ptr: ptr as *mut _, - type_: RawCppPtrTypeImpl::String.into(), - } -} - -pub struct RawCppStringPtrGuard(ffi_interfaces::RawCppStringPtr); - -impl Default for RawCppStringPtrGuard { - fn default() -> Self { - Self(std::ptr::null_mut()) - } -} - -impl std::convert::AsRef for RawCppStringPtrGuard { - fn as_ref(&self) -> &ffi_interfaces::RawCppStringPtr { - &self.0 - } -} -impl std::convert::AsMut for RawCppStringPtrGuard { - fn as_mut(&mut self) -> &mut ffi_interfaces::RawCppStringPtr { - &mut self.0 - } -} - -impl Drop for RawCppStringPtrGuard { - fn drop(&mut self) { - ffi_interfaces::RawCppPtr { - ptr: self.0 as *mut _, - type_: RawCppPtrTypeImpl::String.into(), - }; - } -} - -impl RawCppStringPtrGuard { - pub fn as_str(&self) -> &[u8] { - let s = self.0 as *mut Vec; - unsafe { &*s } - } -} - -pub struct ProxyNotifier { - cv: std::sync::Condvar, - mutex: Mutex<()>, - // multi notifiers single receiver model. use another flag to avoid waiting until timeout. - flag: std::sync::atomic::AtomicBool, -} - -impl ProxyNotifier { - pub fn blocked_wait_for(&self, timeout: Duration) { - // if flag from false to false, wait for notification. - // if flag from true to false, do nothing. - if !self.flag.swap(false, std::sync::atomic::Ordering::AcqRel) { - { - let lock = self.mutex.lock().unwrap(); - if !self.flag.load(std::sync::atomic::Ordering::Acquire) { - self.cv.wait_timeout(lock, timeout); - } - } - self.flag.store(false, std::sync::atomic::Ordering::Release); - } - } - - pub fn wake(&self) { - // if flag from false -> true, then wake up. - // if flag from true -> true, do nothing. 
- if !self.flag.swap(true, std::sync::atomic::Ordering::AcqRel) { - let _ = self.mutex.lock().unwrap(); - self.cv.notify_one(); - } - } - - pub fn new_raw() -> RawCppPtr { - let notifier = Box::new(Self { - cv: Default::default(), - mutex: Mutex::new(()), - flag: std::sync::atomic::AtomicBool::new(false), - }); - - RawCppPtr { - ptr: Box::into_raw(notifier) as _, - type_: RawCppPtrTypeImpl::WakerNotifier.into(), - } - } -} - -extern "C" fn ffi_gc_raw_cpp_ptr( - ptr: ffi_interfaces::RawVoidPtr, - tp: ffi_interfaces::RawCppPtrType, -) { - match RawCppPtrTypeImpl::from(tp) { - RawCppPtrTypeImpl::None => {} - RawCppPtrTypeImpl::String => unsafe { - Box::>::from_raw(ptr as *mut _); - }, - RawCppPtrTypeImpl::PreHandledSnapshotWithBlock => unsafe { - Box::::from_raw(ptr as *mut _); - }, - RawCppPtrTypeImpl::WakerNotifier => unsafe { - Box::from_raw(ptr as *mut ProxyNotifier); - }, - } -} - -unsafe extern "C" fn ffi_atomic_update_proxy( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, - arg2: *mut ffi_interfaces::RaftStoreProxyFFIHelper, -) { - let store = into_engine_store_server_wrap(arg1); - store.maybe_proxy_helper = Some(&mut *(arg2 as *mut RaftStoreProxyFFIHelper)); -} - -unsafe extern "C" fn ffi_handle_destroy( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, - arg2: u64, -) { - let store = into_engine_store_server_wrap(arg1); - (*store.engine_store_server).kvstore.remove(&arg2); -} - -type MockRaftProxyHelper = RaftStoreProxyFFIHelper; - -pub struct SSTReader<'a> { - proxy_helper: &'a MockRaftProxyHelper, - inner: ffi_interfaces::SSTReaderPtr, - type_: ffi_interfaces::ColumnFamilyType, -} - -impl<'a> Drop for SSTReader<'a> { - fn drop(&mut self) { - unsafe { - (self.proxy_helper.sst_reader_interfaces.fn_gc.into_inner())( - self.inner.clone(), - self.type_, - ); - } - } -} - -impl<'a> SSTReader<'a> { - pub unsafe fn new( - proxy_helper: &'a MockRaftProxyHelper, - view: &'a ffi_interfaces::SSTView, - ) -> Self { - SSTReader { - proxy_helper, - inner: (proxy_helper - .sst_reader_interfaces - .fn_get_sst_reader - .into_inner())(view.clone(), proxy_helper.proxy_ptr.clone()), - type_: view.type_, - } - } - - pub unsafe fn remained(&mut self) -> bool { - (self - .proxy_helper - .sst_reader_interfaces - .fn_remained - .into_inner())(self.inner.clone(), self.type_) - != 0 - } - - pub unsafe fn key(&mut self) -> ffi_interfaces::BaseBuffView { - (self.proxy_helper.sst_reader_interfaces.fn_key.into_inner())( - self.inner.clone(), - self.type_, - ) - } - - pub unsafe fn value(&mut self) -> ffi_interfaces::BaseBuffView { - (self - .proxy_helper - .sst_reader_interfaces - .fn_value - .into_inner())(self.inner.clone(), self.type_) - } - - pub unsafe fn next(&mut self) { - (self.proxy_helper.sst_reader_interfaces.fn_next.into_inner())( - self.inner.clone(), - self.type_, - ) - } -} - -struct PrehandledSnapshot { - pub region: std::option::Option, -} - -unsafe extern "C" fn ffi_pre_handle_snapshot( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, - region_buff: ffi_interfaces::BaseBuffView, - peer_id: u64, - snaps: ffi_interfaces::SSTViewVec, - index: u64, - term: u64, -) -> ffi_interfaces::RawCppPtr { - let store = into_engine_store_server_wrap(arg1); - let proxy_helper = &mut *(store.maybe_proxy_helper.unwrap()); - let kvstore = &mut (*store.engine_store_server).kvstore; - - let mut req = kvproto::metapb::Region::default(); - assert_ne!(region_buff.data, std::ptr::null()); - assert_ne!(region_buff.len, 0); - req.merge_from_bytes(region_buff.to_slice()).unwrap(); - - let req_id = req.id; - - let 
mut region = Box::new(Region::new(req)); - debug!( - "pre handle snaps with len {} peer_id {} region {:?}", - snaps.len, peer_id, region.region - ); - for i in 0..snaps.len { - let mut snapshot = snaps.views.add(i as usize); - let view = &*(snapshot as *mut ffi_interfaces::SSTView); - let mut sst_reader = SSTReader::new(proxy_helper, view); - - while sst_reader.remained() { - let key = sst_reader.key(); - let value = sst_reader.value(); - - let cf_index = (*snapshot).type_ as u8; - write_kv_in_mem( - &mut region, - cf_index as usize, - key.to_slice(), - value.to_slice(), - ); - - sst_reader.next(); - } - } - { - region.set_applied(index, term); - region.apply_state.mut_truncated_state().set_index(index); - region.apply_state.mut_truncated_state().set_term(term); - } - ffi_interfaces::RawCppPtr { - ptr: Box::into_raw(Box::new(PrehandledSnapshot { - region: Some(*region), - })) as *const Region as ffi_interfaces::RawVoidPtr, - type_: RawCppPtrTypeImpl::PreHandledSnapshotWithBlock.into(), - } -} - -pub fn cf_to_name(cf: ffi_interfaces::ColumnFamilyType) -> &'static str { - match cf { - ffi_interfaces::ColumnFamilyType::Lock => CF_LOCK, - ffi_interfaces::ColumnFamilyType::Write => CF_WRITE, - ffi_interfaces::ColumnFamilyType::Default => CF_DEFAULT, - _ => unreachable!(), - } -} - -unsafe extern "C" fn ffi_apply_pre_handled_snapshot( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, - arg2: ffi_interfaces::RawVoidPtr, - arg3: ffi_interfaces::RawCppPtrType, -) { - let store = into_engine_store_server_wrap(arg1); - let req = &mut *(arg2 as *mut PrehandledSnapshot); - let node_id = (*store.engine_store_server).id; - - let req_id = req.region.as_ref().unwrap().region.id; - - &(*store.engine_store_server) - .kvstore - .insert(req_id, Box::new(req.region.take().unwrap())); - - let region = (*store.engine_store_server) - .kvstore - .get_mut(&req_id) - .unwrap(); - - debug!( - "apply snaps peer_id {} region {:?}", - node_id, ®ion.region - ); - write_to_db_data(&mut (*store.engine_store_server), region); -} - -unsafe extern "C" fn ffi_handle_ingest_sst( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, - snaps: ffi_interfaces::SSTViewVec, - header: ffi_interfaces::RaftCmdHeader, -) -> ffi_interfaces::EngineStoreApplyRes { - let store = into_engine_store_server_wrap(arg1); - let node_id = (*store.engine_store_server).id; - let proxy_helper = &mut *(store.maybe_proxy_helper.unwrap()); - debug!("ingest sst with len {}", snaps.len); - - let region_id = header.region_id; - let kvstore = &mut (*store.engine_store_server).kvstore; - let kv = &mut (*store.engine_store_server).engines.as_mut().unwrap().kv; - - match kvstore.entry(region_id) { - std::collections::hash_map::Entry::Occupied(mut o) => {} - std::collections::hash_map::Entry::Vacant(v) => { - // When we remove hacked code in handle_raft_entry_normal during migration, - // some tests in handle_raft_entry_normal may fail, since it can observe a empty cmd, - // thus creating region. 
- warn!( - "region {} not found when ingest, create for {}", - region_id, node_id - ); - let new_region = v.insert(Default::default()); - } - } - let region = kvstore.get_mut(®ion_id).unwrap(); - - let index = header.index; - let term = header.term; - - for i in 0..snaps.len { - let mut snapshot = snaps.views.add(i as usize); - let path = std::str::from_utf8_unchecked((*snapshot).path.to_slice()); - let mut sst_reader = - SSTReader::new(proxy_helper, &*(snapshot as *mut ffi_interfaces::SSTView)); - - while sst_reader.remained() { - let key = sst_reader.key(); - let value = sst_reader.value(); - - let cf_index = (*snapshot).type_ as usize; - let cf_name = cf_to_name((*snapshot).type_); - let tikv_key = keys::data_key(key.to_slice()); - write_kv_in_mem(region, cf_index, key.to_slice(), value.to_slice()); - sst_reader.next(); - } - } - - { - region.set_applied(header.index, header.term); - region.apply_state.mut_truncated_state().set_index(index); - region.apply_state.mut_truncated_state().set_term(term); - } - - write_to_db_data(&mut (*store.engine_store_server), region); - ffi_interfaces::EngineStoreApplyRes::Persist -} - -unsafe extern "C" fn ffi_handle_compute_store_stats( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, -) -> ffi_interfaces::StoreStats { - ffi_interfaces::StoreStats { - fs_stats: ffi_interfaces::FsStats { - used_size: 0, - avail_size: 0, - capacity_size: 0, - ok: 1, - }, - engine_bytes_written: 0, - engine_keys_written: 0, - engine_bytes_read: 0, - engine_keys_read: 0, - } -} diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 563f93bd099..4cea4a723a2 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -71,7 +71,6 @@ grpcio-health = { version = "0.10", default-features = false } kvproto = { git = "https://github.com/pingcap/kvproto.git" } libc = "0.2" log_wrappers = { path = "../components/log_wrappers" } -mock-engine-store = { path = "../mock-engine-store", default-features = false } more-asserts = "0.2" new-mock-engine-store = { path = "../new-mock-engine-store", default-features = false } online_config = { path = "../components/online_config", default-features = false } diff --git a/tests/failpoints/cases/test_normal.rs b/tests/failpoints/cases/test_normal.rs index 1885c1c8220..94eefdaef63 100644 --- a/tests/failpoints/cases/test_normal.rs +++ b/tests/failpoints/cases/test_normal.rs @@ -2,9 +2,7 @@ use std::sync::{Arc, RwLock}; -use engine_store_ffi::{KVGetStatus, RaftStoreProxyFFI}; use kvproto::kvrpcpb::ApiVersion; -use mock_engine_store; use test_raftstore::*; #[test] From 4292ec4f6b9eb1d6281b7c1cf892f2f691bb1b7d Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 15 Sep 2022 23:21:02 +0800 Subject: [PATCH 4/6] f Signed-off-by: CalvinNeo --- new-mock-engine-store/src/lib.rs | 1 + tests/proxy/normal.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/new-mock-engine-store/src/lib.rs b/new-mock-engine-store/src/lib.rs index ac718ff6ec0..790f11488d6 100644 --- a/new-mock-engine-store/src/lib.rs +++ b/new-mock-engine-store/src/lib.rs @@ -1159,3 +1159,4 @@ unsafe extern "C" fn ffi_handle_compute_store_stats( engine_keys_read: 0, } } + diff --git a/tests/proxy/normal.rs b/tests/proxy/normal.rs index 9893b9abaf9..2d32f77680a 100644 --- a/tests/proxy/normal.rs +++ b/tests/proxy/normal.rs @@ -158,7 +158,7 @@ mod region { &mut |_id: u64, _, ffi_set: &mut FFIHelperSet| { let f = ffi_set.proxy_helper.fn_get_region_local_state.unwrap(); let mut state = kvproto::raft_serverpb::RegionLocalState::default(); - let mut error_msg = 
mock_engine_store::RawCppStringPtrGuard::default(); + let mut error_msg = new_mock_engine_store::RawCppStringPtrGuard::default(); assert_eq!( f( From 0a24170b981f97177b9b307c2653af0b70d0d8da Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 16 Sep 2022 01:10:52 +0800 Subject: [PATCH 5/6] f Signed-off-by: CalvinNeo --- new-mock-engine-store/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/new-mock-engine-store/src/lib.rs b/new-mock-engine-store/src/lib.rs index 790f11488d6..ac718ff6ec0 100644 --- a/new-mock-engine-store/src/lib.rs +++ b/new-mock-engine-store/src/lib.rs @@ -1159,4 +1159,3 @@ unsafe extern "C" fn ffi_handle_compute_store_stats( engine_keys_read: 0, } } - From 369beb19c4e47d719cffda3893e60509b0ee0e5c Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 16 Sep 2022 10:04:52 +0800 Subject: [PATCH 6/6] interface.rs fix Signed-off-by: CalvinNeo --- components/engine_store_ffi/src/interfaces.rs | 10 +++++----- raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version | 2 +- raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h | 4 +++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/components/engine_store_ffi/src/interfaces.rs b/components/engine_store_ffi/src/interfaces.rs index aa9680110c3..c7633f6010c 100644 --- a/components/engine_store_ffi/src/interfaces.rs +++ b/components/engine_store_ffi/src/interfaces.rs @@ -445,14 +445,14 @@ pub mod root { >, pub fn_handle_safe_ts_update: ::std::option::Option< unsafe extern "C" fn( - arg1: *const root::DB::EngineStoreServerWrap, - arg2: u64, - arg3: u64, - arg4: u64, + arg1: *mut root::DB::EngineStoreServerWrap, + region_id: u64, + self_safe_ts: u64, + leader_safe_ts: u64, ), >, } - pub const RAFT_STORE_PROXY_VERSION: u64 = 14699247891578305166; + pub const RAFT_STORE_PROXY_VERSION: u64 = 15776819379826780689; pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639; } } diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version index 6593eefcca8..519af996bc4 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version @@ -1,3 +1,3 @@ #pragma once #include -namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 14699247891578305166ull; } \ No newline at end of file +namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 15776819379826780689ull; } \ No newline at end of file diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h index febfad1dfe7..49b82c3704c 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h @@ -212,6 +212,8 @@ struct EngineStoreServerHelper { void (*fn_set_store)(EngineStoreServerWrap *, BaseBuffView); void (*fn_set_pb_msg_by_bytes)(MsgPBType type, RawVoidPtr ptr, BaseBuffView buff); - void (*fn_handle_safe_ts_update)(EngineStoreServerWrap *, uint64_t region_id, uint64_t self_safe_ts, uint64_t leader_safe_ts); + void (*fn_handle_safe_ts_update)(EngineStoreServerWrap *, uint64_t region_id, + uint64_t self_safe_ts, + uint64_t leader_safe_ts); }; } // namespace DB
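
After this series, test_raftstore tests run against the plain TiKV cluster harness with no engine-store FFI plumbing. A minimal sketch of the resulting flow, mirroring the trimmed tests/failpoints/cases/test_normal.rs; the test name, key/value literals, and the exact iteration over engines are assumptions based on the surrounding diff context, not part of the patches themselves:

use std::sync::{Arc, RwLock};

use kvproto::kvrpcpb::ApiVersion;
use test_raftstore::*;

#[test]
fn test_plain_cluster_put_get() {
    // Build a 3-node NodeCluster exactly as the simplified test does.
    let pd_client = Arc::new(TestPdClient::new(0, false));
    let sim = Arc::new(RwLock::new(NodeCluster::new(pd_client.clone())));
    let mut cluster = Cluster::new(0, 3, sim, pd_client, ApiVersion::V1);

    // Start all nodes; no make_global_ffi_helper_set / init_cluster_ptr
    // calls are needed any more after this series.
    let _ = cluster.start();

    // Write through the raft layer and verify the value is visible on
    // every node's engine (illustrative key/value).
    let (k, v) = (b"k1", b"v1");
    cluster.must_put(k, v);
    for id in cluster.engines.keys() {
        must_get_equal(&cluster.get_engine(*id), k, v);
    }
    cluster.shutdown();
}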