diff --git a/proxy_components/proxy_server/src/run.rs b/proxy_components/proxy_server/src/run.rs
index 1b9bc9c733f..59192399f1f 100644
--- a/proxy_components/proxy_server/src/run.rs
+++ b/proxy_components/proxy_server/src/run.rs
@@ -599,25 +599,12 @@ impl RaftGrpcMessageObserver for TiFlashGrpcMessageObserver {
     }
 
     fn should_reject_snapshot(&self) -> Option<bool> {
-        info!(
-            "!!!!!!! should_reject_snapshot {}",
-            self.reject_messages_on_memory_ratio
-        );
-
         if self.reject_messages_on_memory_ratio < f64::EPSILON {
             return Some(false);
         }
 
         let mut usage = 0;
-
-        let x = memory_usage_reaches_high_water(&mut usage);
-
-        info!(
-            "!!!!!!! should_reject_snapshot memory_usage_reaches_high_water {}",
-            x
-        );
-
-        Some(x)
+        Some(memory_usage_reaches_high_water(&mut usage))
     }
 }
 
diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs
index 272e69ad46f..f1aab2a95ab 100644
--- a/src/server/service/kv.rs
+++ b/src/server/service/kv.rs
@@ -81,10 +81,12 @@ pub struct DefaultGrpcMessageObserver {}
 
 impl RaftGrpcMessageObserver for DefaultGrpcMessageObserver {
     fn should_reject_append(&self) -> Option<bool> {
+        fail::fail_point!("force_reject_raft_append_message", |_| Some(true));
         None
     }
 
     fn should_reject_snapshot(&self) -> Option<bool> {
+        fail::fail_point!("force_reject_raft_snapshot_message", |_| Some(true));
         None
     }
 }
@@ -205,6 +207,7 @@ impl Service {
         ch: &E::RaftExtension,
         msg: RaftMessage,
         reject: bool,
+        reject_snap: bool,
     ) -> RaftStoreResult<()> {
         let to_store_id = msg.get_to_peer().get_store_id();
         if to_store_id != store_id {
@@ -213,7 +216,9 @@ impl Service {
                 my_store_id: store_id,
             });
         }
-        if reject && msg.get_message().get_msg_type() == MessageType::MsgAppend {
+        if (reject && msg.get_message().get_msg_type() == MessageType::MsgAppend)
+            || (reject_snap && msg.get_message().get_msg_type() == MessageType::MsgSnapshot)
+        {
             RAFT_APPEND_REJECTS.inc();
             let id = msg.get_region_id();
             let peer_id = msg.get_message().get_from();
@@ -787,8 +792,13 @@ impl Tikv for Service {
                 Some(b) => b,
                 None => needs_reject_raft_append(reject_messages_on_memory_ratio),
             };
+            let reject_snap = if let Some(b) = ob.should_reject_snapshot() {
+                b
+            } else {
+                false
+            };
             if let Err(err @ RaftStoreError::StoreNotMatch { .. }) =
-                Self::handle_raft_message(store_id, &ch, msg, reject)
+                Self::handle_raft_message(store_id, &ch, msg, reject, reject_snap)
             {
                 // Return an error here will break the connection, only do that for
                 // `StoreNotMatch` to let tikv to resolve a correct address from PD
@@ -846,9 +856,14 @@ impl Tikv for Service {
                 Some(b) => b,
                 None => needs_reject_raft_append(reject_messages_on_memory_ratio),
             };
+            let reject_snap = if let Some(b) = ob.should_reject_snapshot() {
+                b
+            } else {
+                false
+            };
             for msg in batch_msg.take_msgs().into_iter() {
                 if let Err(err @ RaftStoreError::StoreNotMatch { .. }) =
-                    Self::handle_raft_message(store_id, &ch, msg, reject)
+                    Self::handle_raft_message(store_id, &ch, msg, reject, reject_snap)
                 {
                     // Return an error here will break the connection, only do that for
                     // `StoreNotMatch` to let tikv to resolve a correct address from PD
@@ -885,12 +900,9 @@ impl Tikv for Service {
         stream: RequestStream<SnapshotChunk>,
         sink: ClientStreamingSink<Done>,
     ) {
-        if let Some(rej) = self.raft_message_observer.should_reject_snapshot() {
-            if rej {
-                info!("!!!!!!! should_reject_snapshot 2");
-                RAFT_SNAPSHOT_REJECTS.inc();
-                return;
-            }
+        if let Some(true) = self.raft_message_observer.should_reject_snapshot() {
+            RAFT_SNAPSHOT_REJECTS.inc();
+            return;
         };
         let task = SnapTask::Recv { stream, sink };
         if let Err(e) = self.snap_scheduler.schedule(task) {
diff --git a/tests/failpoints/cases/test_server.rs b/tests/failpoints/cases/test_server.rs
index dfbb883179c..7db7d2ad6d4 100644
--- a/tests/failpoints/cases/test_server.rs
+++ b/tests/failpoints/cases/test_server.rs
@@ -156,3 +156,49 @@ fn test_serving_status() {
     thread::sleep(Duration::from_millis(200));
     assert_eq!(check(), ServingStatus::Serving);
 }
+
+#[test]
+fn test_raft_message_observer() {
+    use raft::eraftpb::ConfChangeType;
+
+    test_util::init_log_for_test();
+    let mut cluster = new_server_cluster(0, 3);
+    cluster.pd_client.disable_default_operator();
+    let r1 = cluster.run_conf_change();
+
+    cluster.must_put(b"k1", b"v1");
+
+    fail::cfg("force_reject_raft_append_message", "return").unwrap();
+    fail::cfg("force_reject_raft_snapshot_message", "return").unwrap();
+
+    cluster.pd_client.add_peer(r1, new_peer(2, 2));
+
+    std::thread::sleep(std::time::Duration::from_millis(500));
+
+    must_get_none(&cluster.get_engine(2), b"k1");
+
+    fail::remove("force_reject_raft_append_message");
+    fail::remove("force_reject_raft_snapshot_message");
+
+    cluster.pd_client.must_have_peer(r1, new_peer(2, 2));
+    cluster.pd_client.must_add_peer(r1, new_peer(3, 3));
+
+    must_get_equal(&cluster.get_engine(2), b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
+
+    fail::cfg("force_reject_raft_append_message", "return").unwrap();
+
+    let _ = cluster.async_put(b"k3", b"v3").unwrap();
+
+    std::thread::sleep(std::time::Duration::from_millis(500));
+
+    must_get_none(&cluster.get_engine(2), b"k3");
+    must_get_none(&cluster.get_engine(3), b"k3");
+
+    fail::remove("force_reject_raft_append_message");
+
+    cluster.must_put(b"k4", b"v4");
+    for id in 1..=3 {
+        must_get_equal(&cluster.get_engine(id), b"k4", b"v4");
+    }
+}