DNM: A demo showing that removing check_snapshot causes region overlap #391

Open. Wants to merge 1 commit into base: raftstore-proxy.
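
For context, check_snapshot is the guard in raftstore that rejects a snapshot whose claimed region range overlaps a region already present on the store. Below is a schematic sketch of that overlap condition (not TiKV's actual implementation; the function and its signature are illustrative), treating region ranges as half-open `[start, end)` where an empty end key means unbounded:

```rust
// Schematic only: half-open region ranges [start, end); an empty end key
// means "unbounded above". The forged snapshot in this demo carries the
// range ["", ""), i.e. the whole key space, which overlaps every region.
fn ranges_overlap(a: (&[u8], &[u8]), b: (&[u8], &[u8])) -> bool {
    let ends_after = |end: &[u8], start: &[u8]| end.is_empty() || end > start;
    ends_after(a.1, b.0) && ends_after(b.1, a.0)
}

fn main() {
    // A whole-key-space snapshot overlaps a region holding [k2, +inf).
    assert!(ranges_overlap((b"", b""), (b"k2", b"")));
    // Adjacent half-open ranges do not overlap.
    assert!(!ranges_overlap((b"a", b"b"), (b"b", b"c")));
}
```

With the guard removed, applying such a snapshot installs a region whose range overlaps every other region on the store, which is what the changes below demonstrate.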
32 changes: 31 additions & 1 deletion components/raftstore/src/store/fsm/peer.rs
@@ -638,7 +638,7 @@ where
for m in msgs.drain(..) {
distribution[m.discriminant()] += 1;
match m {
PeerMsg::RaftMessage(msg, sent_time) => {
PeerMsg::RaftMessage(mut msg, sent_time) => {
if let Some(sent_time) = sent_time {
let wait_time = sent_time.saturating_elapsed().as_secs_f64();
self.ctx.raft_metrics.process_wait_time.observe(wait_time);
@@ -648,6 +648,36 @@ where
continue;
}

info!("!!!!!! handle_msgs 11111");
let modify_msg: bool = (|| {
fail::fail_point!("mock_overlapped_region_1", |t| {
let t = t.unwrap().parse::<u64>().unwrap();
t
});
0
})() != 0;

info!("!!!!!! handle_msgs 22222");
if modify_msg {
    // Forge any MsgSnapshot for region 1 that is addressed to store 2:
    // decode the embedded RaftSnapshotData (protobuf::Message must be in
    // scope), clear the region's start and end keys so the snapshot claims
    // the whole key space, then re-encode it into the message.
    if msg.msg.get_message().get_msg_type() == raft::eraftpb::MessageType::MsgSnapshot
        && msg.msg.get_to_peer().get_store_id() == 2
        && msg.msg.region_id == 1
    {
        info!("!!!!! origin {:?}", msg.msg);
        let mut msg2 = msg.msg.clone();
        let snapshot = msg.msg.get_message().get_snapshot();
        let snapshot_data = snapshot.get_data();
        let mut parsed_data = kvproto::raft_serverpb::RaftSnapshotData::default();
        parsed_data.merge_from_bytes(snapshot_data).unwrap();
        parsed_data.mut_region().set_start_key(b"".to_vec());
        parsed_data.mut_region().set_end_key(b"".to_vec());
        msg2.mut_message()
            .mut_snapshot()
            .set_data(parsed_data.write_to_bytes().unwrap().into());
        msg.msg = msg2;
        info!("!!!!! after {:?}", msg.msg);
    }
}

info!("!!!!!! handle_msgs 33333");
if let Err(e) = self.on_raft_message(msg) {
error!(%e;
"handle raft message err";
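The `modify_msg` gate above uses the eval form of the `fail` crate's `fail_point!` macro. A self-contained sketch of the same pattern (assuming the `fail` crate built with its `failpoints` feature; `should_modify_msg` is an illustrative stand-in for the inline closure in the hunk):

```rust
// When "mock_overlapped_region_1" is configured with the action "return(1)",
// the closure receives Some("1"), parses it, and the parsed value becomes
// the return value of the enclosing closure; otherwise the default 0 falls
// through and the gate stays off.
fn should_modify_msg() -> bool {
    let flag: u64 = (|| {
        fail::fail_point!("mock_overlapped_region_1", |t| {
            t.unwrap().parse::<u64>().unwrap()
        });
        0
    })();
    flag != 0
}

fn main() {
    assert!(!should_modify_msg());
    fail::cfg("mock_overlapped_region_1", "return(1)").unwrap();
    assert!(should_modify_msg());
    fail::remove("mock_overlapped_region_1");
}
```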
@@ -185,7 +185,7 @@ impl Transport for ChannelTransport {

type SimulateChannelTransport<EK> = SimulateTransport<ChannelTransport, EK>;
pub struct NodeCluster {
trans: ChannelTransport,
pub trans: ChannelTransport,
pd_client: Arc<TestPdClient>,
nodes: HashMap<u64, MultiRaftServer<TestPdClient, TiFlashEngine, ProxyRaftEngine>>,
snap_mgrs: HashMap<u64, SnapManager>,
2 changes: 1 addition & 1 deletion proxy_tests/proxy/shared/fast_add_peer/fp.rs
@@ -981,7 +981,7 @@ fn test_msgsnapshot_before_msgappend() {
debug!("compact at index {}", compact_index);
let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term);
let req = test_raftstore::new_admin_request(1, region.get_region_epoch(), compact_log);
let res = cluster
let _ = cluster
.call_command_on_leader(req, Duration::from_secs(3))
.unwrap();

56 changes: 56 additions & 0 deletions proxy_tests/proxy/shared/snapshot.rs
@@ -1,3 +1,5 @@
use fail::fail_point;

// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
use crate::utils::v1::*;

@@ -493,3 +495,57 @@ fn test_apply_cancelled_pre_handle() {
fail::remove("on_ob_cancel_after_pre_handle_snapshot");
cluster.shutdown();
}

#[test]
fn test_apply_before_snapshot() {
fail::cfg("on_pre_write_apply_state", "return").unwrap();

tikv_util::set_panic_hook(true, "./");
let (mut cluster, pd_client) = new_mock_cluster_snap(0, 2);
assert_eq!(cluster.cfg.proxy_cfg.raft_store.snap_handle_pool_size, 2);

disable_auto_gen_compact_log(&mut cluster);

// Disable default max peer count check.
pd_client.disable_default_operator();

let r1 = cluster.run_conf_change();
let eng_ids = cluster
.engines
.iter()
.map(|e| e.0.to_owned())
.collect::<Vec<_>>();

cluster.must_put(b"k1", b"v");
cluster.must_put(b"k2", b"v");
cluster.must_put(b"k3", b"v");
cluster.must_put(b"k4", b"v");

check_key(&cluster, b"k4", b"v", Some(true), None, Some(vec![1]));

// Block MsgAppend in both directions for store 2 so the new learner cannot
// be caught up through log replication while the stage is being set.
cluster.add_send_filter(CloneFilterFactory(
    RegionPacketFilter::new(r1, 2)
        .msg_type(MessageType::MsgAppend)
        .direction(Direction::Both),
));

pd_client.add_peer(
    cluster.get_region(b"k1").get_id(),
    new_learner_peer(eng_ids[1], 1112),
);

std::thread::sleep(std::time::Duration::from_millis(2000));

cluster.must_split(&cluster.get_region(b"k1"), b"k2");
// After the split, k1 ends up in the new region (1000 here) and k4 stays in region 1.
info!(
    "k1 in {}, k4 in {}",
    cluster.get_region(b"k1").get_id(),
    cluster.get_region(b"k4").get_id()
);

std::thread::sleep(std::time::Duration::from_millis(2000));

check_key(&cluster, b"k1", b"v", Some(true), None, Some(vec![1, 2]));

fail::cfg("mock_overlapped_region_1", "return(1)");
cluster.clear_send_filters();
std::thread::sleep(std::time::Duration::from_millis(2000));

check_key(&cluster, b"k4", b"v", Some(true), None, Some(vec![1, 2]));

fail::remove("mock_overlapped_region_1");
fail::remove("on_pre_write_apply_state");
cluster.shutdown();
}
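
The rewrite that the armed failpoint applies to the snapshot message reduces to a protobuf round-trip over the message's data field. A minimal sketch (assuming the `kvproto` and rust-protobuf crates; `widen_snapshot_region` is an illustrative helper, not a function in this tree):

```rust
use protobuf::Message;

// The same round-trip the peer.rs hunk performs inline: a MsgSnapshot's data
// field carries a serialized RaftSnapshotData; clearing the region's start
// and end keys makes the snapshot claim the whole key space.
fn widen_snapshot_region(snapshot_data: &[u8]) -> Vec<u8> {
    let mut parsed = kvproto::raft_serverpb::RaftSnapshotData::default();
    parsed.merge_from_bytes(snapshot_data).unwrap();
    parsed.mut_region().set_start_key(b"".to_vec());
    parsed.mut_region().set_end_key(b"".to_vec());
    parsed.write_to_bytes().unwrap()
}
```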