From d58cfbb74b2b3859ea1cfffc4c5b9a5f7e56ad70 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 24 Aug 2022 20:30:19 +0800 Subject: [PATCH] refactor tests Signed-off-by: CalvinNeo --- components/proxy_server/src/run.rs | 6 +- .../src/engine_store_ffi/observer.rs | 20 +- tests/proxy/normal.rs | 2001 +++++++++-------- 3 files changed, 1035 insertions(+), 992 deletions(-) diff --git a/components/proxy_server/src/run.rs b/components/proxy_server/src/run.rs index 5d98e0f694f..4e4fd791235 100644 --- a/components/proxy_server/src/run.rs +++ b/components/proxy_server/src/run.rs @@ -421,7 +421,11 @@ impl TiKvServer { }); // engine_tiflash::RocksEngine has engine_rocks::RocksEngine inside let mut kv_engine = TiFlashEngine::from_rocks(kv_engine); - kv_engine.init(engine_store_server_helper, self.proxy_config.raft_store.snap_handle_pool_size, Some(ffi_hub)); + kv_engine.init( + engine_store_server_helper, + self.proxy_config.raft_store.snap_handle_pool_size, + Some(ffi_hub), + ); let engines = Engines::new(kv_engine, raft_engine); diff --git a/components/raftstore/src/engine_store_ffi/observer.rs b/components/raftstore/src/engine_store_ffi/observer.rs index bfce37129cb..18b7cf49d0b 100644 --- a/components/raftstore/src/engine_store_ffi/observer.rs +++ b/components/raftstore/src/engine_store_ffi/observer.rs @@ -236,6 +236,7 @@ impl TiFlashObserver { impl Coprocessor for TiFlashObserver { fn stop(&self) { // TODO(tiflash) remove this when pre apply merged + info!("shutdown tiflash observer"; "peer_id" => self.peer_id); self.apply_snap_pool.as_ref().unwrap().shutdown(); } } @@ -653,7 +654,7 @@ impl ApplySnapshotObserver for TiFlashObserver { let ssts = retrieve_sst_files(snap); self.engine .pending_applies_count - .fetch_add(1, Ordering::Relaxed); + .fetch_add(1, Ordering::SeqCst); match self.apply_snap_pool.as_ref() { Some(p) => { p.spawn(async move { @@ -675,7 +676,7 @@ impl ApplySnapshotObserver for TiFlashObserver { None => { self.engine .pending_applies_count - .fetch_sub(1, Ordering::Relaxed); + .fetch_sub(1, Ordering::SeqCst); error!("apply_snap_pool is not initialized, quit background pre apply"; "peer_id" => peer_id, "region_id" => ob_ctx.region().get_id()); } } @@ -706,7 +707,12 @@ impl ApplySnapshotObserver for TiFlashObserver { Some(t) => { let neer_retry = match t.recv.recv() { Ok(snap_ptr) => { - info!("get prehandled snapshot success"); + info!("get prehandled snapshot success"; + "peer_id" => ?snap_key, + "region" => ?ob_ctx.region(), + "pending" => self.engine + .pending_applies_count.load(Ordering::SeqCst), + ); self.engine_store_server_helper .apply_pre_handled_snapshot(snap_ptr.0); false @@ -721,7 +727,13 @@ impl ApplySnapshotObserver for TiFlashObserver { }; self.engine .pending_applies_count - .fetch_sub(1, Ordering::Relaxed); + .fetch_sub(1, Ordering::SeqCst); + info!("apply snapshot finished"; + "peer_id" => ?snap_key, + "region" => ?ob_ctx.region(), + "pending" => self.engine + .pending_applies_count.load(Ordering::SeqCst), + ); neer_retry } None => { diff --git a/tests/proxy/normal.rs b/tests/proxy/normal.rs index 4f78022899a..a8122a52bf8 100644 --- a/tests/proxy/normal.rs +++ b/tests/proxy/normal.rs @@ -61,660 +61,777 @@ use tikv_util::{ use crate::proxy::*; -#[test] -fn test_config() { - let mut file = tempfile::NamedTempFile::new().unwrap(); - let text = "memory-usage-high-water=0.65\n[server]\nengine-addr=\"1.2.3.4:5\"\n[raftstore]\nsnap-handle-pool-size=4\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1"; - write!(file, "{}", text).unwrap(); - let path = 
file.path(); - - let mut unrecognized_keys = Vec::new(); - let mut config = TiKvConfig::from_file(path, Some(&mut unrecognized_keys)).unwrap(); - // Otherwise we have no default addr for TiKv. - setup_default_tikv_config(&mut config); - assert_eq!(config.memory_usage_high_water, 0.65); - assert_eq!(config.rocksdb.max_open_files, 111); - assert_eq!(config.server.addr, TIFLASH_DEFAULT_LISTENING_ADDR); - assert_eq!(unrecognized_keys.len(), 3); - - let mut proxy_unrecognized_keys = Vec::new(); - let proxy_config = ProxyConfig::from_file(path, Some(&mut proxy_unrecognized_keys)).unwrap(); - assert_eq!(proxy_config.raft_store.snap_handle_pool_size, 4); - assert_eq!(proxy_config.server.engine_addr, "1.2.3.4:5"); - assert!(proxy_unrecognized_keys.contains(&"rocksdb".to_string())); - assert!(proxy_unrecognized_keys.contains(&"memory-usage-high-water".to_string())); - assert!(proxy_unrecognized_keys.contains(&"nosense".to_string())); - let v1 = vec!["a.b", "b"] - .iter() - .map(|e| String::from(*e)) - .collect::>(); - let v2 = vec!["a.b", "b.b", "c"] - .iter() - .map(|e| String::from(*e)) - .collect::>(); - let unknown = ensure_no_common_unrecognized_keys(&v1, &v2); - assert_eq!(unknown.is_err(), true); - assert_eq!(unknown.unwrap_err(), "a.b, b.b"); - let unknown = ensure_no_common_unrecognized_keys(&proxy_unrecognized_keys, &unrecognized_keys); - assert_eq!(unknown.is_err(), true); - assert_eq!(unknown.unwrap_err(), "nosense, rocksdb.z"); - - // Need run this test with ENGINE_LABEL_VALUE=tiflash, otherwise will fatal exit. - server::setup::validate_and_persist_config(&mut config, true); - - // Will not override ProxyConfig - let proxy_config_new = ProxyConfig::from_file(path, None).unwrap(); - assert_eq!(proxy_config_new.raft_store.snap_handle_pool_size, 4); -} +mod store { + use super::*; + #[test] + fn test_store_stats() { + let (mut cluster, pd_client) = new_mock_cluster(0, 1); -#[test] -fn test_config_default_addr() { - let mut file = tempfile::NamedTempFile::new().unwrap(); - let text = "memory-usage-high-water=0.65\nsnap-handle-pool-size=4\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1"; - write!(file, "{}", text).unwrap(); - let path = file.path(); - let args: Vec<&str> = vec![]; - let matches = App::new("RaftStore Proxy") - .arg( - Arg::with_name("config") - .short("C") - .long("config") - .value_name("FILE") - .help("Set the configuration file") - .takes_value(true), - ) - .get_matches_from(args); - let c = format!("--config {}", path.to_str().unwrap()); - let mut v = vec![c]; - let config = gen_tikv_config(&matches, false, &mut v); - assert_eq!(config.server.addr, TIFLASH_DEFAULT_LISTENING_ADDR); - assert_eq!(config.server.status_addr, TIFLASH_DEFAULT_STATUS_ADDR); - assert_eq!( - config.server.advertise_status_addr, - TIFLASH_DEFAULT_STATUS_ADDR - ); + let _ = cluster.run(); + + for id in cluster.engines.keys() { + let engine = cluster.get_tiflash_engine(*id); + assert_eq!( + engine.ffi_hub.as_ref().unwrap().get_store_stats().capacity, + 444444 + ); + } + + for id in cluster.engines.keys() { + cluster.must_send_store_heartbeat(*id); + } + std::thread::sleep(std::time::Duration::from_millis(1000)); + // let resp = block_on(pd_client.store_heartbeat(Default::default(), None, None)).unwrap(); + for id in cluster.engines.keys() { + let store_stat = pd_client.get_store_stats(*id).unwrap(); + assert_eq!(store_stat.get_capacity(), 444444); + assert_eq!(store_stat.get_available(), 333333); + } + // The same to mock-engine-store + cluster.shutdown(); + } } -fn test_store_stats() { - let 
(mut cluster, pd_client) = new_mock_cluster(0, 1); +mod region { + use super::*; + #[test] + fn test_handle_destroy() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + // Disable raft log gc in this test case. + cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60); - let _ = cluster.run(); + // Disable default max peer count check. + pd_client.disable_default_operator(); - for id in cluster.engines.keys() { - let engine = cluster.get_tiflash_engine(*id); - assert_eq!( - engine.ffi_hub.as_ref().unwrap().get_store_stats().capacity, - 444444 + cluster.run(); + cluster.must_put(b"k1", b"v1"); + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); + let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); + let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); + cluster.must_transfer_leader(region_id, peer_1); + + iter_ffi_helpers( + &cluster, + Some(vec![eng_ids[1]]), + &mut |_, _, ffi: &mut FFIHelperSet| { + let server = &ffi.engine_store_server; + assert!(server.kvstore.contains_key(®ion_id)); + }, ); - } - for id in cluster.engines.keys() { - cluster.must_send_store_heartbeat(*id); - } - std::thread::sleep(std::time::Duration::from_millis(1000)); - // let resp = block_on(pd_client.store_heartbeat(Default::default(), None, None)).unwrap(); - for id in cluster.engines.keys() { - let store_stat = pd_client.get_store_stats(*id).unwrap(); - assert_eq!(store_stat.get_capacity(), 444444); - assert_eq!(store_stat.get_available(), 333333); + pd_client.must_remove_peer(region_id, peer_2); + + check_key( + &cluster, + b"k1", + b"k2", + Some(false), + None, + Some(vec![eng_ids[1]]), + ); + + // Region removed in server. + iter_ffi_helpers( + &cluster, + Some(vec![eng_ids[1]]), + &mut |_, _, ffi: &mut FFIHelperSet| { + let server = &ffi.engine_store_server; + assert!(!server.kvstore.contains_key(®ion_id)); + }, + ); + + cluster.shutdown(); } - // The same to mock-engine-store - cluster.shutdown(); } -#[test] -fn test_store_setup() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); - - // Add label to cluster - address_proxy_config(&mut cluster.cfg.tikv); - - // Try to start this node, return after persisted some keys. - let _ = cluster.start(); - let store_id = cluster.engines.keys().last().unwrap(); - let store = pd_client.get_store(*store_id).unwrap(); - println!("store {:?}", store); - assert!( - store - .get_labels() +mod config { + use super::*; + #[test] + fn test_config() { + let mut file = tempfile::NamedTempFile::new().unwrap(); + let text = "memory-usage-high-water=0.65\n[server]\nengine-addr=\"1.2.3.4:5\"\n[raftstore]\nsnap-handle-pool-size=4\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1"; + write!(file, "{}", text).unwrap(); + let path = file.path(); + + let mut unrecognized_keys = Vec::new(); + let mut config = TiKvConfig::from_file(path, Some(&mut unrecognized_keys)).unwrap(); + // Otherwise we have no default addr for TiKv. 
+ setup_default_tikv_config(&mut config); + assert_eq!(config.memory_usage_high_water, 0.65); + assert_eq!(config.rocksdb.max_open_files, 111); + assert_eq!(config.server.addr, TIFLASH_DEFAULT_LISTENING_ADDR); + assert_eq!(unrecognized_keys.len(), 3); + + let mut proxy_unrecognized_keys = Vec::new(); + let proxy_config = + ProxyConfig::from_file(path, Some(&mut proxy_unrecognized_keys)).unwrap(); + assert_eq!(proxy_config.raft_store.snap_handle_pool_size, 4); + assert_eq!(proxy_config.server.engine_addr, "1.2.3.4:5"); + assert!(proxy_unrecognized_keys.contains(&"rocksdb".to_string())); + assert!(proxy_unrecognized_keys.contains(&"memory-usage-high-water".to_string())); + assert!(proxy_unrecognized_keys.contains(&"nosense".to_string())); + let v1 = vec!["a.b", "b"] + .iter() + .map(|e| String::from(*e)) + .collect::>(); + let v2 = vec!["a.b", "b.b", "c"] .iter() - .find(|&x| x.key == "engine" && x.value == "tiflash") - .is_some() - ); + .map(|e| String::from(*e)) + .collect::>(); + let unknown = ensure_no_common_unrecognized_keys(&v1, &v2); + assert_eq!(unknown.is_err(), true); + assert_eq!(unknown.unwrap_err(), "a.b, b.b"); + let unknown = + ensure_no_common_unrecognized_keys(&proxy_unrecognized_keys, &unrecognized_keys); + assert_eq!(unknown.is_err(), true); + assert_eq!(unknown.unwrap_err(), "nosense, rocksdb.z"); + + // Need run this test with ENGINE_LABEL_VALUE=tiflash, otherwise will fatal exit. + server::setup::validate_and_persist_config(&mut config, true); + + // Will not override ProxyConfig + let proxy_config_new = ProxyConfig::from_file(path, None).unwrap(); + assert_eq!(proxy_config_new.raft_store.snap_handle_pool_size, 4); + } - cluster.shutdown(); + #[test] + fn test_config_default_addr() { + let mut file = tempfile::NamedTempFile::new().unwrap(); + let text = "memory-usage-high-water=0.65\nsnap-handle-pool-size=4\n[nosense]\nfoo=2\n[rocksdb]\nmax-open-files = 111\nz=1"; + write!(file, "{}", text).unwrap(); + let path = file.path(); + let args: Vec<&str> = vec![]; + let matches = App::new("RaftStore Proxy") + .arg( + Arg::with_name("config") + .short("C") + .long("config") + .value_name("FILE") + .help("Set the configuration file") + .takes_value(true), + ) + .get_matches_from(args); + let c = format!("--config {}", path.to_str().unwrap()); + let mut v = vec![c]; + let config = gen_tikv_config(&matches, false, &mut v); + assert_eq!(config.server.addr, TIFLASH_DEFAULT_LISTENING_ADDR); + assert_eq!(config.server.status_addr, TIFLASH_DEFAULT_STATUS_ADDR); + assert_eq!( + config.server.advertise_status_addr, + TIFLASH_DEFAULT_STATUS_ADDR + ); + assert_eq!(config.raft_store.region_worker_tick_interval, 500); + } + + #[test] + fn test_store_setup() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + // Add label to cluster + address_proxy_config(&mut cluster.cfg.tikv); + + // Try to start this node, return after persisted some keys. + let _ = cluster.start(); + let store_id = cluster.engines.keys().last().unwrap(); + let store = pd_client.get_store(*store_id).unwrap(); + println!("store {:?}", store); + assert!( + store + .get_labels() + .iter() + .find(|&x| x.key == "engine" && x.value == "tiflash") + .is_some() + ); + cluster.shutdown(); + } } -#[test] -fn test_interaction() { - // TODO Maybe we should pick this test to TiKV. - // This test is to check if empty entries can affect pre_exec and post_exec. 
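        // Background: on a leadership change raft proposes a no-op ("empty") entry
        // to commit entries from previous terms. Such an entry carries no requests,
        // so it reaches the pre_exec/post_exec observer hooks with an empty body;
        // the checks below make sure that path does not force an unwanted persist.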
- let (mut cluster, pd_client) = new_mock_cluster(0, 3); +mod write { + use super::*; + #[test] + fn test_interaction() { + // TODO Maybe we should pick this test to TiKV. + // This test is to check if empty entries can affect pre_exec and post_exec. + let (mut cluster, pd_client) = new_mock_cluster(0, 3); - fail::cfg("try_flush_data", "return(0)").unwrap(); - let _ = cluster.run(); + fail::cfg("try_flush_data", "return(0)").unwrap(); + let _ = cluster.run(); - cluster.must_put(b"k1", b"v1"); - let region = cluster.get_region(b"k1"); - let region_id = region.get_id(); + cluster.must_put(b"k1", b"v1"); + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); + + // Wait until all nodes have (k1, v1). + check_key(&cluster, b"k1", b"v1", Some(true), None, None); + + let prev_states = collect_all_states(&cluster, region_id); + let compact_log = test_raftstore::new_compact_log_request(100, 10); + let req = + test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + let _ = cluster + .call_command_on_leader(req.clone(), Duration::from_secs(3)) + .unwrap(); + + // Empty result can also be handled by post_exec + let mut retry = 0; + let new_states = loop { + let new_states = collect_all_states(&cluster, region_id); + let mut ok = true; + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + if old.in_memory_apply_state == new.in_memory_apply_state + && old.in_memory_applied_term == new.in_memory_applied_term + { + ok = false; + break; + } + } + if ok { + break new_states; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + retry += 1; + }; + + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); + assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); + // An empty cmd will not cause persistence. + assert_eq!(old.in_disk_apply_state, new.in_disk_apply_state); + } - // Wait until all nodes have (k1, v1). - check_key(&cluster, b"k1", b"v1", Some(true), None, None); + cluster.must_put(b"k2", b"v2"); + // Wait until all nodes have (k2, v2). 
+ check_key(&cluster, b"k2", b"v2", Some(true), None, None); - let prev_states = collect_all_states(&cluster, region_id); - let compact_log = test_raftstore::new_compact_log_request(100, 10); - let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); - let _ = cluster - .call_command_on_leader(req.clone(), Duration::from_secs(3)) - .unwrap(); + fail::cfg("on_empty_cmd_normal", "return").unwrap(); + let prev_states = collect_all_states(&cluster, region_id); + let _ = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); - // Empty result can also be handled by post_exec - let mut retry = 0; - let new_states = loop { + std::thread::sleep(std::time::Duration::from_millis(400)); let new_states = collect_all_states(&cluster, region_id); - let mut ok = true; for i in prev_states.keys() { let old = prev_states.get(i).unwrap(); let new = new_states.get(i).unwrap(); - if old.in_memory_apply_state == new.in_memory_apply_state - && old.in_memory_applied_term == new.in_memory_applied_term - { - ok = false; - break; - } - } - if ok { - break new_states; + assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); + assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); } - std::thread::sleep(std::time::Duration::from_millis(100)); - retry += 1; - }; - - for i in prev_states.keys() { - let old = prev_states.get(i).unwrap(); - let new = new_states.get(i).unwrap(); - assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); - assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); - // An empty cmd will not cause persistence. - assert_eq!(old.in_disk_apply_state, new.in_disk_apply_state); + + fail::remove("try_flush_data"); + fail::remove("on_empty_cmd_normal"); + cluster.shutdown(); } - cluster.must_put(b"k2", b"v2"); - // Wait until all nodes have (k2, v2). - check_key(&cluster, b"k2", b"v2", Some(true), None, None); - - fail::cfg("on_empty_cmd_normal", "return").unwrap(); - let prev_states = collect_all_states(&cluster, region_id); - let _ = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); - - std::thread::sleep(std::time::Duration::from_millis(400)); - let new_states = collect_all_states(&cluster, region_id); - for i in prev_states.keys() { - let old = prev_states.get(i).unwrap(); - let new = new_states.get(i).unwrap(); - assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); - assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); + #[test] + fn test_leadership_change_filter() { + test_leadership_change_impl(true); } - fail::remove("try_flush_data"); - fail::remove("on_empty_cmd_normal"); - cluster.shutdown(); -} + #[test] + fn test_leadership_change_no_persist() { + test_leadership_change_impl(false); + } -#[test] -fn test_leadership_change_filter() { - test_leadership_change_impl(true); -} + fn test_leadership_change_impl(filter: bool) { + // Test if a empty command can be observed when leadership changes. + let (mut cluster, pd_client) = new_mock_cluster(0, 3); -#[test] -fn test_leadership_change_no_persist() { - test_leadership_change_impl(false); -} + disable_auto_gen_compact_log(&mut cluster); -fn test_leadership_change_impl(filter: bool) { - // Test if a empty command can be observed when leadership changes. - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + if filter { + // We don't handle CompactLog at all. + fail::cfg("try_flush_data", "return(0)").unwrap(); + } else { + // We don't return Persist after handling CompactLog. 
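            // How these failpoints take effect, shown once as a sketch (the function
            // below is illustrative only, not the actual raftstore code): with the
            // `fail` crate, `fail::cfg("name", "return")` makes the matching
            // `fail_point!` expand into an early return, and `"return(0)"` hands the
            // payload "0" to the closure as well.
            fn sketch_failpoint_guard() -> bool {
                fail::fail_point!("no_persist_compact_log", |_| {
                    // Configured with "return": skip reporting a persisted apply state.
                    false
                });
                true
            }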
+ fail::cfg("no_persist_compact_log", "return").unwrap(); + } + // Do not handle empty cmd. + fail::cfg("on_empty_cmd_normal", "return").unwrap(); + let _ = cluster.run(); - disable_auto_gen_compact_log(&mut cluster); + cluster.must_put(b"k1", b"v1"); + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); - if filter { - // We don't handle CompactLog at all. - fail::cfg("try_flush_data", "return(0)").unwrap(); - } else { - // We don't return Persist after handling CompactLog. - fail::cfg("no_persist_compact_log", "return").unwrap(); - } - // Do not handle empty cmd. - fail::cfg("on_empty_cmd_normal", "return").unwrap(); - let _ = cluster.run(); + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); + let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); + cluster.must_transfer_leader(region.get_id(), peer_1.clone()); - cluster.must_put(b"k1", b"v1"); - let region = cluster.get_region(b"k1"); - let region_id = region.get_id(); + cluster.must_put(b"k2", b"v2"); + fail::cfg("on_empty_cmd_normal", "return").unwrap(); - let eng_ids = cluster - .engines - .iter() - .map(|e| e.0.to_owned()) - .collect::>(); - let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); - let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); - cluster.must_transfer_leader(region.get_id(), peer_1.clone()); + // Wait until all nodes have (k2, v2), then transfer leader. + check_key(&cluster, b"k2", b"v2", Some(true), None, None); + if filter { + // We should also filter normal kv, since a empty result can also be invoke pose_exec. + fail::cfg("on_post_exec_normal", "return(false)").unwrap(); + } + let prev_states = collect_all_states(&cluster, region_id); + cluster.must_transfer_leader(region.get_id(), peer_2.clone()); - cluster.must_put(b"k2", b"v2"); - fail::cfg("on_empty_cmd_normal", "return").unwrap(); + // The states remain the same, since we don't observe empty cmd. + let new_states = collect_all_states(&cluster, region_id); + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + if filter { + // CompactLog can still change in-memory state, when exec in memory. + assert_eq!(old.in_memory_apply_state, new.in_memory_apply_state); + assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); + } + assert_eq!(old.in_disk_apply_state, new.in_disk_apply_state); + } + + fail::remove("on_empty_cmd_normal"); + // We need forward empty cmd generated by leadership changing to TiFlash. + cluster.must_transfer_leader(region.get_id(), peer_1.clone()); + std::thread::sleep(std::time::Duration::from_secs(1)); + + let new_states = collect_all_states(&cluster, region_id); + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); + assert_ne!(old.in_memory_applied_term, new.in_memory_applied_term); + } - // Wait until all nodes have (k2, v2), then transfer leader. - check_key(&cluster, b"k2", b"v2", Some(true), None, None); - if filter { - // We should also filter normal kv, since a empty result can also be invoke pose_exec. - fail::cfg("on_post_exec_normal", "return(false)").unwrap(); - } - let prev_states = collect_all_states(&cluster, region_id); - cluster.must_transfer_leader(region.get_id(), peer_2.clone()); - - // The states remain the same, since we don't observe empty cmd. 
- let new_states = collect_all_states(&cluster, region_id); - for i in prev_states.keys() { - let old = prev_states.get(i).unwrap(); - let new = new_states.get(i).unwrap(); if filter { - // CompactLog can still change in-memory state, when exec in memory. - assert_eq!(old.in_memory_apply_state, new.in_memory_apply_state); - assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); + fail::remove("try_flush_data"); + fail::remove("on_post_exec_normal"); + } else { + fail::remove("no_persist_compact_log"); } - assert_eq!(old.in_disk_apply_state, new.in_disk_apply_state); + cluster.shutdown(); } - fail::remove("on_empty_cmd_normal"); - // We need forward empty cmd generated by leadership changing to TiFlash. - cluster.must_transfer_leader(region.get_id(), peer_1.clone()); - std::thread::sleep(std::time::Duration::from_secs(1)); - - let new_states = collect_all_states(&cluster, region_id); - for i in prev_states.keys() { - let old = prev_states.get(i).unwrap(); - let new = new_states.get(i).unwrap(); - assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); - assert_ne!(old.in_memory_applied_term, new.in_memory_applied_term); - } + #[test] + fn test_kv_write_always_persist() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); - if filter { - fail::remove("try_flush_data"); - fail::remove("on_post_exec_normal"); - } else { - fail::remove("no_persist_compact_log"); + let _ = cluster.run(); + + cluster.must_put(b"k0", b"v0"); + let region_id = cluster.get_region(b"k0").get_id(); + + let mut prev_states = collect_all_states(&cluster, region_id); + // Always persist on every command + fail::cfg("on_post_exec_normal_end", "return(true)").unwrap(); + for i in 1..20 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + + // We can't always get kv from disk, even we commit everytime, + // since they are filtered by engint_tiflash + check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); + + // This may happen after memory write data and before commit. + // We must check if we already have in memory. + check_apply_state(&cluster, region_id, &prev_states, Some(false), None); + std::thread::sleep(std::time::Duration::from_millis(20)); + // However, advanced apply index will always persisted. + let new_states = collect_all_states(&cluster, region_id); + for id in cluster.engines.keys() { + let p = &prev_states.get(id).unwrap().in_disk_apply_state; + let n = &new_states.get(id).unwrap().in_disk_apply_state; + assert_ne!(p, n); + } + prev_states = new_states; + } + + cluster.shutdown(); } - cluster.shutdown(); -} -#[test] -fn test_kv_write_always_persist() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + #[test] + fn test_kv_write() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); - let _ = cluster.run(); + fail::cfg("on_post_exec_normal", "return(false)").unwrap(); + fail::cfg("on_post_exec_admin", "return(false)").unwrap(); + // Abandon CompactLog and previous flush. 
+ fail::cfg("try_flush_data", "return(0)").unwrap(); - cluster.must_put(b"k0", b"v0"); - let region_id = cluster.get_region(b"k0").get_id(); - - let mut prev_states = collect_all_states(&cluster, region_id); - // Always persist on every command - fail::cfg("on_post_exec_normal_end", "return(true)").unwrap(); - for i in 1..20 { - let k = format!("k{}", i); - let v = format!("v{}", i); - cluster.must_put(k.as_bytes(), v.as_bytes()); - - // We can't always get kv from disk, even we commit everytime, - // since they are filtered by engint_tiflash - check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); - - // This may happen after memory write data and before commit. - // We must check if we already have in memory. - check_apply_state(&cluster, region_id, &prev_states, Some(false), None); - std::thread::sleep(std::time::Duration::from_millis(20)); - // However, advanced apply index will always persisted. - let new_states = collect_all_states(&cluster, region_id); + let _ = cluster.run(); + + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } + + // Since we disable all observers, we can get nothing in either memory and disk. + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key( + &cluster, + k.as_bytes(), + v.as_bytes(), + Some(false), + Some(false), + None, + ); + } + + // We can read initial raft state, since we don't persist meta either. + let r1 = cluster.get_region(b"k1").get_id(); + let prev_states = collect_all_states(&cluster, r1); + + fail::remove("on_post_exec_normal"); + fail::remove("on_post_exec_admin"); + for i in 10..20 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } + + // Since we enable all observers, we can get in memory. + // However, we get nothing in disk since we don't persist. + for i in 10..20 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key( + &cluster, + k.as_bytes(), + v.as_bytes(), + Some(true), + Some(false), + None, + ); + } + + let new_states = collect_all_states(&cluster, r1); for id in cluster.engines.keys() { - let p = &prev_states.get(id).unwrap().in_disk_apply_state; - let n = &new_states.get(id).unwrap().in_disk_apply_state; - assert_ne!(p, n); + assert_ne!( + &prev_states.get(id).unwrap().in_memory_apply_state, + &new_states.get(id).unwrap().in_memory_apply_state + ); + assert_eq!( + &prev_states.get(id).unwrap().in_disk_apply_state, + &new_states.get(id).unwrap().in_disk_apply_state + ); } - prev_states = new_states; - } - cluster.shutdown(); -} + std::thread::sleep(std::time::Duration::from_millis(20)); + fail::remove("try_flush_data"); -#[test] -fn test_kv_write() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + let prev_states = collect_all_states(&cluster, r1); + // Write more after we force persist when CompactLog. + for i in 20..30 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } - fail::cfg("on_post_exec_normal", "return(false)").unwrap(); - fail::cfg("on_post_exec_admin", "return(false)").unwrap(); - // Abandon CompactLog and previous flush. - fail::cfg("try_flush_data", "return(0)").unwrap(); + // We can read from mock-store's memory, we are not sure if we can read from disk, + // since there may be or may not be a CompactLog. 
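        // Reading of the check_key arguments used throughout this file (inferred
        // from its call sites, not from a documented signature): the two
        // Option<bool> values are the expected presence in the TiFlash engine's
        // memory and on disk respectively, the last argument optionally narrows
        // the check to a set of store ids, and None means "do not assert". So the
        // loop below amounts to:
        //
        //     check_key(
        //         &cluster,
        //         k.as_bytes(),
        //         v.as_bytes(),
        //         Some(true), // must be readable from the mock engine's memory
        //         None,       // on-disk presence not asserted: a CompactLog may or may not have flushed
        //         None,       // check every store
        //     );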
+ for i in 11..30 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); + } - let _ = cluster.run(); + // Force a compact log to persist. + let region_r = cluster.get_region("k1".as_bytes()); + let region_id = region_r.get_id(); + let compact_log = test_raftstore::new_compact_log_request(1000, 100); + let req = + test_raftstore::new_admin_request(region_id, region_r.get_region_epoch(), compact_log); + let res = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + assert!(res.get_header().has_error(), "{:?}", res); + // This CompactLog is executed with an error. It will not trigger a compaction. + // However, it can trigger a persistence. + for i in 11..30 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key( + &cluster, + k.as_bytes(), + v.as_bytes(), + Some(true), + Some(true), + None, + ); + } - for i in 0..10 { - let k = format!("k{}", i); - let v = format!("v{}", i); - cluster.must_put(k.as_bytes(), v.as_bytes()); - } + let new_states = collect_all_states(&cluster, r1); - // Since we disable all observers, we can get nothing in either memory and disk. - for i in 0..10 { - let k = format!("k{}", i); - let v = format!("v{}", i); - check_key( - &cluster, - k.as_bytes(), - v.as_bytes(), - Some(false), - Some(false), - None, - ); + // apply_state is changed in memory, and persisted. + for id in cluster.engines.keys() { + assert_ne!( + &prev_states.get(id).unwrap().in_memory_apply_state, + &new_states.get(id).unwrap().in_memory_apply_state + ); + assert_ne!( + &prev_states.get(id).unwrap().in_disk_apply_state, + &new_states.get(id).unwrap().in_disk_apply_state + ); + } + + fail::remove("no_persist_compact_log"); + cluster.shutdown(); } - // We can read initial raft state, since we don't persist meta either. - let r1 = cluster.get_region(b"k1").get_id(); - let prev_states = collect_all_states(&cluster, r1); + #[test] + fn test_consistency_check() { + // ComputeHash and VerifyHash shall be filtered. + let (mut cluster, pd_client) = new_mock_cluster(0, 2); - fail::remove("on_post_exec_normal"); - fail::remove("on_post_exec_admin"); - for i in 10..20 { - let k = format!("k{}", i); - let v = format!("v{}", i); - cluster.must_put(k.as_bytes(), v.as_bytes()); - } + cluster.run(); - // Since we enable all observers, we can get in memory. - // However, we get nothing in disk since we don't persist. 
- for i in 10..20 { - let k = format!("k{}", i); - let v = format!("v{}", i); - check_key( - &cluster, - k.as_bytes(), - v.as_bytes(), - Some(true), - Some(false), - None, - ); - } + cluster.must_put(b"k", b"v"); + let region = cluster.get_region("k".as_bytes()); + let region_id = region.get_id(); - let new_states = collect_all_states(&cluster, r1); - for id in cluster.engines.keys() { - assert_ne!( - &prev_states.get(id).unwrap().in_memory_apply_state, - &new_states.get(id).unwrap().in_memory_apply_state - ); - assert_eq!( - &prev_states.get(id).unwrap().in_disk_apply_state, - &new_states.get(id).unwrap().in_disk_apply_state - ); - } + let r = new_compute_hash_request(); + let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), r); + let _ = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); - std::thread::sleep(std::time::Duration::from_millis(20)); - fail::remove("try_flush_data"); + let r = new_verify_hash_request(vec![7, 8, 9, 0], 1000); + let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), r); + let _ = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); - let prev_states = collect_all_states(&cluster, r1); - // Write more after we force persist when CompactLog. - for i in 20..30 { - let k = format!("k{}", i); - let v = format!("v{}", i); - cluster.must_put(k.as_bytes(), v.as_bytes()); + cluster.must_put(b"k2", b"v2"); + cluster.shutdown(); } - // We can read from mock-store's memory, we are not sure if we can read from disk, - // since there may be or may not be a CompactLog. - for i in 11..30 { - let k = format!("k{}", i); - let v = format!("v{}", i); - check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); - } + #[test] + fn test_old_compact_log() { + // If we just return None for CompactLog, the region state in ApplyFsm will change. + // Because there is no rollback in new implementation. + // This is a ERROR state. + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + cluster.run(); - // Force a compact log to persist. - let region_r = cluster.get_region("k1".as_bytes()); - let region_id = region_r.get_id(); - let compact_log = test_raftstore::new_compact_log_request(1000, 100); - let req = - test_raftstore::new_admin_request(region_id, region_r.get_region_epoch(), compact_log); - let res = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); - assert!(res.get_header().has_error(), "{:?}", res); - // This CompactLog is executed with an error. It will not trigger a compaction. - // However, it can trigger a persistence. - for i in 11..30 { - let k = format!("k{}", i); - let v = format!("v{}", i); - check_key( - &cluster, - k.as_bytes(), - v.as_bytes(), - Some(true), - Some(true), - None, - ); - } + // We don't return Persist after handling CompactLog. + fail::cfg("no_persist_compact_log", "return").unwrap(); + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } - let new_states = collect_all_states(&cluster, r1); + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); + } - // apply_state is changed in memory, and persisted. 
- for id in cluster.engines.keys() { - assert_ne!( - &prev_states.get(id).unwrap().in_memory_apply_state, - &new_states.get(id).unwrap().in_memory_apply_state - ); - assert_ne!( - &prev_states.get(id).unwrap().in_disk_apply_state, - &new_states.get(id).unwrap().in_disk_apply_state - ); + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); + let prev_state = collect_all_states(&cluster, region_id); + let (compact_index, compact_term) = get_valid_compact_index(&prev_state); + let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); + let req = + test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + let res = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + + // Wait for state applys. + std::thread::sleep(std::time::Duration::from_secs(2)); + + let new_state = collect_all_states(&cluster, region_id); + for i in prev_state.keys() { + let old = prev_state.get(i).unwrap(); + let new = new_state.get(i).unwrap(); + assert_ne!( + old.in_memory_apply_state.get_truncated_state(), + new.in_memory_apply_state.get_truncated_state() + ); + assert_eq!( + old.in_disk_apply_state.get_truncated_state(), + new.in_disk_apply_state.get_truncated_state() + ); + } + + cluster.shutdown(); } - fail::remove("no_persist_compact_log"); - cluster.shutdown(); -} + #[test] + fn test_compact_log() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); -#[test] -fn test_consistency_check() { - // ComputeHash and VerifyHash shall be filtered. - let (mut cluster, pd_client) = new_mock_cluster(0, 2); + disable_auto_gen_compact_log(&mut cluster); - cluster.run(); + cluster.run(); - cluster.must_put(b"k", b"v"); - let region = cluster.get_region("k".as_bytes()); - let region_id = region.get_id(); + cluster.must_put(b"k", b"v"); + let region = cluster.get_region("k".as_bytes()); + let region_id = region.get_id(); - let r = new_compute_hash_request(); - let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), r); - let _ = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); + fail::cfg("on_empty_cmd_normal", "return").unwrap(); + fail::cfg("try_flush_data", "return(0)").unwrap(); + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); + } - let r = new_verify_hash_request(vec![7, 8, 9, 0], 1000); - let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), r); - let _ = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); + std::thread::sleep(std::time::Duration::from_millis(500)); + let prev_state = collect_all_states(&cluster, region_id); + + let (compact_index, compact_term) = get_valid_compact_index(&prev_state); + let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); + let req = + test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + let res = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + // compact index should less than applied index + assert!(!res.get_header().has_error(), "{:?}", res); + + // TODO(tiflash) Make sure compact log is filtered successfully. + // Can be abstract to a retry function. 
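        // One shape the retry helper mentioned in the TODO above could take (a
        // sketch, not part of this patch): poll a condition with a bounded number
        // of attempts instead of relying on a single fixed sleep, e.g.
        // `retry_until(|| { /* re-collect states and compare */ false }, 20, 100)`.
        fn retry_until(mut cond: impl FnMut() -> bool, attempts: usize, interval_ms: u64) -> bool {
            for _ in 0..attempts {
                if cond() {
                    return true;
                }
                std::thread::sleep(std::time::Duration::from_millis(interval_ms));
            }
            false
        }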
+ std::thread::sleep(std::time::Duration::from_millis(500)); + + // CompactLog is filtered, because we can't flush data. + // However, we can still observe apply index advanced + let new_state = collect_all_states(&cluster, region_id); + for i in prev_state.keys() { + let old = prev_state.get(i).unwrap(); + let new = new_state.get(i).unwrap(); + assert_eq!( + old.in_memory_apply_state.get_truncated_state(), + new.in_memory_apply_state.get_truncated_state() + ); + assert_eq!( + old.in_disk_apply_state.get_truncated_state(), + new.in_disk_apply_state.get_truncated_state() + ); + assert_eq!( + old.in_memory_apply_state.get_applied_index() + 1, + new.in_memory_apply_state.get_applied_index() + ); + // Persist is before. + assert_eq!( + old.in_disk_apply_state.get_applied_index(), + new.in_disk_apply_state.get_applied_index() + ); + } - cluster.must_put(b"k2", b"v2"); - cluster.shutdown(); -} + fail::remove("on_empty_cmd_normal"); + fail::remove("try_flush_data"); -#[test] -fn test_old_compact_log() { - // If we just return None for CompactLog, the region state in ApplyFsm will change. - // Because there is no rollback in new implementation. - // This is a ERROR state. - let (mut cluster, pd_client) = new_mock_cluster(0, 3); - cluster.run(); - - // We don't return Persist after handling CompactLog. - fail::cfg("no_persist_compact_log", "return").unwrap(); - for i in 0..10 { - let k = format!("k{}", i); - let v = format!("v{}", i); - cluster.must_put(k.as_bytes(), v.as_bytes()); - } + let (compact_index, compact_term) = get_valid_compact_index(&new_state); + let prev_state = new_state; + let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); + let req = + test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); + let res = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + assert!(!res.get_header().has_error(), "{:?}", res); + + cluster.must_put(b"kz", b"vz"); + check_key(&cluster, b"kz", b"vz", Some(true), None, None); + + // CompactLog is not filtered + let new_state = collect_all_states(&cluster, region_id); + for i in prev_state.keys() { + let old = prev_state.get(i).unwrap(); + let new = new_state.get(i).unwrap(); + assert_ne!( + old.in_memory_apply_state.get_truncated_state(), + new.in_memory_apply_state.get_truncated_state() + ); + assert_eq!( + old.in_memory_apply_state.get_applied_index() + 2, // compact log + (kz,vz) + new.in_memory_apply_state.get_applied_index() + ); + } - for i in 0..10 { - let k = format!("k{}", i); - let v = format!("v{}", i); - check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); + cluster.shutdown(); } - let region = cluster.get_region(b"k1"); - let region_id = region.get_id(); - let prev_state = collect_all_states(&cluster, region_id); - let (compact_index, compact_term) = get_valid_compact_index(&prev_state); - let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); - let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); - let res = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); - - // Wait for state applys. 
- std::thread::sleep(std::time::Duration::from_secs(2)); - - let new_state = collect_all_states(&cluster, region_id); - for i in prev_state.keys() { - let old = prev_state.get(i).unwrap(); - let new = new_state.get(i).unwrap(); - assert_ne!( - old.in_memory_apply_state.get_truncated_state(), - new.in_memory_apply_state.get_truncated_state() - ); - assert_eq!( - old.in_disk_apply_state.get_truncated_state(), - new.in_disk_apply_state.get_truncated_state() - ); - } + #[test] + fn test_empty_cmd() { + // Test if a empty command can be observed when leadership changes. + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + // Disable compact log + cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); + cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000); + cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000); + cluster.cfg.raft_store.raft_log_gc_threshold = 1000; - cluster.shutdown(); -} + let _ = cluster.run(); -#[test] -fn test_compact_log() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + cluster.must_put(b"k1", b"v1"); + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); + let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); + cluster.must_transfer_leader(region.get_id(), peer_1.clone()); + std::thread::sleep(std::time::Duration::from_secs(2)); - disable_auto_gen_compact_log(&mut cluster); + check_key(&cluster, b"k1", b"v1", Some(true), None, None); + let prev_states = collect_all_states(&cluster, region_id); - cluster.run(); + // We need forward empty cmd generated by leadership changing to TiFlash. + cluster.must_transfer_leader(region.get_id(), peer_2.clone()); + std::thread::sleep(std::time::Duration::from_secs(2)); - cluster.must_put(b"k", b"v"); - let region = cluster.get_region("k".as_bytes()); - let region_id = region.get_id(); + let new_states = collect_all_states(&cluster, region_id); + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); + assert_ne!(old.in_memory_applied_term, new.in_memory_applied_term); + } - fail::cfg("on_empty_cmd_normal", "return").unwrap(); - fail::cfg("try_flush_data", "return(0)").unwrap(); - for i in 0..10 { - let k = format!("k{}", i); - let v = format!("v{}", i); - cluster.must_put(k.as_bytes(), v.as_bytes()); - } - for i in 0..10 { - let k = format!("k{}", i); - let v = format!("v{}", i); - check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); - } + std::thread::sleep(std::time::Duration::from_secs(2)); + fail::cfg("on_empty_cmd_normal", "return").unwrap(); - std::thread::sleep(std::time::Duration::from_millis(500)); - let prev_state = collect_all_states(&cluster, region_id); - - let (compact_index, compact_term) = get_valid_compact_index(&prev_state); - let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); - let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); - let res = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); - // compact index should less than applied index - assert!(!res.get_header().has_error(), "{:?}", res); - - // TODO(tiflash) Make sure compact log is filtered successfully. - // Can be abstract to a retry function. 
- std::thread::sleep(std::time::Duration::from_millis(500)); - - // CompactLog is filtered, because we can't flush data. - // However, we can still observe apply index advanced - let new_state = collect_all_states(&cluster, region_id); - for i in prev_state.keys() { - let old = prev_state.get(i).unwrap(); - let new = new_state.get(i).unwrap(); - assert_eq!( - old.in_memory_apply_state.get_truncated_state(), - new.in_memory_apply_state.get_truncated_state() - ); - assert_eq!( - old.in_disk_apply_state.get_truncated_state(), - new.in_disk_apply_state.get_truncated_state() - ); - assert_eq!( - old.in_memory_apply_state.get_applied_index() + 1, - new.in_memory_apply_state.get_applied_index() - ); - // Persist is before. - assert_eq!( - old.in_disk_apply_state.get_applied_index(), - new.in_disk_apply_state.get_applied_index() - ); - } + let prev_states = new_states; + cluster.must_transfer_leader(region.get_id(), peer_1.clone()); + std::thread::sleep(std::time::Duration::from_secs(2)); - fail::remove("on_empty_cmd_normal"); - fail::remove("try_flush_data"); - - let (compact_index, compact_term) = get_valid_compact_index(&new_state); - let prev_state = new_state; - let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); - let req = test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log); - let res = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); - assert!(!res.get_header().has_error(), "{:?}", res); - - cluster.must_put(b"kz", b"vz"); - check_key(&cluster, b"kz", b"vz", Some(true), None, None); - - // CompactLog is not filtered - let new_state = collect_all_states(&cluster, region_id); - for i in prev_state.keys() { - let old = prev_state.get(i).unwrap(); - let new = new_state.get(i).unwrap(); - assert_ne!( - old.in_memory_apply_state.get_truncated_state(), - new.in_memory_apply_state.get_truncated_state() - ); - assert_eq!( - old.in_memory_apply_state.get_applied_index() + 2, // compact log + (kz,vz) - new.in_memory_apply_state.get_applied_index() - ); - } - - cluster.shutdown(); -} - -// TODO(tiflash) Test a KV will not be write twice by not only handle_put but also observer. When we fully enable engine_tiflash. 
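// A rough skeleton of the test the TODO above asks for, assuming only helpers
// that already appear in this file (new_mock_cluster, check_key,
// iter_ffi_helpers); how a duplicate write would actually be detected inside
// the mock engine store is left open here, so the closure body is a placeholder.
#[test]
fn sketch_kv_not_written_twice() {
    let (mut cluster, _pd_client) = new_mock_cluster(0, 3);
    let _ = cluster.run();
    cluster.must_put(b"k1", b"v1");
    check_key(&cluster, b"k1", b"v1", Some(true), None, None);
    let eng_ids = cluster
        .engines
        .iter()
        .map(|e| e.0.to_owned())
        .collect::<Vec<_>>();
    iter_ffi_helpers(
        &cluster,
        Some(vec![eng_ids[0]]),
        &mut |_, _, _ffi: &mut FFIHelperSet| {
            // Placeholder: inspect the mock engine store here and assert each key
            // was applied exactly once (e.g. via a hypothetical write counter).
        },
    );
    cluster.shutdown();
}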
+ let new_states = collect_all_states(&cluster, region_id); + for i in prev_states.keys() { + let old = prev_states.get(i).unwrap(); + let new = new_states.get(i).unwrap(); + assert_eq!(old.in_memory_apply_state, new.in_memory_apply_state); + assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); + } -pub fn new_ingest_sst_cmd(meta: SstMeta) -> Request { - let mut cmd = Request::default(); - cmd.set_cmd_type(CmdType::IngestSst); - cmd.mut_ingest_sst().set_sst(meta); - cmd -} + fail::remove("on_empty_cmd_normal"); -pub fn create_tmp_importer(cfg: &Config, kv_path: &str) -> (PathBuf, Arc) { - let dir = Path::new(kv_path).join("import-sst"); - let importer = { - Arc::new( - SstImporter::new(&cfg.import, dir.clone(), None, cfg.storage.api_version()).unwrap(), - ) - }; - (dir, importer) + cluster.shutdown(); + } } mod ingest { @@ -724,6 +841,23 @@ mod ingest { use super::*; + pub fn new_ingest_sst_cmd(meta: SstMeta) -> Request { + let mut cmd = Request::default(); + cmd.set_cmd_type(CmdType::IngestSst); + cmd.mut_ingest_sst().set_sst(meta); + cmd + } + + pub fn create_tmp_importer(cfg: &Config, kv_path: &str) -> (PathBuf, Arc) { + let dir = Path::new(kv_path).join("import-sst"); + let importer = { + Arc::new( + SstImporter::new(&cfg.import, dir.clone(), None, cfg.storage.api_version()) + .unwrap(), + ) + }; + (dir, importer) + } fn make_sst( cluster: &Cluster, region_id: u64, @@ -935,123 +1069,6 @@ mod ingest { } } -#[test] -fn test_handle_destroy() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); - - // Disable raft log gc in this test case. - cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60); - - // Disable default max peer count check. - pd_client.disable_default_operator(); - - cluster.run(); - cluster.must_put(b"k1", b"v1"); - let eng_ids = cluster - .engines - .iter() - .map(|e| e.0.to_owned()) - .collect::>(); - - let region = cluster.get_region(b"k1"); - let region_id = region.get_id(); - let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); - let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); - cluster.must_transfer_leader(region_id, peer_1); - - iter_ffi_helpers( - &cluster, - Some(vec![eng_ids[1]]), - &mut |_, _, ffi: &mut FFIHelperSet| { - let server = &ffi.engine_store_server; - assert!(server.kvstore.contains_key(®ion_id)); - }, - ); - - pd_client.must_remove_peer(region_id, peer_2); - - check_key( - &cluster, - b"k1", - b"k2", - Some(false), - None, - Some(vec![eng_ids[1]]), - ); - - // Region removed in server. - iter_ffi_helpers( - &cluster, - Some(vec![eng_ids[1]]), - &mut |_, _, ffi: &mut FFIHelperSet| { - let server = &ffi.engine_store_server; - assert!(!server.kvstore.contains_key(®ion_id)); - }, - ); - - cluster.shutdown(); -} - -#[test] -fn test_empty_cmd() { - // Test if a empty command can be observed when leadership changes. 
- let (mut cluster, pd_client) = new_mock_cluster(0, 3); - // Disable compact log - cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); - cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000); - cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000); - cluster.cfg.raft_store.raft_log_gc_threshold = 1000; - - let _ = cluster.run(); - - cluster.must_put(b"k1", b"v1"); - let region = cluster.get_region(b"k1"); - let region_id = region.get_id(); - let eng_ids = cluster - .engines - .iter() - .map(|e| e.0.to_owned()) - .collect::>(); - let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); - let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); - cluster.must_transfer_leader(region.get_id(), peer_1.clone()); - std::thread::sleep(std::time::Duration::from_secs(2)); - - check_key(&cluster, b"k1", b"v1", Some(true), None, None); - let prev_states = collect_all_states(&cluster, region_id); - - // We need forward empty cmd generated by leadership changing to TiFlash. - cluster.must_transfer_leader(region.get_id(), peer_2.clone()); - std::thread::sleep(std::time::Duration::from_secs(2)); - - let new_states = collect_all_states(&cluster, region_id); - for i in prev_states.keys() { - let old = prev_states.get(i).unwrap(); - let new = new_states.get(i).unwrap(); - assert_ne!(old.in_memory_apply_state, new.in_memory_apply_state); - assert_ne!(old.in_memory_applied_term, new.in_memory_applied_term); - } - - std::thread::sleep(std::time::Duration::from_secs(2)); - fail::cfg("on_empty_cmd_normal", "return").unwrap(); - - let prev_states = new_states; - cluster.must_transfer_leader(region.get_id(), peer_1.clone()); - std::thread::sleep(std::time::Duration::from_secs(2)); - - let new_states = collect_all_states(&cluster, region_id); - for i in prev_states.keys() { - let old = prev_states.get(i).unwrap(); - let new = new_states.get(i).unwrap(); - assert_eq!(old.in_memory_apply_state, new.in_memory_apply_state); - assert_eq!(old.in_memory_applied_term, new.in_memory_applied_term); - } - - fail::remove("on_empty_cmd_normal"); - - cluster.shutdown(); -} - #[test] fn test_restart() { // Test if a empty command can be observed when leadership changes. @@ -1079,348 +1096,358 @@ fn test_restart() { .collect::>(); } -#[test] -fn test_huge_snapshot() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); +mod snapshot { + use super::*; - fail::cfg("on_can_apply_snapshot", "return(true)").unwrap(); - cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); - cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10); - cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(500); - - // Disable default max peer count check. - pd_client.disable_default_operator(); - let r1 = cluster.run_conf_change(); - - let first_value = vec![0; 10240]; - // at least 4m data - for i in 0..400 { - let key = format!("{:03}", i); - cluster.must_put(key.as_bytes(), &first_value); - } - let first_key: &[u8] = b"000"; + #[test] + fn test_huge_snapshot() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + fail::cfg("on_can_apply_snapshot", "return(true)").unwrap(); + cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); + cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10); + cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(500); + + // Disable default max peer count check. 
+ pd_client.disable_default_operator(); + let r1 = cluster.run_conf_change(); + + let first_value = vec![0; 10240]; + // at least 4m data + for i in 0..400 { + let key = format!("{:03}", i); + cluster.must_put(key.as_bytes(), &first_value); + } + let first_key: &[u8] = b"000"; - let eng_ids = cluster - .engines - .iter() - .map(|e| e.0.to_owned()) - .collect::>(); - tikv_util::info!("engine_2 is {}", eng_ids[1]); - let engine_2 = cluster.get_engine(eng_ids[1]); - must_get_none(&engine_2, first_key); - // add peer (engine_2,engine_2) to region 1. - pd_client.must_add_peer(r1, new_peer(eng_ids[1], eng_ids[1])); - - let (key, value) = (b"k2", b"v2"); - cluster.must_put(key, value); - // we can get in memory, since snapshot is pre handled, though it is not persisted - check_key( - &cluster, - key, - value, - Some(true), - None, - Some(vec![eng_ids[1]]), - ); - // now snapshot must be applied on peer engine_2 - must_get_equal(&engine_2, first_key, first_value.as_slice()); - - fail::cfg("on_ob_post_apply_snapshot", "return").unwrap(); - - tikv_util::info!("engine_3 is {}", eng_ids[2]); - let engine_3 = cluster.get_engine(eng_ids[2]); - must_get_none(&engine_3, first_key); - pd_client.must_add_peer(r1, new_peer(eng_ids[2], eng_ids[2])); - - let (key, value) = (b"k3", b"v3"); - cluster.must_put(key, value); - check_key(&cluster, key, value, Some(true), None, None); - // We have not persist pre handled snapshot, - // we can't be sure if it exists in only get from memory too, since pre handle snapshot is async. - must_get_none(&engine_3, first_key); - - fail::remove("on_ob_post_apply_snapshot"); - fail::remove("on_can_apply_snapshot"); - - cluster.shutdown(); -} + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + tikv_util::info!("engine_2 is {}", eng_ids[1]); + let engine_2 = cluster.get_engine(eng_ids[1]); + must_get_none(&engine_2, first_key); + // add peer (engine_2,engine_2) to region 1. + pd_client.must_add_peer(r1, new_peer(eng_ids[1], eng_ids[1])); + + let (key, value) = (b"k2", b"v2"); + cluster.must_put(key, value); + // we can get in memory, since snapshot is pre handled, though it is not persisted + check_key( + &cluster, + key, + value, + Some(true), + None, + Some(vec![eng_ids[1]]), + ); + // now snapshot must be applied on peer engine_2 + must_get_equal(&engine_2, first_key, first_value.as_slice()); -#[test] -fn test_concurrent_snapshot() { - let (mut cluster, pd_client) = new_mock_cluster(0, 3); + fail::cfg("on_ob_post_apply_snapshot", "return").unwrap(); + + tikv_util::info!("engine_3 is {}", eng_ids[2]); + let engine_3 = cluster.get_engine(eng_ids[2]); + must_get_none(&engine_3, first_key); + pd_client.must_add_peer(r1, new_peer(eng_ids[2], eng_ids[2])); - // Disable raft log gc in this test case. - cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60); - - // Disable default max peer count check. - pd_client.disable_default_operator(); - - let r1 = cluster.run_conf_change(); - cluster.must_put(b"k1", b"v1"); - pd_client.must_add_peer(r1, new_peer(2, 2)); - // Force peer 2 to be followers all the way. - cluster.add_send_filter(CloneFilterFactory( - RegionPacketFilter::new(r1, 2) - .msg_type(MessageType::MsgRequestVote) - .direction(Direction::Send), - )); - cluster.must_transfer_leader(r1, new_peer(1, 1)); - cluster.must_put(b"k3", b"v3"); - // Pile up snapshots of overlapped region ranges and deliver them all at once. 
-    let (tx, rx) = mpsc::channel();
-    cluster
-        .sim
-        .wl()
-        .add_recv_filter(3, Box::new(CollectSnapshotFilter::new(tx)));
-    pd_client.must_add_peer(r1, new_peer(3, 3));
-    let region = cluster.get_region(b"k1");
-    // Ensure the snapshot of range ("", "") is sent and piled in filter.
-    if let Err(e) = rx.recv_timeout(Duration::from_secs(1)) {
-        panic!("the snapshot is not sent before split, e: {:?}", e);
+        let (key, value) = (b"k3", b"v3");
+        cluster.must_put(key, value);
+        check_key(&cluster, key, value, Some(true), None, None);
+        // We have not persisted the pre-handled snapshot,
+        // so we can't be sure it exists even when reading from memory, since pre-handling the snapshot is async.
+        must_get_none(&engine_3, first_key);
+
+        fail::remove("on_ob_post_apply_snapshot");
+        fail::remove("on_can_apply_snapshot");
+
+        cluster.shutdown();
     }
 
-    // Split the region range and then there should be another snapshot for the split ranges.
-    cluster.must_split(&region, b"k2");
-    must_get_equal(&cluster.get_engine(3), b"k3", b"v3");
+    #[test]
+    fn test_concurrent_snapshot() {
+        let (mut cluster, pd_client) = new_mock_cluster(0, 3);
 
-    // Ensure the regions work after split.
-    cluster.must_put(b"k11", b"v11");
-    check_key(&cluster, b"k11", b"v11", Some(true), None, Some(vec![3]));
-    cluster.must_put(b"k4", b"v4");
-    check_key(&cluster, b"k4", b"v4", Some(true), None, Some(vec![3]));
+        // Disable raft log gc in this test case.
+        cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60);
 
-    cluster.shutdown();
-}
+        // Disable default max peer count check.
+        pd_client.disable_default_operator();
 
-fn new_split_region_cluster(count: u64) -> (Cluster<NodeCluster>, Arc<TestPdClient>) {
-    let (mut cluster, pd_client) = new_mock_cluster(0, 3);
-    // Disable raft log gc in this test case.
-    cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60);
-    // Disable default max peer count check.
-    pd_client.disable_default_operator();
-
-    let r1 = cluster.run_conf_change();
-    for i in 0..count {
-        let k = format!("k{:0>4}", 2 * i + 1);
-        let v = format!("v{}", 2 * i + 1);
-        cluster.must_put(k.as_bytes(), v.as_bytes());
+        let r1 = cluster.run_conf_change();
+        cluster.must_put(b"k1", b"v1");
+        pd_client.must_add_peer(r1, new_peer(2, 2));
+        // Force peer 2 to be a follower all the way.
+        cluster.add_send_filter(CloneFilterFactory(
+            RegionPacketFilter::new(r1, 2)
+                .msg_type(MessageType::MsgRequestVote)
+                .direction(Direction::Send),
+        ));
+        cluster.must_transfer_leader(r1, new_peer(1, 1));
+        cluster.must_put(b"k3", b"v3");
+        // Pile up snapshots of overlapped region ranges and deliver them all at once.
+        let (tx, rx) = mpsc::channel();
+        cluster
+            .sim
+            .wl()
+            .add_recv_filter(3, Box::new(CollectSnapshotFilter::new(tx)));
+        pd_client.must_add_peer(r1, new_peer(3, 3));
+        let region = cluster.get_region(b"k1");
+        // Ensure the snapshot of range ("", "") is sent and piled in filter.
+        if let Err(e) = rx.recv_timeout(Duration::from_secs(1)) {
+            panic!("the snapshot is not sent before split, e: {:?}", e);
+        }
+
+        // Split the region range and then there should be another snapshot for the split ranges.
+        cluster.must_split(&region, b"k2");
+        must_get_equal(&cluster.get_engine(3), b"k3", b"v3");
+
+        // Ensure the regions work after split.
+        cluster.must_put(b"k11", b"v11");
+        check_key(&cluster, b"k11", b"v11", Some(true), None, Some(vec![3]));
+        cluster.must_put(b"k4", b"v4");
+        check_key(&cluster, b"k4", b"v4", Some(true), None, Some(vec![3]));
+
+        cluster.shutdown();
     }
 
-    // k1 in [ , ] splited by k2 -> (, k2] [k2, )
-    // k3 in [k2, ) splited by k4 -> [k2, k4) [k4, )
-    for i in 0..count {
-        let k = format!("k{:0>4}", 2 * i + 1);
-        let region = cluster.get_region(k.as_bytes());
-        let sp = format!("k{:0>4}", 2 * i + 2);
-        cluster.must_split(&region, sp.as_bytes());
-        let region = cluster.get_region(k.as_bytes());
+    fn new_split_region_cluster(count: u64) -> (Cluster<NodeCluster>, Arc<TestPdClient>) {
+        let (mut cluster, pd_client) = new_mock_cluster(0, 3);
+        // Disable raft log gc in this test case.
+        cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60);
+        // Disable default max peer count check.
+        pd_client.disable_default_operator();
+
+        let r1 = cluster.run_conf_change();
+        for i in 0..count {
+            let k = format!("k{:0>4}", 2 * i + 1);
+            let v = format!("v{}", 2 * i + 1);
+            cluster.must_put(k.as_bytes(), v.as_bytes());
+        }
+
+        // k1 in [ , ] split by k2 -> (, k2] [k2, )
+        // k3 in [k2, ) split by k4 -> [k2, k4) [k4, )
+        for i in 0..count {
+            let k = format!("k{:0>4}", 2 * i + 1);
+            let region = cluster.get_region(k.as_bytes());
+            let sp = format!("k{:0>4}", 2 * i + 2);
+            cluster.must_split(&region, sp.as_bytes());
+            let region = cluster.get_region(k.as_bytes());
+        }
+
+        (cluster, pd_client)
     }
 
-    (cluster, pd_client)
-}
+    #[test]
+    fn test_prehandle_fail() {
+        let (mut cluster, pd_client) = new_mock_cluster(0, 3);
 
-#[test]
-fn test_prehandle_fail() {
-    let (mut cluster, pd_client) = new_mock_cluster(0, 3);
+        // Disable raft log gc in this test case.
+        cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60);
 
-    // Disable raft log gc in this test case.
-    cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60);
+        // Disable default max peer count check.
+        pd_client.disable_default_operator();
+        let r1 = cluster.run_conf_change();
+        cluster.must_put(b"k1", b"v1");
 
-    // Disable default max peer count check.
-    pd_client.disable_default_operator();
-    let r1 = cluster.run_conf_change();
-    cluster.must_put(b"k1", b"v1");
+        let eng_ids = cluster
+            .engines
+            .iter()
+            .map(|e| e.0.to_owned())
+            .collect::<Vec<u64>>();
+        // If we fail to call pre-handle snapshot, we can still handle it when applying the snapshot.
+        fail::cfg("before_actually_pre_handle", "return").unwrap();
+        pd_client.must_add_peer(r1, new_peer(eng_ids[1], eng_ids[1]));
+        check_key(
+            &cluster,
+            b"k1",
+            b"v1",
+            Some(true),
+            Some(true),
+            Some(vec![eng_ids[1]]),
+        );
+        fail::remove("before_actually_pre_handle");
 
-    let eng_ids = cluster
-        .engines
-        .iter()
-        .map(|e| e.0.to_owned())
-        .collect::<Vec<u64>>();
-    // If we fail to call pre-handle snapshot, we can still handle it when apply snapshot.
-    fail::cfg("before_actually_pre_handle", "return").unwrap();
-    pd_client.must_add_peer(r1, new_peer(eng_ids[1], eng_ids[1]));
-    check_key(
-        &cluster,
-        b"k1",
-        b"v1",
-        Some(true),
-        Some(true),
-        Some(vec![eng_ids[1]]),
-    );
-    fail::remove("before_actually_pre_handle");
-
-    // If we failed in apply snapshot(not panic), even if per_handle_snapshot is not called.
-    fail::cfg("on_ob_pre_handle_snapshot", "return").unwrap();
-    check_key(
-        &cluster,
-        b"k1",
-        b"v1",
-        Some(false),
-        Some(false),
-        Some(vec![eng_ids[2]]),
-    );
-    pd_client.must_add_peer(r1, new_peer(eng_ids[2], eng_ids[2]));
-    check_key(
-        &cluster,
-        b"k1",
-        b"v1",
-        Some(true),
-        Some(true),
-        Some(vec![eng_ids[2]]),
-    );
-    fail::remove("on_ob_pre_handle_snapshot");
-
-    cluster.shutdown();
-}
+        // If we fail in apply snapshot (not panic), even if pre_handle_snapshot is not called.
+        fail::cfg("on_ob_pre_handle_snapshot", "return").unwrap();
+        check_key(
+            &cluster,
+            b"k1",
+            b"v1",
+            Some(false),
+            Some(false),
+            Some(vec![eng_ids[2]]),
+        );
+        pd_client.must_add_peer(r1, new_peer(eng_ids[2], eng_ids[2]));
+        check_key(
+            &cluster,
+            b"k1",
+            b"v1",
+            Some(true),
+            Some(true),
+            Some(vec![eng_ids[2]]),
+        );
+        fail::remove("on_ob_pre_handle_snapshot");
 
-#[test]
-fn test_split_merge() {
-    let (mut cluster, pd_client) = new_mock_cluster(0, 3);
+        cluster.shutdown();
+    }
 
-    // Can always apply snapshot immediately
-    fail::cfg("on_can_apply_snapshot", "return(true)").unwrap();
-    cluster.cfg.raft_store.right_derive_when_split = true;
+    #[test]
+    fn test_split_merge() {
+        let (mut cluster, pd_client) = new_mock_cluster(0, 3);
 
-    // May fail if cluster.start, since node 2 is not in region1.peers(),
-    // and node 2 has not bootstrap region1,
-    // because region1 is not bootstrap if we only call cluster.start()
-    cluster.run();
+        // Can always apply snapshot immediately
+        fail::cfg("on_can_apply_snapshot", "return(true)").unwrap();
+        cluster.cfg.raft_store.right_derive_when_split = true;
 
-    cluster.must_put(b"k1", b"v1");
-    cluster.must_put(b"k3", b"v3");
+        // May fail if we use cluster.start, since node 2 is not in region1.peers(),
+        // and node 2 has not bootstrapped region1,
+        // because region1 is not bootstrapped if we only call cluster.start()
+        cluster.run();
 
-    check_key(&cluster, b"k1", b"v1", Some(true), None, None);
-    check_key(&cluster, b"k3", b"v3", Some(true), None, None);
+        cluster.must_put(b"k1", b"v1");
+        cluster.must_put(b"k3", b"v3");
 
-    let r1 = cluster.get_region(b"k1");
-    let r3 = cluster.get_region(b"k3");
-    assert_eq!(r1.get_id(), r3.get_id());
+        check_key(&cluster, b"k1", b"v1", Some(true), None, None);
+        check_key(&cluster, b"k3", b"v3", Some(true), None, None);
 
-    cluster.must_split(&r1, b"k2");
-    let r1_new = cluster.get_region(b"k1");
-    let r3_new = cluster.get_region(b"k3");
+        let r1 = cluster.get_region(b"k1");
+        let r3 = cluster.get_region(b"k3");
+        assert_eq!(r1.get_id(), r3.get_id());
 
-    assert_eq!(r1.get_id(), r3_new.get_id());
+        cluster.must_split(&r1, b"k2");
+        let r1_new = cluster.get_region(b"k1");
+        let r3_new = cluster.get_region(b"k3");
 
-    iter_ffi_helpers(&cluster, None, &mut |id: u64, _, ffi: &mut FFIHelperSet| {
-        let server = &ffi.engine_store_server;
-        if !server.kvstore.contains_key(&r1_new.get_id()) {
-            panic!("node {} has no region {}", id, r1_new.get_id())
-        }
-        if !server.kvstore.contains_key(&r3_new.get_id()) {
-            panic!("node {} has no region {}", id, r3_new.get_id())
-        }
-        // Region meta must equal
-        assert_eq!(server.kvstore.get(&r1_new.get_id()).unwrap().region, r1_new);
-        assert_eq!(server.kvstore.get(&r3_new.get_id()).unwrap().region, r3_new);
-
-        // Can get from disk
-        check_key(&cluster, b"k1", b"v1", None, Some(true), None);
-        check_key(&cluster, b"k3", b"v3", None, Some(true), None);
-        // TODO Region in memory data must not contradict, but now we do not delete data
-    });
-
-    pd_client.must_merge(r1_new.get_id(), r3_new.get_id());
-    let r1_new2 = cluster.get_region(b"k1");
-    let r3_new2 = cluster.get_region(b"k3");
-
-    iter_ffi_helpers(&cluster, None, &mut |id: u64, _, ffi: &mut FFIHelperSet| {
-        let server = &ffi.engine_store_server;
-
-        // The left region is removed
-        if server.kvstore.contains_key(&r1_new.get_id()) {
-            panic!("node {} should has no region {}", id, r1_new.get_id())
-        }
-        if !server.kvstore.contains_key(&r3_new.get_id()) {
-            panic!("node {} has no region {}", id, r3_new.get_id())
-        }
-        // Region meta must equal
-        assert_eq!(
-            server.kvstore.get(&r3_new2.get_id()).unwrap().region,
-            r3_new2
-        );
+        assert_eq!(r1.get_id(), r3_new.get_id());
 
-        // Can get from disk
-        check_key(&cluster, b"k1", b"v1", None, Some(true), None);
-        check_key(&cluster, b"k3", b"v3", None, Some(true), None);
-        // TODO Region in memory data must not contradict, but now we do not delete data
+        iter_ffi_helpers(&cluster, None, &mut |id: u64, _, ffi: &mut FFIHelperSet| {
+            let server = &ffi.engine_store_server;
+            if !server.kvstore.contains_key(&r1_new.get_id()) {
+                panic!("node {} has no region {}", id, r1_new.get_id())
+            }
+            if !server.kvstore.contains_key(&r3_new.get_id()) {
+                panic!("node {} has no region {}", id, r3_new.get_id())
+            }
+            // Region meta must equal
+            assert_eq!(server.kvstore.get(&r1_new.get_id()).unwrap().region, r1_new);
+            assert_eq!(server.kvstore.get(&r3_new.get_id()).unwrap().region, r3_new);
 
-        let origin_epoch = r3_new.get_region_epoch();
-        let new_epoch = r3_new2.get_region_epoch();
-        // PrepareMerge + CommitMerge, so it should be 2.
-        assert_eq!(new_epoch.get_version(), origin_epoch.get_version() + 2);
-        assert_eq!(new_epoch.get_conf_ver(), origin_epoch.get_conf_ver());
-    });
+            // Can get from disk
+            check_key(&cluster, b"k1", b"v1", None, Some(true), None);
+            check_key(&cluster, b"k3", b"v3", None, Some(true), None);
+            // TODO Region in memory data must not contradict, but now we do not delete data
+        });
 
-    fail::remove("on_can_apply_snapshot");
-    cluster.shutdown();
-}
+        pd_client.must_merge(r1_new.get_id(), r3_new.get_id());
+        let r1_new2 = cluster.get_region(b"k1");
+        let r3_new2 = cluster.get_region(b"k3");
 
-#[test]
-fn test_basic_concurrent_snapshot() {
-    let (mut cluster, pd_client) = new_mock_cluster(0, 3);
+        iter_ffi_helpers(&cluster, None, &mut |id: u64, _, ffi: &mut FFIHelperSet| {
+            let server = &ffi.engine_store_server;
 
-    disable_auto_gen_compact_log(&mut cluster);
-    assert_eq!(cluster.cfg.tikv.raft_store.snap_handle_pool_size, 2);
-    assert_eq!(cluster.cfg.proxy_cfg.raft_store.snap_handle_pool_size, 2);
+            // The left region is removed
+            if server.kvstore.contains_key(&r1_new.get_id()) {
+                panic!("node {} should have no region {}", id, r1_new.get_id())
+            }
+            if !server.kvstore.contains_key(&r3_new.get_id()) {
+                panic!("node {} has no region {}", id, r3_new.get_id())
+            }
+            // Region meta must equal
+            assert_eq!(
+                server.kvstore.get(&r3_new2.get_id()).unwrap().region,
+                r3_new2
+            );
+
+            // Can get from disk
+            check_key(&cluster, b"k1", b"v1", None, Some(true), None);
+            check_key(&cluster, b"k3", b"v3", None, Some(true), None);
+            // TODO Region in memory data must not contradict, but now we do not delete data
+
+            let origin_epoch = r3_new.get_region_epoch();
+            let new_epoch = r3_new2.get_region_epoch();
+            // PrepareMerge + CommitMerge, so it should be 2.
+            assert_eq!(new_epoch.get_version(), origin_epoch.get_version() + 2);
+            assert_eq!(new_epoch.get_conf_ver(), origin_epoch.get_conf_ver());
+        });
+
+        fail::remove("on_can_apply_snapshot");
+        cluster.shutdown();
+    }
 
-    // Disable default max peer count check.
-    pd_client.disable_default_operator();
+    #[test]
+    fn test_basic_concurrent_snapshot() {
+        let (mut cluster, pd_client) = new_mock_cluster(0, 3);
 
-    let r1 = cluster.run_conf_change();
-    cluster.must_put(b"k1", b"v1");
-    cluster.must_put(b"k3", b"v3");
+        disable_auto_gen_compact_log(&mut cluster);
+        assert_eq!(cluster.cfg.tikv.raft_store.snap_handle_pool_size, 2);
+        assert_eq!(cluster.cfg.proxy_cfg.raft_store.snap_handle_pool_size, 2);
 
-    let region1 = cluster.get_region(b"k1");
-    cluster.must_split(&region1, b"k2");
-    let r1 = cluster.get_region(b"k1").get_id();
-    let r3 = cluster.get_region(b"k3").get_id();
+        // Disable default max peer count check.
+        pd_client.disable_default_operator();
 
-    fail::cfg("before_actually_pre_handle", "sleep(1000)").unwrap();
-    tikv_util::info!("region k1 {} k3 {}", r1, r3);
-    pd_client.add_peer(r1, new_peer(2, 2));
-    pd_client.add_peer(r3, new_peer(2, 2));
-    std::thread::sleep(std::time::Duration::from_millis(500));
-    // Now, k1 and k3 are not handled, since pre-handle process is not finished.
+        let r1 = cluster.run_conf_change();
+        cluster.must_put(b"k1", b"v1");
+        cluster.must_put(b"k3", b"v3");
 
-    let pending_count = cluster
-        .engines
-        .get(&2)
-        .unwrap()
-        .kv
-        .pending_applies_count
-        .clone();
-    assert_eq!(pending_count.load(Ordering::Relaxed), 2);
-    std::thread::sleep(std::time::Duration::from_millis(1000));
-    // Now, k1 and k3 are handled.
-    assert_eq!(pending_count.load(Ordering::Relaxed), 0);
-
-    fail::remove("before_actually_pre_handle");
-
-    cluster.shutdown();
-}
+        let region1 = cluster.get_region(b"k1");
+        cluster.must_split(&region1, b"k2");
+        let r1 = cluster.get_region(b"k1").get_id();
+        let r3 = cluster.get_region(b"k3").get_id();
+
+        fail::cfg("before_actually_pre_handle", "sleep(1000)").unwrap();
+        tikv_util::info!("region k1 {} k3 {}", r1, r3);
+        pd_client.add_peer(r1, new_peer(2, 2));
+        pd_client.add_peer(r3, new_peer(2, 2));
+        std::thread::sleep(std::time::Duration::from_millis(500));
+        // Now, k1 and k3 are not handled, since the pre-handle process is not finished.
+        // This is because `pending_applies_count` is not greater than `snap_handle_pool_size`,
+        // so there is no `handle_pending_applies` until `on_timeout`.
+
+        let pending_count = cluster
+            .engines
+            .get(&2)
+            .unwrap()
+            .kv
+            .pending_applies_count
+            .clone();
+        assert_eq!(pending_count.load(Ordering::SeqCst), 2);
+        std::thread::sleep(std::time::Duration::from_millis(500));
+        check_key(&cluster, b"k1", b"v1", None, Some(true), Some(vec![1, 2]));
+        check_key(&cluster, b"k3", b"v3", None, Some(true), Some(vec![1, 2]));
+        // Now, k1 and k3 are handled.
+ assert_eq!(pending_count.load(Ordering::SeqCst), 0); + + fail::remove("before_actually_pre_handle"); -#[test] -fn test_many_concurrent_snapshot() { - let c = 4; - let (cluster, pd_client) = new_split_region_cluster(c); - - for i in 0..c { - let k = format!("k{:0>4}", 2 * i + 1); - let region_id = cluster.get_region(k.as_bytes()).get_id(); - pd_client.must_add_peer(region_id, new_peer(2, 2)); + cluster.shutdown(); } - for i in 0..c { - let k = format!("k{:0>4}", 2 * i + 1); - let v = format!("v{}", 2 * i + 1); - check_key( - &cluster, - k.as_bytes(), - v.as_bytes(), - Some(true), - Some(true), - Some(vec![2]), - ); + #[test] + fn test_many_concurrent_snapshot() { + let c = 4; + let (mut cluster, pd_client) = new_split_region_cluster(c); + + for i in 0..c { + let k = format!("k{:0>4}", 2 * i + 1); + let region_id = cluster.get_region(k.as_bytes()).get_id(); + pd_client.must_add_peer(region_id, new_peer(2, 2)); + } + + for i in 0..c { + let k = format!("k{:0>4}", 2 * i + 1); + let v = format!("v{}", 2 * i + 1); + check_key( + &cluster, + k.as_bytes(), + v.as_bytes(), + Some(true), + Some(true), + Some(vec![2]), + ); + } + + cluster.shutdown(); } }