Skip to content

Commit

Permalink
add test_snap_restart
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo committed Aug 30, 2022
1 parent 1e32c1d commit 12c105d
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 36 deletions.
24 changes: 13 additions & 11 deletions components/raftstore/src/coprocessor/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,15 +537,16 @@ impl<E: KvEngine> CoprocessorHost<E> {
peer_id: u64,
snap_key: &crate::store::SnapKey,
snap: Option<&crate::store::Snapshot>,
) {
loop_ob!(
region,
&self.registry.apply_snapshot_observers,
post_apply_snapshot,
peer_id,
snap_key,
snap,
);
) -> Result<()> {
let mut ctx = ObserverContext::new(region);
for observer in &self.registry.apply_snapshot_observers {
let observer = observer.observer.inner();
let res = observer.post_apply_snapshot(&mut ctx, peer_id, snap_key, snap);
if res.is_err() {
return res;
}
}
Ok(())
}

pub fn new_split_checker_host<'a>(
Expand Down Expand Up @@ -909,10 +910,11 @@ mod tests {
_: u64,
_: &crate::store::SnapKey,
_: Option<&Snapshot>,
) {
) -> Result<()> {
self.called
.fetch_add(ObserverIndex::PostApplySnapshot as usize, Ordering::SeqCst);
ctx.bypass = self.bypass.load(Ordering::SeqCst);
Ok(())
}

fn should_pre_apply_snapshot(&self) -> bool {
Expand Down Expand Up @@ -1072,7 +1074,7 @@ mod tests {
index += ObserverIndex::PreApplySnapshot as usize;
assert_all!([&ob.called], &[index]);

host.post_apply_snapshot(&region, 0, &key, None);
let _ = host.post_apply_snapshot(&region, 0, &key, None);
index += ObserverIndex::PostApplySnapshot as usize;
assert_all!([&ob.called], &[index]);

Expand Down
5 changes: 3 additions & 2 deletions components/raftstore/src/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use raft::{eraftpb, StateRole};
pub mod config;
mod consistency_check;
pub mod dispatcher;
mod error;
pub mod error;
mod metrics;
pub mod region_info_accessor;
mod split_check;
Expand Down Expand Up @@ -198,7 +198,8 @@ pub trait ApplySnapshotObserver: Coprocessor {
_: u64,
_: &crate::store::SnapKey,
_snapshot: Option<&crate::store::Snapshot>,
) {
) -> Result<()> {
Ok(())
}

/// We call pre_apply_snapshot only when one of the observer returns true.
Expand Down
10 changes: 7 additions & 3 deletions components/raftstore/src/engine_store_ffi/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use yatp::{
};

use crate::{
coprocessor,
coprocessor::{
AdminObserver, ApplyCtxInfo, ApplySnapshotObserver, BoxAdminObserver,
BoxApplySnapshotObserver, BoxPdTaskObserver, BoxQueryObserver, BoxRegionChangeObserver,
Expand Down Expand Up @@ -698,15 +699,17 @@ impl ApplySnapshotObserver for TiFlashObserver {
peer_id: u64,
snap_key: &crate::store::SnapKey,
snap: Option<&crate::store::Snapshot>,
) {
fail::fail_point!("on_ob_post_apply_snapshot", |_| {});
) -> std::result::Result<(), coprocessor::error::Error> {
fail::fail_point!("on_ob_post_apply_snapshot", |_| {
return Err(box_err!("on_ob_post_apply_snapshot"));
});
info!("post apply snapshot";
"peer_id" => ?peer_id,
"snap_key" => ?snap_key,
"region" => ?ob_ctx.region(),
);
let snap = match snap {
None => return,
None => return Ok(()),
Some(s) => s,
};
let maybe_snapshot = {
Expand Down Expand Up @@ -767,6 +770,7 @@ impl ApplySnapshotObserver for TiFlashObserver {
self.engine_store_server_helper
.apply_pre_handled_snapshot(ptr.0);
}
Ok(())
}

fn should_pre_apply_snapshot(&self) -> bool {
Expand Down
14 changes: 11 additions & 3 deletions components/raftstore/src/store/worker/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,15 @@ where
coprocessor_host: self.coprocessor_host.clone(),
};
s.apply(options)?;
self.coprocessor_host
.post_apply_snapshot(&region, peer_id, &snap_key, Some(&s));
match self
.coprocessor_host
.post_apply_snapshot(&region, peer_id, &snap_key, Some(&s))
{
Ok(_) => (),
Err(e) => {
return Err(box_err!("post apply snapshot error {:?}", e));
}
};

let mut wb = self.engine.write_batch();
region_state.set_state(PeerState::Normal);
Expand Down Expand Up @@ -1412,7 +1419,7 @@ mod tests {
peer_id: u64,
key: &crate::store::SnapKey,
snapshot: Option<&crate::store::Snapshot>,
) {
) -> std::result::Result<(), crate::coprocessor::error::Error> {
let code = snapshot.unwrap().total_size().unwrap()
+ key.term
+ key.region_id
Expand All @@ -1421,6 +1428,7 @@ mod tests {
self.post_apply_count.fetch_add(1, Ordering::SeqCst);
self.post_apply_hash
.fetch_add(code as usize, Ordering::SeqCst);
Ok(())
}

fn should_pre_apply_snapshot(&self) -> bool {
Expand Down
6 changes: 0 additions & 6 deletions new-mock-engine-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,17 +235,11 @@ unsafe fn load_from_db(store: &mut EngineStoreServer, region_id: u64) {
"region_id" => region_id,
"cf" => cf,
"k" => ?k,
"v" => ?v,
);
Ok(true)
})
.unwrap();
}
debug!("after restore";
"store" => store_id,
"region_id" => region_id,
"default size" => region.data[2].len(),
);
}

unsafe fn write_to_db_data(
Expand Down
104 changes: 93 additions & 11 deletions tests/proxy/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1157,16 +1157,100 @@ mod ingest {
mod restart {
use super::*;
#[test]
fn test_restart() {
fn test_snap_restart() {
let (mut cluster, pd_client) = new_mock_cluster(0, 3);

fail::cfg("on_can_apply_snapshot", "return(true)").unwrap();
disable_auto_gen_compact_log(&mut cluster);
cluster.cfg.raft_store.max_snapshot_file_raw_size = ReadableSize(u64::MAX);

// Disable default max peer count check.
pd_client.disable_default_operator();
let r1 = cluster.run_conf_change();

let first_value = vec![0; 10240];
for i in 0..10 {
let key = format!("{:03}", i);
cluster.must_put(key.as_bytes(), &first_value);
}
let first_key: &[u8] = b"000";

let eng_ids = cluster
.engines
.iter()
.map(|e| e.0.to_owned())
.collect::<Vec<_>>();

tikv_util::info!("engine_2 is {}", eng_ids[1]);
// engine 2 will not exec post apply snapshot.
fail::cfg("on_ob_pre_handle_snapshot", "return").unwrap();
fail::cfg("on_ob_post_apply_snapshot", "return").unwrap();

let engine_2 = cluster.get_engine(eng_ids[1]);
must_get_none(&engine_2, first_key);
// add peer (engine_2,engine_2) to region 1.
pd_client.must_add_peer(r1, new_peer(eng_ids[1], eng_ids[1]));

check_key(&cluster, first_key, &first_value, Some(false), None, None);

info!("stop node {}", eng_ids[1]);
cluster.stop_node(eng_ids[1]);
{
let lock = cluster.ffi_helper_set.lock();
lock.unwrap()
.deref_mut()
.get_mut(&eng_ids[1])
.unwrap()
.engine_store_server
.stop();
}

fail::remove("on_ob_pre_handle_snapshot");
fail::remove("on_ob_post_apply_snapshot");
info!("resume node {}", eng_ids[1]);
{
let lock = cluster.ffi_helper_set.lock();
lock.unwrap()
.deref_mut()
.get_mut(&eng_ids[1])
.unwrap()
.engine_store_server
.restore();
}
info!("restored node {}", eng_ids[1]);
cluster.run_node(eng_ids[1]).unwrap();

let (key, value) = (b"k2", b"v2");
cluster.must_put(key, value);
// we can get in memory, since snapshot is pre handled, though it is not persisted
check_key(
&cluster,
key,
value,
Some(true),
None,
Some(vec![eng_ids[1]]),
);
// now snapshot must be applied on peer engine_2
check_key(
&cluster,
first_key,
first_value.as_slice(),
Some(true),
None,
Some(vec![eng_ids[1]]),
);

cluster.shutdown();
}

#[test]
fn test_kv_restart() {
// Test if a empty command can be observed when leadership changes.
let (mut cluster, pd_client) = new_mock_cluster(0, 3);

// Disable AUTO generated compact log.
// This will not totally disable, so we use some failpoints later.
cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000);
cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000);
cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(50000);
cluster.cfg.raft_store.raft_log_gc_threshold = 1000;
disable_auto_gen_compact_log(&mut cluster);

// We don't handle CompactLog at all.
fail::cfg("try_flush_data", "return(0)").unwrap();
Expand Down Expand Up @@ -1205,7 +1289,7 @@ mod restart {
k.as_bytes(),
v.as_bytes(),
Some(true),
None,
Some(true),
Some(vec![eng_ids[0]]),
);
}
Expand All @@ -1225,7 +1309,7 @@ mod restart {
k.as_bytes(),
v.as_bytes(),
Some(true),
None,
Some(false),
Some(vec![eng_ids[0]]),
);
}
Expand Down Expand Up @@ -1284,9 +1368,7 @@ mod snapshot {
let (mut cluster, pd_client) = new_mock_cluster(0, 3);

fail::cfg("on_can_apply_snapshot", "return(true)").unwrap();
cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000);
cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10);
cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(500);
disable_auto_gen_compact_log(&mut cluster);
cluster.cfg.raft_store.max_snapshot_file_raw_size = ReadableSize(u64::MAX);

// Disable default max peer count check.
Expand Down

0 comments on commit 12c105d

Please sign in to comment.