diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index bd65c37b9b7..f8b48031816 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -537,15 +537,16 @@ impl CoprocessorHost { peer_id: u64, snap_key: &crate::store::SnapKey, snap: Option<&crate::store::Snapshot>, - ) { - loop_ob!( - region, - &self.registry.apply_snapshot_observers, - post_apply_snapshot, - peer_id, - snap_key, - snap, - ); + ) -> Result<()> { + let mut ctx = ObserverContext::new(region); + for observer in &self.registry.apply_snapshot_observers { + let observer = observer.observer.inner(); + let res = observer.post_apply_snapshot(&mut ctx, peer_id, snap_key, snap); + if res.is_err() { + return res; + } + } + Ok(()) } pub fn new_split_checker_host<'a>( @@ -909,10 +910,11 @@ mod tests { _: u64, _: &crate::store::SnapKey, _: Option<&Snapshot>, - ) { + ) -> Result<()> { self.called .fetch_add(ObserverIndex::PostApplySnapshot as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); + Ok(()) } fn should_pre_apply_snapshot(&self) -> bool { @@ -1072,7 +1074,7 @@ mod tests { index += ObserverIndex::PreApplySnapshot as usize; assert_all!([&ob.called], &[index]); - host.post_apply_snapshot(&region, 0, &key, None); + let _ = host.post_apply_snapshot(&region, 0, &key, None); index += ObserverIndex::PostApplySnapshot as usize; assert_all!([&ob.called], &[index]); diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs index 0093e2e473f..9f657d9a3a7 100644 --- a/components/raftstore/src/coprocessor/mod.rs +++ b/components/raftstore/src/coprocessor/mod.rs @@ -21,7 +21,7 @@ use raft::{eraftpb, StateRole}; pub mod config; mod consistency_check; pub mod dispatcher; -mod error; +pub mod error; mod metrics; pub mod region_info_accessor; mod split_check; @@ -198,7 +198,8 @@ pub trait ApplySnapshotObserver: Coprocessor { _: 
u64, _: &crate::store::SnapKey, _snapshot: Option<&crate::store::Snapshot>, - ) { + ) -> Result<()> { + Ok(()) } /// We call pre_apply_snapshot only when one of the observer returns true. diff --git a/components/raftstore/src/engine_store_ffi/observer.rs b/components/raftstore/src/engine_store_ffi/observer.rs index 31621b1bf94..b686499b340 100644 --- a/components/raftstore/src/engine_store_ffi/observer.rs +++ b/components/raftstore/src/engine_store_ffi/observer.rs @@ -28,6 +28,7 @@ use yatp::{ }; use crate::{ + coprocessor, coprocessor::{ AdminObserver, ApplyCtxInfo, ApplySnapshotObserver, BoxAdminObserver, BoxApplySnapshotObserver, BoxPdTaskObserver, BoxQueryObserver, BoxRegionChangeObserver, @@ -698,15 +699,17 @@ impl ApplySnapshotObserver for TiFlashObserver { peer_id: u64, snap_key: &crate::store::SnapKey, snap: Option<&crate::store::Snapshot>, - ) { - fail::fail_point!("on_ob_post_apply_snapshot", |_| {}); + ) -> std::result::Result<(), coprocessor::error::Error> { + fail::fail_point!("on_ob_post_apply_snapshot", |_| { + return Err(box_err!("on_ob_post_apply_snapshot")); + }); info!("post apply snapshot"; "peer_id" => ?peer_id, "snap_key" => ?snap_key, "region" => ?ob_ctx.region(), ); let snap = match snap { - None => return, + None => return Ok(()), Some(s) => s, }; let maybe_snapshot = { @@ -767,6 +770,7 @@ impl ApplySnapshotObserver for TiFlashObserver { self.engine_store_server_helper .apply_pre_handled_snapshot(ptr.0); } + Ok(()) } fn should_pre_apply_snapshot(&self) -> bool { diff --git a/components/raftstore/src/store/worker/region.rs b/components/raftstore/src/store/worker/region.rs index 753176b7deb..7d14778c63e 100644 --- a/components/raftstore/src/store/worker/region.rs +++ b/components/raftstore/src/store/worker/region.rs @@ -420,8 +420,15 @@ where coprocessor_host: self.coprocessor_host.clone(), }; s.apply(options)?; - self.coprocessor_host - .post_apply_snapshot(&region, peer_id, &snap_key, Some(&s)); + match self + .coprocessor_host + 
.post_apply_snapshot(&region, peer_id, &snap_key, Some(&s)) + { + Ok(_) => (), + Err(e) => { + return Err(box_err!("post apply snapshot error {:?}", e)); + } + }; let mut wb = self.engine.write_batch(); region_state.set_state(PeerState::Normal); @@ -1412,7 +1419,7 @@ mod tests { peer_id: u64, key: &crate::store::SnapKey, snapshot: Option<&crate::store::Snapshot>, - ) { + ) -> std::result::Result<(), crate::coprocessor::error::Error> { let code = snapshot.unwrap().total_size().unwrap() + key.term + key.region_id @@ -1421,6 +1428,7 @@ mod tests { self.post_apply_count.fetch_add(1, Ordering::SeqCst); self.post_apply_hash .fetch_add(code as usize, Ordering::SeqCst); + Ok(()) } fn should_pre_apply_snapshot(&self) -> bool { diff --git a/new-mock-engine-store/src/lib.rs b/new-mock-engine-store/src/lib.rs index 3397e7ad87a..e1f074ec835 100644 --- a/new-mock-engine-store/src/lib.rs +++ b/new-mock-engine-store/src/lib.rs @@ -235,17 +235,11 @@ unsafe fn load_from_db(store: &mut EngineStoreServer, region_id: u64) { "region_id" => region_id, "cf" => cf, "k" => ?k, - "v" => ?v, ); Ok(true) }) .unwrap(); } - debug!("after restore"; - "store" => store_id, - "region_id" => region_id, - "default size" => region.data[2].len(), - ); } unsafe fn write_to_db_data( diff --git a/tests/proxy/normal.rs b/tests/proxy/normal.rs index 8d9e64f65e4..aa494664485 100644 --- a/tests/proxy/normal.rs +++ b/tests/proxy/normal.rs @@ -1157,16 +1157,100 @@ mod ingest { mod restart { use super::*; #[test] - fn test_restart() { + fn test_snap_restart() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + fail::cfg("on_can_apply_snapshot", "return(true)").unwrap(); + disable_auto_gen_compact_log(&mut cluster); + cluster.cfg.raft_store.max_snapshot_file_raw_size = ReadableSize(u64::MAX); + + // Disable default max peer count check. 
+ pd_client.disable_default_operator(); + let r1 = cluster.run_conf_change(); + + let first_value = vec![0; 10240]; + for i in 0..10 { + let key = format!("{:03}", i); + cluster.must_put(key.as_bytes(), &first_value); + } + let first_key: &[u8] = b"000"; + + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::<Vec<_>>(); + + tikv_util::info!("engine_2 is {}", eng_ids[1]); + // engine 2 will not exec post apply snapshot. + fail::cfg("on_ob_pre_handle_snapshot", "return").unwrap(); + fail::cfg("on_ob_post_apply_snapshot", "return").unwrap(); + + let engine_2 = cluster.get_engine(eng_ids[1]); + must_get_none(&engine_2, first_key); + // add peer (engine_2,engine_2) to region 1. + pd_client.must_add_peer(r1, new_peer(eng_ids[1], eng_ids[1])); + + check_key(&cluster, first_key, &first_value, Some(false), None, None); + + info!("stop node {}", eng_ids[1]); + cluster.stop_node(eng_ids[1]); + { + let lock = cluster.ffi_helper_set.lock(); + lock.unwrap() + .deref_mut() + .get_mut(&eng_ids[1]) + .unwrap() + .engine_store_server + .stop(); + } + + fail::remove("on_ob_pre_handle_snapshot"); + fail::remove("on_ob_post_apply_snapshot"); + info!("resume node {}", eng_ids[1]); + { + let lock = cluster.ffi_helper_set.lock(); + lock.unwrap() + .deref_mut() + .get_mut(&eng_ids[1]) + .unwrap() + .engine_store_server + .restore(); + } + info!("restored node {}", eng_ids[1]); + cluster.run_node(eng_ids[1]).unwrap(); + + let (key, value) = (b"k2", b"v2"); + cluster.must_put(key, value); + // we can get in memory, since snapshot is pre handled, though it is not persisted + check_key( + &cluster, + key, + value, + Some(true), + None, + Some(vec![eng_ids[1]]), + ); + // now snapshot must be applied on peer engine_2 + check_key( + &cluster, + first_key, + first_value.as_slice(), + Some(true), + None, + Some(vec![eng_ids[1]]), + ); + + cluster.shutdown(); + } + + #[test] + fn test_kv_restart() { // Test if a empty command can be observed when leadership changes. 
let (mut cluster, pd_client) = new_mock_cluster(0, 3); // Disable AUTO generated compact log. - // This will not totally disable, so we use some failpoints later. - cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); - cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000); - cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000); - cluster.cfg.raft_store.raft_log_gc_threshold = 1000; + disable_auto_gen_compact_log(&mut cluster); // We don't handle CompactLog at all. fail::cfg("try_flush_data", "return(0)").unwrap(); @@ -1205,7 +1289,7 @@ mod restart { k.as_bytes(), v.as_bytes(), Some(true), - None, + Some(true), Some(vec![eng_ids[0]]), ); } @@ -1225,7 +1309,7 @@ mod restart { k.as_bytes(), v.as_bytes(), Some(true), - None, + Some(false), Some(vec![eng_ids[0]]), ); } @@ -1284,9 +1368,7 @@ mod snapshot { let (mut cluster, pd_client) = new_mock_cluster(0, 3); fail::cfg("on_can_apply_snapshot", "return(true)").unwrap(); - cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000); - cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10); - cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(500); + disable_auto_gen_compact_log(&mut cluster); cluster.cfg.raft_store.max_snapshot_file_raw_size = ReadableSize(u64::MAX); // Disable default max peer count check.