Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reclaim test_raftstore and remove mock-engine-store #172

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ members = [
"fuzz/fuzzer-honggfuzz",
"fuzz/fuzzer-libfuzzer",
"gen-proxy-ffi",
"mock-engine-store",
"raftstore-proxy",
"tests",
]
Expand Down
10 changes: 5 additions & 5 deletions components/engine_store_ffi/src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,14 +445,14 @@ pub mod root {
>,
pub fn_handle_safe_ts_update: ::std::option::Option<
unsafe extern "C" fn(
arg1: *const root::DB::EngineStoreServerWrap,
arg2: u64,
arg3: u64,
arg4: u64,
arg1: *mut root::DB::EngineStoreServerWrap,
region_id: u64,
self_safe_ts: u64,
leader_safe_ts: u64,
),
>,
}
pub const RAFT_STORE_PROXY_VERSION: u64 = 14699247891578305166;
pub const RAFT_STORE_PROXY_VERSION: u64 = 15776819379826780689;
pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639;
}
}
3 changes: 0 additions & 3 deletions components/test_raftstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ test-engines-rocksdb = [
test-engines-panic = [
"raftstore/test-engines-panic",
]
test-raftstore-proxy = ["raftstore/test-raftstore-proxy"]

[dependencies]
api_version = { path = "../api_version" }
Expand All @@ -33,7 +32,6 @@ crossbeam = "0.8"
encryption_export = { path = "../encryption/export", default-features = false }
engine_rocks = { path = "../engine_rocks", default-features = false }
engine_rocks_helper = { path = "../engine_rocks_helper" }
engine_store_ffi = { path = "../engine_store_ffi", default-features = false, features = ["testexport"] }
engine_test = { path = "../engine_test", default-features = false }
engine_traits = { path = "../engine_traits", default-features = false }
fail = "0.5"
Expand All @@ -45,7 +43,6 @@ keys = { path = "../keys", default-features = false }
kvproto = { git = "https://github.com/pingcap/kvproto.git" }
lazy_static = "1.3"
log_wrappers = { path = "../log_wrappers" }
mock-engine-store = { path = "../../mock-engine-store", default-features = false }
pd_client = { path = "../pd_client", default-features = false }
protobuf = { version = "2.8", features = ["bytes"] }
raft = { version = "0.7.0", default-features = false, features = ["protobuf-codec"] }
Expand Down
167 changes: 11 additions & 156 deletions components/test_raftstore/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use std::{
collections::hash_map::Entry as MapEntry,
error::Error as StdError,
result,
sync::{
atomic::{AtomicBool, AtomicU8},
mpsc, Arc, Mutex, RwLock,
},
sync::{mpsc, Arc, Mutex, RwLock},
thread,
time::Duration,
};
Expand All @@ -16,7 +13,6 @@ use collections::{HashMap, HashSet};
use crossbeam::channel::TrySendError;
use encryption_export::DataKeyManager;
use engine_rocks::{raw::DB, Compat, RocksEngine, RocksSnapshot};
use engine_store_ffi::TiFlashEngine;
use engine_test::raft::RaftTestEngine;
use engine_traits::{
CompactExt, Engines, Iterable, MiscExt, Mutable, Peekable, RaftEngineReadOnly, WriteBatch,
Expand All @@ -35,7 +31,6 @@ use kvproto::{
RegionLocalState,
},
};
use mock_engine_store::EngineStoreServerWrap;
use pd_client::{BucketStat, PdClient};
use raft::eraftpb::ConfChangeType;
use raftstore::{
Expand All @@ -53,7 +48,6 @@ use raftstore::{
use tempfile::TempDir;
use tikv::server::Result as ServerResult;
use tikv_util::{
sys::SysQuota,
thread_group::GroupProperties,
time::{Instant, ThreadReadId},
worker::LazyWorker,
Expand Down Expand Up @@ -157,23 +151,7 @@ pub trait Simulator {
}
}

pub struct FFIHelperSet {
pub proxy: Box<engine_store_ffi::RaftStoreProxy>,
pub proxy_helper: Box<engine_store_ffi::RaftStoreProxyFFIHelper>,
pub engine_store_server: Box<mock_engine_store::EngineStoreServer>,
pub engine_store_server_wrap: Box<mock_engine_store::EngineStoreServerWrap>,
pub engine_store_server_helper: Box<engine_store_ffi::EngineStoreServerHelper>,
}

pub struct EngineHelperSet {
pub engine_store_server: Box<mock_engine_store::EngineStoreServer>,
pub engine_store_server_wrap: Box<mock_engine_store::EngineStoreServerWrap>,
pub engine_store_server_helper: Box<engine_store_ffi::EngineStoreServerHelper>,
}

pub struct Cluster<T: Simulator> {
pub ffi_helper_set: HashMap<u64, FFIHelperSet>,
pub global_engine_helper_set: Option<EngineHelperSet>,
pub cfg: Config,
leaders: HashMap<u64, metapb::Peer>,
pub count: usize,
Expand All @@ -191,7 +169,6 @@ pub struct Cluster<T: Simulator> {
pub sst_workers_map: HashMap<u64, usize>,
pub sim: Arc<RwLock<T>>,
pub pd_client: Arc<TestPdClient>,
pub proxy_compat: bool,
}

impl<T: Simulator> Cluster<T> {
Expand All @@ -205,8 +182,6 @@ impl<T: Simulator> Cluster<T> {
) -> Cluster<T> {
// TODO: In the future, maybe it's better to test both case where `use_delete_range` is true and false
Cluster {
ffi_helper_set: HashMap::default(),
global_engine_helper_set: None,
cfg: Config {
tikv: new_tikv_config_with_api_ver(id, api_version),
prefer_mem: true,
Expand All @@ -226,7 +201,6 @@ impl<T: Simulator> Cluster<T> {
pd_client,
sst_workers: vec![],
sst_workers_map: HashMap::default(),
proxy_compat: false,
}
}

Expand Down Expand Up @@ -265,7 +239,6 @@ impl<T: Simulator> Cluster<T> {
create_test_engine(router, self.io_rate_limiter.clone(), &self.cfg);
self.dbs.push(engines);
self.key_managers.push(key_manager);
debug!("create_engine path is {}", dir.as_ref().to_str().unwrap());
self.paths.push(dir);
self.sst_workers.push(sst_worker);
}
Expand All @@ -282,81 +255,7 @@ impl<T: Simulator> Cluster<T> {
}
}

pub fn make_global_ffi_helper_set(&mut self) {
let mut engine_store_server =
Box::new(mock_engine_store::EngineStoreServer::new(99999, None));
engine_store_server.proxy_compat = self.proxy_compat;
let engine_store_server_wrap = Box::new(mock_engine_store::EngineStoreServerWrap::new(
&mut *engine_store_server,
None,
self as *const Cluster<T> as isize,
));
let engine_store_server_helper =
Box::new(mock_engine_store::gen_engine_store_server_helper(
std::pin::Pin::new(&*engine_store_server_wrap),
));

unsafe {
engine_store_ffi::init_engine_store_server_helper(
&*engine_store_server_helper as *const engine_store_ffi::EngineStoreServerHelper
as *mut u8,
);
}

self.global_engine_helper_set = Some(EngineHelperSet {
engine_store_server,
engine_store_server_wrap,
engine_store_server_helper,
});
}

pub fn make_ffi_helper_set(
&mut self,
id: u64,
engines: Engines<RocksEngine, RaftTestEngine>,
key_mgr: &Option<Arc<DataKeyManager>>,
router: &RaftRouter<RocksEngine, RaftTestEngine>,
) -> (FFIHelperSet, Config) {
// This TiFlash engine is a dummy TiFlash engine.
let engine = TiFlashEngine::from_rocks(engines.kv.clone());
let proxy = Box::new(engine_store_ffi::RaftStoreProxy::new(
AtomicU8::new(engine_store_ffi::RaftProxyStatus::Idle as u8),
key_mgr.clone(),
Some(Box::new(engine_store_ffi::ReadIndexClient::new(
router.clone(),
SysQuota::cpu_cores_quota() as usize * 2,
))),
std::sync::RwLock::new(Some(engine)),
));

let mut proxy_helper = Box::new(engine_store_ffi::RaftStoreProxyFFIHelper::new(&proxy));
let mut engine_store_server =
Box::new(mock_engine_store::EngineStoreServer::new(id, Some(engines)));
engine_store_server.proxy_compat = self.proxy_compat;
let engine_store_server_wrap = Box::new(mock_engine_store::EngineStoreServerWrap::new(
&mut *engine_store_server,
Some(&mut *proxy_helper),
self as *const Cluster<T> as isize,
));
let engine_store_server_helper =
Box::new(mock_engine_store::gen_engine_store_server_helper(
std::pin::Pin::new(&*engine_store_server_wrap),
));

let node_cfg = self.cfg.clone();
let ffi_helper_set = FFIHelperSet {
proxy,
proxy_helper,
engine_store_server,
engine_store_server_wrap,
engine_store_server_helper,
};
(ffi_helper_set, node_cfg)
}

pub fn start(&mut self) -> ServerResult<()> {
self.make_global_ffi_helper_set();

// Try recover from last shutdown.
let node_ids: Vec<u64> = self.engines.iter().map(|(&id, _)| id).collect();
for node_id in node_ids {
Expand All @@ -375,22 +274,16 @@ impl<T: Simulator> Cluster<T> {
let props = GroupProperties::default();
tikv_util::thread_group::set_properties(Some(props.clone()));

let (mut ffi_helper_set, mut node_cfg) =
self.make_ffi_helper_set(0, self.dbs.last().unwrap().clone(), &key_mgr, &router);

let mut sim = self.sim.wl();
let node_id = sim.run_node(
0,
node_cfg,
self.cfg.clone(),
engines.clone(),
store_meta.clone(),
key_mgr.clone(),
router,
system,
)?;
debug!("start new node {}", node_id);
ffi_helper_set.engine_store_server.id = node_id;
self.ffi_helper_set.insert(node_id, ffi_helper_set);
self.group_props.insert(node_id, props);
self.engines.insert(node_id, engines);
self.store_metas.insert(node_id, store_meta);
Expand Down Expand Up @@ -441,6 +334,10 @@ impl<T: Simulator> Cluster<T> {
let engines = self.engines[&node_id].clone();
let key_mgr = self.key_managers_map[&node_id].clone();
let (router, system) = create_raft_batch_system(&self.cfg.raft_store);
let mut cfg = self.cfg.clone();
if let Some(labels) = self.labels.get(&node_id) {
cfg.server.labels = labels.to_owned();
}
let store_meta = match self.store_metas.entry(node_id) {
MapEntry::Occupied(o) => {
let mut meta = o.get().lock().unwrap();
Expand All @@ -455,29 +352,10 @@ impl<T: Simulator> Cluster<T> {
self.group_props.insert(node_id, props.clone());
tikv_util::thread_group::set_properties(Some(props));
debug!("calling run node"; "node_id" => node_id);

let mut node_cfg = if self.ffi_helper_set.contains_key(&node_id) {
let node_cfg = self.cfg.clone();
node_cfg
} else {
let (ffi_helper_set, node_cfg) = self.make_ffi_helper_set(
node_id,
self.engines[&node_id].clone(),
&key_mgr,
&router,
);
self.ffi_helper_set.insert(node_id, ffi_helper_set);
node_cfg
};

if let Some(labels) = self.labels.get(&node_id) {
node_cfg.server.labels = labels.to_owned();
}

// FIXME: rocksdb event listeners may not work, because we change the router.
self.sim.wl().run_node(
node_id, node_cfg, engines, store_meta, key_mgr, router, system,
)?;
self.sim
.wl()
.run_node(node_id, cfg, engines, store_meta, key_mgr, router, system)?;
debug!("node {} started", node_id);
Ok(())
}
Expand Down Expand Up @@ -1132,16 +1010,8 @@ impl<T: Simulator> Cluster<T> {
}

pub fn must_put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) {
match self.batch_put(key, vec![new_put_cf_cmd(cf, key, value)]) {
Ok(resp) => {
if cfg!(feature = "test-raftstore-proxy") {
assert_eq!(resp.get_responses().len(), 1);
assert_eq!(resp.get_responses()[0].get_cmd_type(), CmdType::Put);
}
}
Err(e) => {
panic!("has error: {:?}", e);
}
if let Err(e) = self.batch_put(key, vec![new_put_cf_cmd(cf, key, value)]) {
panic!("has error: {:?}", e);
}
}

Expand Down Expand Up @@ -1549,7 +1419,6 @@ impl<T: Simulator> Cluster<T> {
debug!("asking split"; "region" => ?region, "key" => ?split_key);
// In case ask split message is ignored, we should retry.
if try_cnt % 50 == 0 {
debug!("must_split try once, count {}", try_cnt);
self.reset_leader_of_region(region.get_id());
let key = split_key.to_vec();
let check = Box::new(move |write_resp: WriteResponse| {
Expand Down Expand Up @@ -1908,17 +1777,3 @@ impl<T: Simulator> Drop for Cluster<T> {
self.shutdown();
}
}

pub fn gen_cluster(cluster_ptr: isize) -> Option<&'static Cluster<NodeCluster>> {
unsafe {
if cluster_ptr == 0 {
None
} else {
Some(&(*(cluster_ptr as *const Cluster<NodeCluster>)))
}
}
}

pub unsafe fn init_cluster_ptr(cluster_ptr: &Cluster<NodeCluster>) -> isize {
cluster_ptr as *const Cluster<NodeCluster> as isize
}
Loading