Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Calvin Neo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo committed Jan 10, 2025
1 parent 648c3e7 commit 0e7c349
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 23 deletions.
15 changes: 1 addition & 14 deletions proxy_components/proxy_server/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,25 +599,12 @@ impl RaftGrpcMessageObserver for TiFlashGrpcMessageObserver {
}

fn should_reject_snapshot(&self) -> Option<bool> {
info!(
"!!!!!!! should_reject_snapshot {}",
self.reject_messages_on_memory_ratio
);

if self.reject_messages_on_memory_ratio < f64::EPSILON {
return Some(false);
}

let mut usage = 0;

let x = memory_usage_reaches_high_water(&mut usage);

info!(
"!!!!!!! should_reject_snapshot memory_usage_reaches_high_water {}",
x
);

Some(x)
Some(memory_usage_reaches_high_water(&mut usage))
}
}

Expand Down
30 changes: 21 additions & 9 deletions src/server/service/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@ pub struct DefaultGrpcMessageObserver {}

impl RaftGrpcMessageObserver for DefaultGrpcMessageObserver {
fn should_reject_append(&self) -> Option<bool> {
fail::fail_point!("force_reject_raft_append_message", |_| Some(true));
None
}

fn should_reject_snapshot(&self) -> Option<bool> {
fail::fail_point!("force_reject_raft_snapshot_message", |_| Some(true));
None
}
}
Expand Down Expand Up @@ -205,6 +207,7 @@ impl<E: Engine, L: LockManager, F: KvFormat> Service<E, L, F> {
ch: &E::RaftExtension,
msg: RaftMessage,
reject: bool,
reject_snap: bool,
) -> RaftStoreResult<()> {
let to_store_id = msg.get_to_peer().get_store_id();
if to_store_id != store_id {
Expand All @@ -213,7 +216,9 @@ impl<E: Engine, L: LockManager, F: KvFormat> Service<E, L, F> {
my_store_id: store_id,
});
}
if reject && msg.get_message().get_msg_type() == MessageType::MsgAppend {
if (reject && msg.get_message().get_msg_type() == MessageType::MsgAppend)
|| (reject_snap && msg.get_message().get_msg_type() == MessageType::MsgSnapshot)
{
RAFT_APPEND_REJECTS.inc();
let id = msg.get_region_id();
let peer_id = msg.get_message().get_from();
Expand Down Expand Up @@ -787,8 +792,13 @@ impl<E: Engine, L: LockManager, F: KvFormat> Tikv for Service<E, L, F> {
Some(b) => b,
None => needs_reject_raft_append(reject_messages_on_memory_ratio),
};
let reject_snap = if let Some(b) = ob.should_reject_snapshot() {
b
} else {
false
};
if let Err(err @ RaftStoreError::StoreNotMatch { .. }) =
Self::handle_raft_message(store_id, &ch, msg, reject)
Self::handle_raft_message(store_id, &ch, msg, reject, reject_snap)
{
// Return an error here will break the connection, only do that for
// `StoreNotMatch` to let tikv to resolve a correct address from PD
Expand Down Expand Up @@ -846,9 +856,14 @@ impl<E: Engine, L: LockManager, F: KvFormat> Tikv for Service<E, L, F> {
Some(b) => b,
None => needs_reject_raft_append(reject_messages_on_memory_ratio),
};
let reject_snap = if let Some(b) = ob.should_reject_snapshot() {
b
} else {
false
};
for msg in batch_msg.take_msgs().into_iter() {
if let Err(err @ RaftStoreError::StoreNotMatch { .. }) =
Self::handle_raft_message(store_id, &ch, msg, reject)
Self::handle_raft_message(store_id, &ch, msg, reject, reject_snap)
{
// Return an error here will break the connection, only do that for
// `StoreNotMatch` to let tikv to resolve a correct address from PD
Expand Down Expand Up @@ -885,12 +900,9 @@ impl<E: Engine, L: LockManager, F: KvFormat> Tikv for Service<E, L, F> {
stream: RequestStream<SnapshotChunk>,
sink: ClientStreamingSink<Done>,
) {
if let Some(rej) = self.raft_message_observer.should_reject_snapshot() {
if rej {
info!("!!!!!!! should_reject_snapshot 2");
RAFT_SNAPSHOT_REJECTS.inc();
return;
}
if let Some(true) = self.raft_message_observer.should_reject_snapshot() {
RAFT_SNAPSHOT_REJECTS.inc();
return;
};
let task = SnapTask::Recv { stream, sink };
if let Err(e) = self.snap_scheduler.schedule(task) {
Expand Down
46 changes: 46 additions & 0 deletions tests/failpoints/cases/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,49 @@ fn test_serving_status() {
thread::sleep(Duration::from_millis(200));
assert_eq!(check(), ServingStatus::Serving);
}

#[test]
fn test_raft_message_observer() {
use raft::eraftpb::ConfChangeType;

test_util::init_log_for_test();
let mut cluster = new_server_cluster(0, 3);
cluster.pd_client.disable_default_operator();
let r1 = cluster.run_conf_change();

cluster.must_put(b"k1", b"v1");

fail::cfg("force_reject_raft_append_message", "return").unwrap();
fail::cfg("force_reject_raft_snapshot_message", "return").unwrap();

cluster.pd_client.add_peer(r1, new_peer(2, 2));

std::thread::sleep(std::time::Duration::from_millis(500));

must_get_none(&cluster.get_engine(2), b"k1");

fail::remove("force_reject_raft_append_message");
fail::remove("force_reject_raft_snapshot_message");

cluster.pd_client.must_have_peer(r1, new_peer(2, 2));
cluster.pd_client.must_add_peer(r1, new_peer(3, 3));

must_get_equal(&cluster.get_engine(2), b"k1", b"v1");
must_get_equal(&cluster.get_engine(3), b"k1", b"v1");

fail::cfg("force_reject_raft_append_message", "return").unwrap();

let _ = cluster.async_put(b"k3", b"v3").unwrap();

std::thread::sleep(std::time::Duration::from_millis(500));

must_get_none(&cluster.get_engine(2), b"k3");
must_get_none(&cluster.get_engine(3), b"k3");

fail::remove("force_reject_raft_append_message");

cluster.must_put(b"k4", b"v4");
for id in 1..=3 {
must_get_equal(&cluster.get_engine(id), b"k4", b"v4");
}
}

0 comments on commit 0e7c349

Please sign in to comment.