diff --git a/Cargo.lock b/Cargo.lock index 2da134c6689..fa2011852f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4027,7 +4027,7 @@ dependencies = [ [[package]] name = "librocksdb_sys" version = "0.1.0" -source = "git+https://github.com/tikv/rust-rocksdb.git#2bb1e4e32b9e45cf3fd8210766a9db38eacd5e4d" +source = "git+https://github.com/tikv/rust-rocksdb.git#19eeae6dc7734af12475fbcf9d368168a0314085" dependencies = [ "bindgen 0.65.1", "bzip2-sys", @@ -4046,7 +4046,7 @@ dependencies = [ [[package]] name = "libtitan_sys" version = "0.0.1" -source = "git+https://github.com/tikv/rust-rocksdb.git#2bb1e4e32b9e45cf3fd8210766a9db38eacd5e4d" +source = "git+https://github.com/tikv/rust-rocksdb.git#19eeae6dc7734af12475fbcf9d368168a0314085" dependencies = [ "bzip2-sys", "cc", @@ -6367,7 +6367,7 @@ dependencies = [ [[package]] name = "rocksdb" version = "0.3.0" -source = "git+https://github.com/tikv/rust-rocksdb.git#2bb1e4e32b9e45cf3fd8210766a9db38eacd5e4d" +source = "git+https://github.com/tikv/rust-rocksdb.git#19eeae6dc7734af12475fbcf9d368168a0314085" dependencies = [ "libc 0.2.151", "librocksdb_sys", diff --git a/Cargo.toml b/Cargo.toml index 7b77c9dfc50..82e544ae030 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ testexport = [ "engine_rocks/testexport", "engine_panic/testexport", "encryption/testexport", - "file_system/testexport" + "file_system/testexport", ] test-engine-kv-rocksdb = ["engine_test/test-engine-kv-rocksdb"] test-engine-raft-raft-engine = ["engine_test/test-engine-raft-raft-engine"] diff --git a/components/batch-system/src/batch.rs b/components/batch-system/src/batch.rs index 8cc873bd4b6..0b7f9f45489 100644 --- a/components/batch-system/src/batch.rs +++ b/components/batch-system/src/batch.rs @@ -583,7 +583,6 @@ where let t = thread::Builder::new() .name(name) .spawn_wrapper(move || { - tikv_alloc::thread_allocate_exclusive_arena().unwrap(); tikv_util::thread_group::set_properties(props); set_io_type(IoType::ForegroundWrite); poller.poll(); diff --git a/components/engine_rocks/src/import.rs b/components/engine_rocks/src/import.rs index 96e3acd1c40..930772d8c09 100644 --- a/components/engine_rocks/src/import.rs +++ b/components/engine_rocks/src/import.rs @@ -1,8 +1,5 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -#![allow(unused_variables)] -#![allow(unreachable_code)] - use engine_traits::{ImportExt, IngestExternalFileOptions, Result}; use rocksdb::IngestExternalFileOptions as RawIngestExternalFileOptions; use tikv_util::time::Instant; diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index 54cef0ab58f..1f09e67edad 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -3820,6 +3820,10 @@ where None => { self.propose_pending_batch_raft_command(); if self.propose_locks_before_transfer_leader(msg) { + fail_point!( + "finish_proposing_transfer_cmd_after_proposing_locks", + |_| {} + ); // If some pessimistic locks are just proposed, we propose another // TransferLeader command instead of transferring leader immediately. info!("propose transfer leader command"; diff --git a/components/resource_control/src/metrics.rs b/components/resource_control/src/metrics.rs index 594c6af486a..69ae0542a26 100644 --- a/components/resource_control/src/metrics.rs +++ b/components/resource_control/src/metrics.rs @@ -38,7 +38,7 @@ lazy_static! 
{ "tikv_resource_control_priority_wait_duration", "Histogram of wait duration cause by priority quota limiter", &["priority"], - exponential_buckets(1e-5, 2.0, 18).unwrap() // 10us ~ 2.5s + exponential_buckets(1e-5, 2.0, 22).unwrap() // 10us ~ 42s ) .unwrap(); diff --git a/components/server/src/server.rs b/components/server/src/server.rs index b8ba53641ab..a0ed481b92a 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -102,7 +102,7 @@ use tikv::{ lock_manager::LockManager, raftkv::ReplicaReadLockChecker, resolve, - service::{DebugService, DiagnosticsService}, + service::{DebugService, DefaultGrpcMessageFilter, DiagnosticsService}, status_server::StatusServer, tablet_snap::NoSnapshotCache, ttl::TtlChecker, @@ -122,7 +122,9 @@ use tikv::{ Engine, Storage, }, }; -use tikv_alloc::{add_thread_memory_accessor, remove_thread_memory_accessor}; +use tikv_alloc::{ + add_thread_memory_accessor, remove_thread_memory_accessor, thread_allocate_exclusive_arena, +}; use tikv_util::{ check_environment_variables, config::VersionTrack, @@ -330,7 +332,7 @@ where // SAFETY: we will call `remove_thread_memory_accessor` at before_stop. unsafe { add_thread_memory_accessor() }; - tikv_alloc::thread_allocate_exclusive_arena().unwrap(); + thread_allocate_exclusive_arena().unwrap(); }) .before_stop(|| { remove_thread_memory_accessor(); @@ -889,6 +891,9 @@ where debug_thread_pool, health_controller, self.resource_manager.clone(), + Arc::new(DefaultGrpcMessageFilter::new( + server_config.value().reject_messages_on_memory_ratio, + )), ) .unwrap_or_else(|e| fatal!("failed to create server: {}", e)); cfg_controller.register( diff --git a/components/server/src/server2.rs b/components/server/src/server2.rs index 5381a365de8..4c86db45845 100644 --- a/components/server/src/server2.rs +++ b/components/server/src/server2.rs @@ -91,7 +91,7 @@ use tikv::{ lock_manager::LockManager, raftkv::ReplicaReadLockChecker, resolve, - service::{DebugService, DiagnosticsService}, + service::{DebugService, DefaultGrpcMessageFilter, DiagnosticsService}, status_server::StatusServer, KvEngineFactoryBuilder, NodeV2, RaftKv2, Server, CPU_CORES_QUOTA_GAUGE, GRPC_THREAD_PREFIX, MEMORY_LIMIT_GAUGE, @@ -109,7 +109,9 @@ use tikv::{ Engine, Storage, }, }; -use tikv_alloc::{add_thread_memory_accessor, remove_thread_memory_accessor}; +use tikv_alloc::{ + add_thread_memory_accessor, remove_thread_memory_accessor, thread_allocate_exclusive_arena, +}; use tikv_util::{ check_environment_variables, config::VersionTrack, @@ -301,6 +303,7 @@ where // SAFETY: we will call `remove_thread_memory_accessor` at before_stop. 
unsafe { add_thread_memory_accessor() }; + thread_allocate_exclusive_arena().unwrap(); }) .before_stop(|| { remove_thread_memory_accessor(); @@ -826,6 +829,9 @@ where debug_thread_pool, health_controller, self.resource_manager.clone(), + Arc::new(DefaultGrpcMessageFilter::new( + server_config.value().reject_messages_on_memory_ratio, + )), ) .unwrap_or_else(|e| fatal!("failed to create server: {}", e)); cfg_controller.register( diff --git a/components/test_raftstore-v2/src/server.rs b/components/test_raftstore-v2/src/server.rs index 47830c77730..4c2906a9be9 100644 --- a/components/test_raftstore-v2/src/server.rs +++ b/components/test_raftstore-v2/src/server.rs @@ -61,7 +61,7 @@ use tikv::{ lock_manager::LockManager, raftkv::ReplicaReadLockChecker, resolve, - service::{DebugService, DiagnosticsService}, + service::{DebugService, DefaultGrpcMessageFilter, DiagnosticsService}, ConnectionBuilder, Error, Extension, NodeV2, PdStoreAddrResolver, RaftClient, RaftKv2, Result as ServerResult, Server, ServerTransport, }, @@ -644,6 +644,9 @@ impl ServerCluster { debug_thread_pool.clone(), health_controller.clone(), resource_manager.clone(), + Arc::new(DefaultGrpcMessageFilter::new( + server_cfg.value().reject_messages_on_memory_ratio, + )), ) .unwrap(); svr.register_service(create_diagnostics(diag_service.clone())); diff --git a/components/test_raftstore/src/cluster.rs b/components/test_raftstore/src/cluster.rs index 0a165ec0528..374533b427c 100644 --- a/components/test_raftstore/src/cluster.rs +++ b/components/test_raftstore/src/cluster.rs @@ -1465,6 +1465,17 @@ impl Cluster { .unwrap() } + pub fn try_transfer_leader_with_timeout( + &mut self, + region_id: u64, + leader: metapb::Peer, + timeout: Duration, + ) -> Result { + let epoch = self.get_region_epoch(region_id); + let transfer_leader = new_admin_request(region_id, &epoch, new_transfer_leader_cmd(leader)); + self.call_command_on_leader(transfer_leader, timeout) + } + pub fn get_snap_dir(&self, node_id: u64) -> String { self.sim.rl().get_snap_dir(node_id) } diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index a50e226f640..a023f1541a2 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -67,7 +67,7 @@ use tikv::{ lock_manager::LockManager, raftkv::ReplicaReadLockChecker, resolve::{self, StoreAddrResolver}, - service::DebugService, + service::{DebugService, DefaultGrpcMessageFilter}, tablet_snap::NoSnapshotCache, ConnectionBuilder, Error, MultiRaftServer, PdStoreAddrResolver, RaftClient, RaftKv, Result as ServerResult, Server, ServerTransport, @@ -617,6 +617,9 @@ impl ServerCluster { debug_thread_pool.clone(), health_controller.clone(), resource_manager.clone(), + Arc::new(DefaultGrpcMessageFilter::new( + server_cfg.value().reject_messages_on_memory_ratio, + )), ) .unwrap(); svr.register_service(create_import_sst(import_service.clone())); diff --git a/components/test_raftstore/src/util.rs b/components/test_raftstore/src/util.rs index f6f513b767b..1cd4a17d730 100644 --- a/components/test_raftstore/src/util.rs +++ b/components/test_raftstore/src/util.rs @@ -1305,8 +1305,13 @@ pub fn kv_pessimistic_lock_with_ttl( } pub fn must_kv_pessimistic_lock(client: &TikvClient, ctx: Context, key: Vec, ts: u64) { - let resp = kv_pessimistic_lock(client, ctx, vec![key], ts, ts, false); - assert!(!resp.has_region_error(), "{:?}", resp.get_region_error()); + let resp = kv_pessimistic_lock(client, ctx.clone(), vec![key], ts, ts, false); + assert!( + 
!resp.has_region_error(), + "{:?}, ctx:{:?}", + resp.get_region_error(), + ctx + ); assert!(resp.errors.is_empty(), "{:?}", resp.get_errors()); } diff --git a/components/tidb_query_datatype/src/codec/mysql/time/mod.rs b/components/tidb_query_datatype/src/codec/mysql/time/mod.rs index 97f756bc1fe..a537a6e7abe 100644 --- a/components/tidb_query_datatype/src/codec/mysql/time/mod.rs +++ b/components/tidb_query_datatype/src/codec/mysql/time/mod.rs @@ -1484,6 +1484,19 @@ impl Time { Time::try_from_chrono_datetime(ctx, timestamp.naive_local(), time_type, fsp as i8) } + pub fn from_unixtime( + ctx: &mut EvalContext, + seconds: i64, + nanos: u32, + time_type: TimeType, + fsp: i8, + ) -> Result { + let timestamp = Utc.timestamp(seconds, nanos); + let timestamp = ctx.cfg.tz.from_utc_datetime(×tamp.naive_utc()); + let timestamp = timestamp.round_subsecs(fsp as u16); + Time::try_from_chrono_datetime(ctx, timestamp.naive_local(), time_type, fsp) + } + pub fn from_year( ctx: &mut EvalContext, year: u32, diff --git a/components/tidb_query_expr/src/impl_time.rs b/components/tidb_query_expr/src/impl_time.rs index 078cd2a85dd..8f226bc7900 100644 --- a/components/tidb_query_expr/src/impl_time.rs +++ b/components/tidb_query_expr/src/impl_time.rs @@ -1518,6 +1518,88 @@ pub fn sub_date_time_duration_interval_any_as_duration< ) } +#[rpn_fn(capture = [ctx, extra])] +#[inline] +pub fn from_unixtime_1_arg( + ctx: &mut EvalContext, + extra: &RpnFnCallExtra, + arg0: &Decimal, +) -> Result> { + eval_from_unixtime(ctx, extra.ret_field_type.get_decimal() as i8, *arg0) +} + +#[rpn_fn(capture = [ctx, extra])] +#[inline] +pub fn from_unixtime_2_arg( + ctx: &mut EvalContext, + extra: &RpnFnCallExtra, + arg0: &Decimal, + arg1: BytesRef, +) -> Result> { + let t = eval_from_unixtime(ctx, extra.ret_field_type.get_decimal() as i8, *arg0)?; + match t { + Some(t) => { + let res = t.date_format(std::str::from_utf8(arg1).map_err(Error::Encoding)?)?; + Ok(Some(res.into())) + } + None => Ok(None), + } +} + +// Port from TiDB's evalFromUnixTime +pub fn eval_from_unixtime( + ctx: &mut EvalContext, + mut fsp: i8, + unix_timestamp: Decimal, +) -> Result> { + // 0 <= unixTimeStamp <= 32536771199.999999 + if unix_timestamp.is_negative() { + return Ok(None); + } + let integral_part = unix_timestamp.as_i64().unwrap(); // Ignore Truncated error and Overflow error + // The max integralPart should not be larger than 32536771199. + // Refer to https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-28.html + if integral_part > 32536771199 { + return Ok(None); + } + // Split the integral part and fractional part of a decimal timestamp. + // e.g. for timestamp 12345.678, + // first get the integral part 12345, + // then (12345.678 - 12345) * (10^9) to get the decimal part and convert it to + // nanosecond precision. + let integer_decimal_tp = Decimal::from(integral_part); + let frac_decimal_tp = &unix_timestamp - &integer_decimal_tp; + if !frac_decimal_tp.is_ok() { + return Ok(None); + } + let frac_decimal_tp = frac_decimal_tp.unwrap(); + let nano = Decimal::from(NANOS_PER_SEC); + let x = &frac_decimal_tp * &nano; + if x.is_overflow() { + return Err(Error::overflow("DECIMAL", "").into()); + } + if x.is_truncated() { + return Err(Error::truncated().into()); + } + let x = x.unwrap(); + let fractional_part = x.as_i64(); // here fractionalPart is result multiplying the original fractional part by 10^9. 
+ if fractional_part.is_overflow() { + return Err(Error::overflow("DECIMAL", "").into()); + } + let fractional_part = fractional_part.unwrap(); + if fsp < 0 { + fsp = MAX_FSP; + } + let tmp = DateTime::from_unixtime( + ctx, + integral_part, + fractional_part as u32, + TimeType::DateTime, + fsp, + )?; + Ok(Some(tmp)) +} + #[cfg(test)] mod tests { use std::{str::FromStr, sync::Arc}; @@ -3843,4 +3925,80 @@ mod tests { } } } + + #[test] + fn test_from_unixtime_1_arg() { + let cases = vec![ + (1451606400.0, 0, Some("2016-01-01 00:00:00")), + (1451606400.123456, 6, Some("2016-01-01 00:00:00.123456")), + (1451606400.999999, 6, Some("2016-01-01 00:00:00.999999")), + (1451606400.9999999, 6, Some("2016-01-01 00:00:01.000000")), + (1451606400.9999995, 6, Some("2016-01-01 00:00:01.000000")), + (1451606400.9999994, 6, Some("2016-01-01 00:00:00.999999")), + (1451606400.123, 3, Some("2016-01-01 00:00:00.123")), + (5000000000.0, 0, Some("2128-06-11 08:53:20")), + (32536771199.99999, 6, Some("3001-01-18 23:59:59.999990")), + (0.0, 6, Some("1970-01-01 00:00:00.000000")), + (-1.0, 6, None), + (32536771200.0, 6, None), + ]; + let mut ctx = EvalContext::default(); + for (datetime, fsp, expected) in cases { + let decimal = Decimal::from_f64(datetime).unwrap(); + let mut result_field_type: FieldType = FieldTypeTp::DateTime.into(); + result_field_type.set_decimal(fsp as i32); + + let (result, _) = RpnFnScalarEvaluator::new() + .push_param(decimal) + .evaluate_raw(result_field_type, ScalarFuncSig::FromUnixTime1Arg); + let output: Option = result.unwrap().into(); + + let expected = + expected.map(|arg1| DateTime::parse_datetime(&mut ctx, arg1, fsp, false).unwrap()); + assert_eq!(output, expected); + } + } + + #[test] + fn test_from_unixtime_2_arg() { + let cases = vec![ + ( + 1451606400.0, + "%Y %D %M %h:%i:%s %x", + 0, + Some("2016 1st January 12:00:00 2015"), + ), + ( + 1451606400.123456, + "%Y %D %M %h:%i:%s %x", + 6, + Some("2016 1st January 12:00:00 2015"), + ), + ( + 1451606400.999999, + "%Y %D %M %h:%i:%s %x", + 6, + Some("2016 1st January 12:00:00 2015"), + ), + ( + 1451606400.9999999, + "%Y %D %M %h:%i:%s %x", + 6, + Some("2016 1st January 12:00:01 2015"), + ), + ]; + for (datetime, format, fsp, expected) in cases { + let decimal = Decimal::from_f64(datetime).unwrap(); + let mut result_field_type: FieldType = FieldTypeTp::String.into(); + result_field_type.set_decimal(fsp); + let (result, _) = RpnFnScalarEvaluator::new() + .push_param(decimal) + .push_param(format) + .evaluate_raw(result_field_type, ScalarFuncSig::FromUnixTime2Arg); + let output: Option = result.unwrap().into(); + + let expected = expected.map(|str| str.as_bytes().to_vec()); + assert_eq!(output, expected); + } + } } diff --git a/components/tidb_query_expr/src/lib.rs b/components/tidb_query_expr/src/lib.rs index 55cb4601f54..b47b0bdf63b 100644 --- a/components/tidb_query_expr/src/lib.rs +++ b/components/tidb_query_expr/src/lib.rs @@ -931,6 +931,8 @@ fn map_expr_node_to_rpn_func(expr: &Expr) -> Result { ScalarFuncSig::SubDateDurationRealDatetime => sub_date_time_duration_interval_any_as_datetime_fn_meta::(), ScalarFuncSig::AddDateDurationDecimalDatetime => add_date_time_duration_interval_any_as_datetime_fn_meta::(), ScalarFuncSig::SubDateDurationDecimalDatetime => sub_date_time_duration_interval_any_as_datetime_fn_meta::(), + ScalarFuncSig::FromUnixTime1Arg => from_unixtime_1_arg_fn_meta(), + ScalarFuncSig::FromUnixTime2Arg => from_unixtime_2_arg_fn_meta(), _ => return Err(other_err!( "ScalarFunction {:?} is not supported in batch mode", 
value diff --git a/components/tikv_util/src/sys/thread.rs b/components/tikv_util/src/sys/thread.rs index 818d8795b31..9d119cc09c8 100644 --- a/components/tikv_util/src/sys/thread.rs +++ b/components/tikv_util/src/sys/thread.rs @@ -7,7 +7,9 @@ use std::{io, io::Result, sync::Mutex, thread}; use collections::HashMap; -use tikv_alloc::{add_thread_memory_accessor, remove_thread_memory_accessor}; +use tikv_alloc::{ + add_thread_memory_accessor, remove_thread_memory_accessor, thread_allocate_exclusive_arena, +}; /// A cross-platform CPU statistics data structure. #[derive(Debug, Copy, Clone, Default, PartialEq)] @@ -430,6 +432,7 @@ impl StdThreadBuildWrapper for std::thread::Builder { call_thread_start_hooks(); // SAFETY: we will call `remove_thread_memory_accessor` at defer. unsafe { add_thread_memory_accessor() }; + thread_allocate_exclusive_arena().unwrap(); add_thread_name_to_map(); defer! {{ remove_thread_name_from_map(); @@ -452,9 +455,8 @@ impl ThreadBuildWrapper for tokio::runtime::Builder { // SAFETY: we will call `remove_thread_memory_accessor` at // `before-stop_wrapper`. // FIXME: What if the user only calls `after_start_wrapper`? - unsafe { - add_thread_memory_accessor(); - } + unsafe { add_thread_memory_accessor() }; + thread_allocate_exclusive_arena().unwrap(); add_thread_name_to_map(); start(); }) @@ -478,9 +480,8 @@ impl ThreadBuildWrapper for futures::executor::ThreadPoolBuilder { // SAFETY: we will call `remove_thread_memory_accessor` at // `before-stop_wrapper`. // FIXME: What if the user only calls `after_start_wrapper`? - unsafe { - add_thread_memory_accessor(); - } + unsafe { add_thread_memory_accessor() }; + thread_allocate_exclusive_arena().unwrap(); add_thread_name_to_map(); start(); }) diff --git a/components/tikv_util/src/yatp_pool/metrics.rs b/components/tikv_util/src/yatp_pool/metrics.rs index a3e68b260db..53ab05e86b5 100644 --- a/components/tikv_util/src/yatp_pool/metrics.rs +++ b/components/tikv_util/src/yatp_pool/metrics.rs @@ -20,7 +20,7 @@ lazy_static! { "tikv_yatp_pool_schedule_wait_duration", "Histogram of yatp pool schedule wait duration.", &["name", "priority"], - exponential_buckets(1e-5, 2.0, 18).unwrap() // 10us ~ 2.5s + exponential_buckets(1e-5, 2.0, 22).unwrap() // 10us ~ 42s ) .unwrap(); } diff --git a/components/tracker/src/lib.rs b/components/tracker/src/lib.rs index d03cd9502b2..5fbc9216048 100644 --- a/components/tracker/src/lib.rs +++ b/components/tracker/src/lib.rs @@ -107,7 +107,7 @@ impl RequestInfo { resource_group_tag: ctx.get_resource_group_tag().to_vec(), request_type, cid: 0, - is_external_req: ctx.get_request_source().starts_with("external"), + is_external_req: ctx.get_request_source().contains("external"), } } } diff --git a/components/txn_types/src/lock.rs b/components/txn_types/src/lock.rs index 26dc493262e..98650f1a8e4 100644 --- a/components/txn_types/src/lock.rs +++ b/components/txn_types/src/lock.rs @@ -84,6 +84,10 @@ pub struct Lock { pub txn_size: u64, pub min_commit_ts: TimeStamp, pub use_async_commit: bool, + // This field is only valid for in-memory locks and does not need to be persisted because: + // 1. the lock should be converted to a write directly when 1pc succeeds. + // 2. the field should be reverted to false (default value) when 1pc fails. + pub use_one_pc: bool, // Only valid when `use_async_commit` is true, and the lock is primary. Do not set // `secondaries` for secondaries. 
pub secondaries: Vec>, @@ -171,6 +175,7 @@ impl Lock { txn_size, min_commit_ts, use_async_commit: false, + use_one_pc: false, secondaries: Vec::default(), rollback_ts: Vec::default(), last_change: LastChange::default(), @@ -489,7 +494,11 @@ impl Lock { ))); } - if ts == TimeStamp::max() && raw_key == lock.primary && !lock.use_async_commit { + if ts == TimeStamp::max() + && raw_key == lock.primary + && !lock.use_async_commit + && !lock.use_one_pc + { // When `ts == TimeStamp::max()` (which means to get latest committed version // for primary key), and current key is the primary key, we ignore // this lock. @@ -1326,6 +1335,7 @@ mod tests { txn_size: 0, min_commit_ts: 20.into(), use_async_commit: false, + use_one_pc: false, secondaries: vec![], rollback_ts: vec![], last_change: LastChange::make_exist(8.into(), 2), diff --git a/etc/config-template.toml b/etc/config-template.toml index 78aa3c31b77..f6c8427af75 100644 --- a/etc/config-template.toml +++ b/etc/config-template.toml @@ -237,9 +237,13 @@ ## Max time to handle Coprocessor requests before timeout. # end-point-request-max-handle-duration = "60s" +## Memory usage limit for TiKV handling coprocessor requests. +## By default, it will be set to 12.5% of the available memory of TiKV. +# end-point-memory-quota = "0B" + ## Max bytes that snapshot can interact with disk in one second. It should be ## set based on your disk performance. Only write flow is considered, if -## partiioned-raft-kv is used, read flow is also considered and it will be estimated +## partitioned-raft-kv is used, read flow is also considered and it will be estimated ## as read_size * 0.5 to get around errors from page cache. # snap-io-max-bytes-per-sec = "100MB" @@ -971,7 +975,7 @@ ## default: 0 # zstd-dict-size = 0 -## Whether to share blob cache with block cache. If set to true, Titan would use the shared block +## Whether to share blob cache with block cache. If set to true, Titan would use the shared block ## cache configured in `storage.block_cache` and ignore the setting of `blob-cache-size`. ## default: true # shared-blob-cache = true @@ -1220,13 +1224,13 @@ # ## Avoid outputing data (e.g. user keys) to info log. It currently does not avoid printing ## user data altogether, but greatly reduce those logs. -## +## ## Candidates: ## true | "on": Avoid outputing raw data to info log, raw data will be replaced with "?". ## false | "off": Output raw data to info log. ## "marker": Encapsulate the raw data with "‹..›" to info log. ## -## Default is false. +## Default is false. # redact-info-log = false ## Configurations for encryption at rest. Experimental. 
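Note on the `use_one_pc` flag added to `components/txn_types/src/lock.rs` above: a 1PC lock only ever exists in the in-memory lock table (on success it is converted directly into a write, on failure the flag is reverted), so before this patch a read at `ts == TimeStamp::max()` on the primary key would skip it — the skip rule only exempted async-commit locks — and could therefore miss the pending 1PC commit. The fragment below is an illustrative sketch only, not part of the patch; the standalone function and its boolean parameters are invented to restate the rule that `Lock::check_ts_conflict` now applies.

// Illustrative sketch (not TiKV source): the primary-lock skip rule for reads
// at ts == TimeStamp::max(), with the relevant `Lock` fields flattened into
// plain parameters.
fn skip_primary_lock_at_max_ts(
    reading_at_max_ts: bool,
    key_is_primary: bool,
    use_async_commit: bool,
    use_one_pc: bool,
) -> bool {
    // Before this patch the `!use_one_pc` term was missing, so an in-memory
    // 1PC lock on the primary key was ignored by max-ts reads.
    reading_at_max_ts && key_is_primary && !use_async_commit && !use_one_pc
}

fn main() {
    // A 1PC lock on the primary now conflicts with a max-ts read ...
    assert!(!skip_primary_lock_at_max_ts(true, true, false, true));
    // ... while an ordinary 2PC lock on the primary is still skipped, as before.
    assert!(skip_primary_lock_at_max_ts(true, true, false, false));
}

`use_one_pc` is set in `prewrite` when `try_one_pc` is in effect and cleared again both on the `CommitTsTooLarge` fallback and in `fallback_1pc_locks`, which is what the `test_1pc_set_lock_use_one_pc` test further down verifies.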
diff --git a/proxy_components/proxy_server/src/run.rs b/proxy_components/proxy_server/src/run.rs index 66787f5278a..45e5f20d314 100644 --- a/proxy_components/proxy_server/src/run.rs +++ b/proxy_components/proxy_server/src/run.rs @@ -46,6 +46,7 @@ use health_controller::HealthController; use in_memory_engine::InMemoryEngineStatistics; use kvproto::{ debugpb::create_debug, diagnosticspb::create_diagnostics, import_sstpb::create_import_sst, + raft_serverpb::RaftMessage, }; use pd_client::{PdClient, RpcClient}; use raft_log_engine::RaftLogEngine; @@ -84,7 +85,7 @@ use tikv::{ gc_worker::GcWorker, raftkv::ReplicaReadLockChecker, resolve, - service::{DebugService, DiagnosticsService}, + service::{DebugService, DiagnosticsService, RaftGrpcMessageFilter}, tablet_snap::NoSnapshotCache, ttl::TtlChecker, KvEngineFactoryBuilder, MultiRaftServer, RaftKv, Server, CPU_CORES_QUOTA_GAUGE, @@ -106,7 +107,10 @@ use tikv_util::{ config::{ensure_dir_exist, ReadableDuration, VersionTrack}, error, quota_limiter::{QuotaLimitConfigManager, QuotaLimiter}, - sys::{disk, register_memory_usage_high_water, thread::ThreadBuildWrapper, SysQuota}, + sys::{ + disk, memory_usage_reaches_high_water, register_memory_usage_high_water, + thread::ThreadBuildWrapper, SysQuota, + }, thread_group::GroupProperties, time::{Instant, Monitor}, worker::{Builder as WorkerBuilder, LazyWorker, Scheduler}, @@ -121,6 +125,39 @@ use crate::{ util::ffi_server_info, }; +#[derive(Clone)] +pub struct TiFlashGrpcMessageFilter { + reject_messages_on_memory_ratio: f64, +} + +impl TiFlashGrpcMessageFilter { + pub fn new(reject_messages_on_memory_ratio: f64) -> Self { + Self { + reject_messages_on_memory_ratio, + } + } +} + +impl RaftGrpcMessageFilter for TiFlashGrpcMessageFilter { + fn should_reject_raft_message(&self, _: &RaftMessage) -> bool { + if self.reject_messages_on_memory_ratio < f64::EPSILON { + return false; + } + + let mut usage = 0; + memory_usage_reaches_high_water(&mut usage) + } + + fn should_reject_snapshot(&self) -> bool { + if self.reject_messages_on_memory_ratio < f64::EPSILON { + return false; + } + + let mut usage = 0; + memory_usage_reaches_high_water(&mut usage) + } +} + #[inline] pub fn run_impl( config: TikvConfig, @@ -1276,6 +1313,9 @@ impl TiKvServer { debug_thread_pool, health_controller, self.resource_manager.clone(), + Arc::new(TiFlashGrpcMessageFilter::new( + server_config.value().reject_messages_on_memory_ratio, + )), ) .unwrap_or_else(|e| fatal!("failed to create server: {}", e)); diff --git a/src/config/mod.rs b/src/config/mod.rs index b848dcefc22..60de30b2f83 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -7005,6 +7005,7 @@ mod tests { cfg.server.grpc_memory_pool_quota = default_cfg.server.grpc_memory_pool_quota; cfg.server.background_thread_count = default_cfg.server.background_thread_count; cfg.server.end_point_max_concurrency = default_cfg.server.end_point_max_concurrency; + cfg.server.end_point_memory_quota = default_cfg.server.end_point_memory_quota; cfg.storage.scheduler_worker_pool_size = default_cfg.storage.scheduler_worker_pool_size; cfg.rocksdb.max_background_jobs = default_cfg.rocksdb.max_background_jobs; cfg.rocksdb.max_background_flushes = default_cfg.rocksdb.max_background_flushes; diff --git a/src/server/config.rs b/src/server/config.rs index feba21a09f2..deb5676f9e9 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -40,7 +40,8 @@ const DEFAULT_ENDPOINT_REQUEST_MAX_HANDLE_SECS: u64 = 60; // Number of rows in each chunk for streaming coprocessor. 
const DEFAULT_ENDPOINT_STREAM_BATCH_ROW_LIMIT: usize = 128; -// By default, endpoint memory quota will be set to 12.5% of system memory. +// By default, endpoint memory quota will be set to 12.5% of the available +// memory of TiKV. // // TPCC check test shows that: // * The actual endpoint memory usage is about 3 times to memory quota. diff --git a/src/server/metrics.rs b/src/server/metrics.rs index 11ee6512831..3f3aa025f46 100644 --- a/src/server/metrics.rs +++ b/src/server/metrics.rs @@ -486,6 +486,11 @@ lazy_static! { "Count for rejected Raft append messages" ) .unwrap(); + pub static ref RAFT_SNAPSHOT_REJECTS: IntCounter = register_int_counter!( + "tikv_server_raft_snapshot_rejects", + "Count for rejected Raft snapshot messages" + ) + .unwrap(); pub static ref SNAP_LIMIT_TRANSPORT_BYTES_COUNTER: IntCounterVec = register_int_counter_vec!( "tikv_snapshot_limit_transport_bytes", "Total snapshot limit transport used", diff --git a/src/server/server.rs b/src/server/server.rs index e96fba10afd..ec88b776f4e 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -166,6 +166,7 @@ where debug_thread_pool: Arc, health_controller: HealthController, resource_manager: Option>, + raft_message_filter: Arc, ) -> Result { // A helper thread (or pool) for transport layer. let stats_pool = if cfg.value().stats_concurrency > 0 { @@ -211,6 +212,7 @@ where resource_manager, health_controller.clone(), health_feedback_interval, + raft_message_filter, ); let builder_factory = Box::new(BuilderFactory::new( kv_service, @@ -683,6 +685,7 @@ mod tests { debug_thread_pool, HealthController::new(), None, + Arc::new(DefaultGrpcMessageFilter::new(0.2)), ) .unwrap(); diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index 4dc65cec1b1..c6d1a3b2b8e 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -71,6 +71,41 @@ use crate::{ const GRPC_MSG_MAX_BATCH_SIZE: usize = 128; const GRPC_MSG_NOTIFY_SIZE: usize = 8; +pub trait RaftGrpcMessageFilter: Send + Sync { + fn should_reject_raft_message(&self, _: &RaftMessage) -> bool; + fn should_reject_snapshot(&self) -> bool; +} + +// The default filter is exported for other engines as reference. +#[derive(Clone)] +pub struct DefaultGrpcMessageFilter { + reject_messages_on_memory_ratio: f64, +} + +impl DefaultGrpcMessageFilter { + pub fn new(reject_messages_on_memory_ratio: f64) -> Self { + Self { + reject_messages_on_memory_ratio, + } + } +} + +impl RaftGrpcMessageFilter for DefaultGrpcMessageFilter { + fn should_reject_raft_message(&self, msg: &RaftMessage) -> bool { + fail::fail_point!("force_reject_raft_append_message", |_| true); + if msg.get_message().get_msg_type() == MessageType::MsgAppend { + needs_reject_raft_append(self.reject_messages_on_memory_ratio) + } else { + false + } + } + + fn should_reject_snapshot(&self) -> bool { + fail::fail_point!("force_reject_raft_snapshot_message", |_| true); + false + } +} + /// Service handles the RPC messages for the `Tikv` service. 
pub struct Service { cluster_id: u64, @@ -103,6 +138,8 @@ pub struct Service { health_controller: HealthController, health_feedback_interval: Option, health_feedback_seq: Arc, + + raft_message_filter: Arc, } impl Drop for Service { @@ -130,6 +167,7 @@ impl Clone for Service Service { resource_manager: Option>, health_controller: HealthController, health_feedback_interval: Option, + raft_message_filter: Arc, ) -> Self { let now_unix = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -174,6 +213,7 @@ impl Service { health_controller, health_feedback_interval, health_feedback_seq: Arc::new(AtomicU64::new(now_unix)), + raft_message_filter, } } @@ -181,7 +221,7 @@ impl Service { store_id: u64, ch: &E::RaftExtension, msg: RaftMessage, - reject: bool, + raft_msg_filter: &Arc, ) -> RaftStoreResult<()> { let to_store_id = msg.get_to_peer().get_store_id(); if to_store_id != store_id { @@ -190,8 +230,11 @@ impl Service { my_store_id: store_id, }); } - if reject && msg.get_message().get_msg_type() == MessageType::MsgAppend { - RAFT_APPEND_REJECTS.inc(); + + if raft_msg_filter.should_reject_raft_message(&msg) { + if msg.get_message().get_msg_type() == MessageType::MsgAppend { + RAFT_APPEND_REJECTS.inc(); + } let id = msg.get_region_id(); let peer_id = msg.get_message().get_from(); ch.report_reject_message(id, peer_id); @@ -753,16 +796,15 @@ impl Tikv for Service { let store_id = self.store_id; let ch = self.storage.get_engine().raft_extension(); - let reject_messages_on_memory_ratio = self.reject_messages_on_memory_ratio; + let ob = self.raft_message_filter.clone(); let res = async move { let mut stream = stream.map_err(Error::from); while let Some(msg) = stream.try_next().await? { RAFT_MESSAGE_RECV_COUNTER.inc(); - let reject = needs_reject_raft_append(reject_messages_on_memory_ratio); if let Err(err @ RaftStoreError::StoreNotMatch { .. }) = - Self::handle_raft_message(store_id, &ch, msg, reject) + Self::handle_raft_message(store_id, &ch, msg, &ob) { // Return an error here will break the connection, only do that for // `StoreNotMatch` to let tikv to resolve a correct address from PD @@ -807,7 +849,7 @@ impl Tikv for Service { let store_id = self.store_id; let ch = self.storage.get_engine().raft_extension(); - let reject_messages_on_memory_ratio = self.reject_messages_on_memory_ratio; + let ob = self.raft_message_filter.clone(); let res = async move { let mut stream = stream.map_err(Error::from); @@ -822,10 +864,10 @@ impl Tikv for Service { let len = batch_msg.get_msgs().len(); RAFT_MESSAGE_RECV_COUNTER.inc_by(len as u64); RAFT_MESSAGE_BATCH_SIZE.observe(len as f64); - let reject = needs_reject_raft_append(reject_messages_on_memory_ratio); + for msg in batch_msg.take_msgs().into_iter() { if let Err(err @ RaftStoreError::StoreNotMatch { .. 
}) = - Self::handle_raft_message(store_id, &ch, msg, reject) + Self::handle_raft_message(store_id, &ch, msg, &ob) { // Return an error here will break the connection, only do that for // `StoreNotMatch` to let tikv to resolve a correct address from PD @@ -862,6 +904,13 @@ impl Tikv for Service { stream: RequestStream, sink: ClientStreamingSink, ) { + if self.raft_message_filter.should_reject_snapshot() { + RAFT_SNAPSHOT_REJECTS.inc(); + let status = + RpcStatus::with_message(RpcStatusCode::UNAVAILABLE, "rejected by peer".to_string()); + ctx.spawn(sink.fail(status).map(|_| ())); + return; + }; let task = SnapTask::Recv { stream, sink }; if let Err(e) = self.snap_scheduler.schedule(task) { let err_msg = format!("{}", e); diff --git a/src/server/service/mod.rs b/src/server/service/mod.rs index 00369a4ceae..dc2c254afe2 100644 --- a/src/server/service/mod.rs +++ b/src/server/service/mod.rs @@ -10,8 +10,8 @@ pub use self::{ diagnostics::Service as DiagnosticsService, kv::{ batch_commands_request, batch_commands_response, future_flashback_to_version, - future_prepare_flashback_to_version, GrpcRequestDuration, MeasuredBatchResponse, - MeasuredSingleResponse, Service as KvService, + future_prepare_flashback_to_version, DefaultGrpcMessageFilter, GrpcRequestDuration, + MeasuredBatchResponse, MeasuredSingleResponse, RaftGrpcMessageFilter, Service as KvService, }, }; diff --git a/src/storage/mvcc/reader/reader.rs b/src/storage/mvcc/reader/reader.rs index 454250737be..5e26f964521 100644 --- a/src/storage/mvcc/reader/reader.rs +++ b/src/storage/mvcc/reader/reader.rs @@ -350,30 +350,40 @@ impl MvccReader { Ok(None) }; - let mut locks = Vec::with_capacity(limit.min(memory_locks.len())); + let mut locks: Vec<(Key, Lock)> = Vec::with_capacity(limit.min(memory_locks.len())); let mut memory_iter = memory_locks.into_iter(); let mut memory_pair = memory_iter.next(); let mut storage_pair = next_pair_from_storage()?; let has_remain = loop { - match (memory_pair.as_ref(), storage_pair.as_ref()) { + let next_key = match (memory_pair.as_ref(), storage_pair.as_ref()) { (Some((memory_key, _)), Some((storage_key, _))) => { if storage_key <= memory_key { - locks.push(storage_pair.take().unwrap()); + let next_key = storage_pair.take().unwrap(); storage_pair = next_pair_from_storage()?; + next_key } else { - locks.push(memory_pair.take().unwrap()); + let next_key = memory_pair.take().unwrap(); memory_pair = memory_iter.next(); + next_key } } (Some(_), None) => { - locks.push(memory_pair.take().unwrap()); + let next_key = memory_pair.take().unwrap(); memory_pair = memory_iter.next(); + next_key } (None, Some(_)) => { - locks.push(storage_pair.take().unwrap()); + let next_key = storage_pair.take().unwrap(); storage_pair = next_pair_from_storage()?; + next_key } (None, None) => break memory_has_remain, + }; + // The same key could exist in both memory and storage when there is ongoing + // leader transfer, split or merge on this region. In this case, duplicated + // keys should be ignored. 
+ if locks.is_empty() || locks.last().unwrap().0 != next_key.0 { + locks.push(next_key); } if limit > 0 && locks.len() >= limit { break memory_pair.is_some() || storage_pair.is_some() || memory_has_remain; diff --git a/src/storage/txn/actions/prewrite.rs b/src/storage/txn/actions/prewrite.rs index 9e95a3ef63f..b618f63892b 100644 --- a/src/storage/txn/actions/prewrite.rs +++ b/src/storage/txn/actions/prewrite.rs @@ -618,6 +618,10 @@ impl<'a> PrewriteMutation<'a> { if let Some(secondary_keys) = self.secondary_keys { lock.use_async_commit = true; lock.secondaries = secondary_keys.to_owned(); + } else if try_one_pc { + // Set `use_one_pc` to true to prevent the in-memory lock from being skipped + // when reading with max-ts. + lock.use_one_pc = true; } let final_min_commit_ts = if lock.use_async_commit || try_one_pc { @@ -632,6 +636,7 @@ impl<'a> PrewriteMutation<'a> { fail_point!("after_calculate_min_commit_ts"); if let Err(Error(box ErrorInner::CommitTsTooLarge { .. })) = &res { try_one_pc = false; + lock.use_one_pc = false; lock.use_async_commit = false; lock.secondaries = Vec::new(); } @@ -916,6 +921,8 @@ fn amend_pessimistic_lock( } pub mod tests { + #[cfg(test)] + use std::borrow::Cow; #[cfg(test)] use std::sync::Arc; @@ -926,7 +933,7 @@ pub mod tests { #[cfg(test)] use tikv_kv::RocksEngine; #[cfg(test)] - use txn_types::OldValue; + use txn_types::{OldValue, TsSet}; use super::*; #[cfg(test)] @@ -1275,6 +1282,8 @@ pub mod tests { // success 1pc prewrite needs to be transformed to locks assert!(!must_locked(&mut engine, b"k1", 10).use_async_commit); assert!(!must_locked(&mut engine, b"k2", 10).use_async_commit); + assert!(!must_locked(&mut engine, b"k1", 10).use_one_pc); + assert!(!must_locked(&mut engine, b"k2", 10).use_one_pc); } pub fn try_pessimistic_prewrite_check_not_exists( @@ -2757,4 +2766,59 @@ pub mod tests { prewrite_err(&mut engine, key, value, key, 120, 130, Some(130)); must_unlocked(&mut engine, key); } + + #[test] + fn test_1pc_set_lock_use_one_pc() { + let mut engine = crate::storage::TestEngineBuilder::new().build().unwrap(); + let cm = ConcurrencyManager::new(42.into()); + + let snapshot = engine.snapshot(Default::default()).unwrap(); + + let mut txn = MvccTxn::new(10.into(), cm.clone()); + let mut reader = SnapshotReader::new(10.into(), snapshot, false); + + let k1 = b"k1"; + let k2 = b"k2"; + + prewrite( + &mut txn, + &mut reader, + &optimistic_async_props(k1, 10.into(), 50.into(), 2, true), + Mutation::make_put(Key::from_raw(k1), b"v1".to_vec()), + &None, + SkipPessimisticCheck, + None, + ) + .unwrap(); + prewrite( + &mut txn, + &mut reader, + &optimistic_async_props(k1, 10.into(), 50.into(), 1, true), + Mutation::make_put(Key::from_raw(k2), b"v2".to_vec()), + &None, + SkipPessimisticCheck, + None, + ) + .unwrap(); + + // lock.use_one_pc should be set to true when using 1pc. + assert_eq!(txn.guards.len(), 2); + txn.guards[0].with_lock(|l| assert!(l.as_ref().unwrap().use_one_pc)); + txn.guards[1].with_lock(|l| assert!(l.as_ref().unwrap().use_one_pc)); + + // read with max_ts should be blocked by the lock. 
+ for &key in &[k1, k2] { + let k = Key::from_raw(key); + let res = cm.read_key_check(&k, |l| { + Lock::check_ts_conflict( + Cow::Borrowed(l), + &k, + TimeStamp::max(), + &TsSet::Empty, + crate::storage::IsolationLevel::Si, + ) + }); + assert!(res.is_err()); + } + } } diff --git a/src/storage/txn/commands/prewrite.rs b/src/storage/txn/commands/prewrite.rs index 861083fb117..d9c7a65921b 100644 --- a/src/storage/txn/commands/prewrite.rs +++ b/src/storage/txn/commands/prewrite.rs @@ -988,7 +988,8 @@ fn handle_1pc_locks(txn: &mut MvccTxn, commit_ts: TimeStamp) -> ReleasedLocks { /// Change all 1pc locks in txn to 2pc locks. pub(in crate::storage::txn) fn fallback_1pc_locks(txn: &mut MvccTxn) { - for (key, lock, remove_pessimistic_lock) in std::mem::take(&mut txn.locks_for_1pc) { + for (key, mut lock, remove_pessimistic_lock) in std::mem::take(&mut txn.locks_for_1pc) { + lock.use_one_pc = false; let is_new_lock = !remove_pessimistic_lock; txn.put_lock(key, &lock, is_new_lock); } diff --git a/tests/failpoints/cases/test_kv_service.rs b/tests/failpoints/cases/test_kv_service.rs index c8777282787..c51b6232ba6 100644 --- a/tests/failpoints/cases/test_kv_service.rs +++ b/tests/failpoints/cases/test_kv_service.rs @@ -2,18 +2,20 @@ use std::{sync::Arc, time::Duration}; +use engine_traits::{Peekable, CF_LOCK}; use grpcio::{ChannelBuilder, Environment}; use kvproto::{ kvrpcpb::{PrewriteRequestPessimisticAction::SkipPessimisticCheck, *}, tikvpb::TikvClient, }; use test_raftstore::{ - configure_for_lease_read, must_kv_commit, must_kv_have_locks, must_kv_prewrite, - must_kv_prewrite_with, must_new_cluster_mul, new_server_cluster, try_kv_prewrite_with, - try_kv_prewrite_with_impl, + configure_for_lease_read, must_kv_commit, must_kv_have_locks, must_kv_pessimistic_lock, + must_kv_prewrite, must_kv_prewrite_with, must_new_cluster_and_kv_client_mul, + must_new_cluster_mul, new_server_cluster, try_kv_prewrite_with, try_kv_prewrite_with_impl, }; use test_raftstore_macro::test_case; -use tikv_util::{config::ReadableDuration, HandyRwLock}; +use tikv_util::{config::ReadableDuration, store::new_peer, HandyRwLock}; +use txn_types::Key; #[test_case(test_raftstore::must_new_cluster_and_kv_client)] #[test_case(test_raftstore_v2::must_new_cluster_and_kv_client)] @@ -270,3 +272,101 @@ fn test_storage_do_not_update_txn_status_cache_on_write_error() { must_kv_have_locks(&client, ctx, 29, b"k2", b"k3", &[(b"k2", Op::Put, 20, 20)]); fail::remove(cache_hit_fp); } + +#[test] +fn test_scan_locks_with_in_progress_transfer_leader() { + let (mut cluster, _, mut ctx) = must_new_cluster_and_kv_client_mul(3); + cluster.pd_client.disable_default_operator(); + + cluster.must_transfer_leader(1, new_peer(1, 1)); + let leader_peer = cluster.leader_of_region(1).unwrap(); + ctx.set_peer(leader_peer.clone()); + let k1 = b"k1"; + let k2 = b"k2"; + let leader_region = cluster.get_region(k1); + ctx.set_region_epoch(leader_region.get_region_epoch().clone()); + let env = Arc::new(Environment::new(1)); + let channel = + ChannelBuilder::new(env).connect(&cluster.sim.rl().get_addr(leader_peer.get_store_id())); + let client = TikvClient::new(channel); + + // Create both pessimistic locks. + let start_ts = 10; + must_kv_pessimistic_lock(&client, ctx.clone(), k1.to_vec(), start_ts); + must_kv_pessimistic_lock(&client, ctx.clone(), k2.to_vec(), start_ts); + + // Ensure the pessimistic locks are written to the memory but not the storage. 
+ let engine = cluster.get_engine(leader_peer.get_store_id()); + let cf_res = engine + .get_value_cf( + CF_LOCK, + keys::data_key(Key::from_raw(k1).as_encoded()).as_slice(), + ) + .unwrap(); + assert!(cf_res.is_none()); + let cf_res = engine + .get_value_cf( + CF_LOCK, + keys::data_key(Key::from_raw(k2).as_encoded()).as_slice(), + ) + .unwrap(); + assert!(cf_res.is_none()); + + let mut scan_lock_req = ScanLockRequest::default(); + scan_lock_req.set_context(ctx.clone()); + scan_lock_req.max_version = start_ts + 10; + scan_lock_req.limit = 256; + let scan_lock_resp = client.kv_scan_lock(&scan_lock_req.clone()).unwrap(); + assert!(!scan_lock_resp.has_region_error()); + assert_eq!(scan_lock_resp.get_locks().len(), 2); + assert_eq!(scan_lock_resp.locks.to_vec()[0].lock_version, start_ts); + assert_eq!(scan_lock_resp.locks.to_vec()[0].key, k1); + assert_eq!(scan_lock_resp.locks.to_vec()[1].lock_version, start_ts); + assert_eq!(scan_lock_resp.locks.to_vec()[1].key, k2); + + // Propose the transfer leader command but only trigger proposing pessimistic + // locks. + fail::cfg( + "finish_proposing_transfer_cmd_after_proposing_locks", + "return", + ) + .unwrap(); + let _ = cluster.try_transfer_leader_with_timeout(1, new_peer(2, 2), Duration::from_secs(1)); + + // Verify locks exist both in memory and storage. + let timer = tikv_util::time::Instant::now(); + let timeout = Duration::from_secs(5); + loop { + let cf_res = engine + .get_value_cf( + CF_LOCK, + keys::data_key(Key::from_raw(k1).as_encoded()).as_slice(), + ) + .unwrap(); + if cf_res.is_some() { + break; + } + std::thread::sleep(Duration::from_secs(1)); + if timer.saturating_elapsed() >= timeout { + assert!(cf_res.is_some()); + break; + } + } + let snapshot = cluster.must_get_snapshot_of_region(1); + let txn_ext = snapshot.txn_ext.unwrap(); + let guard = txn_ext.pessimistic_locks.read(); + assert!(guard.get(&Key::from_raw(k1)).is_some()); + assert!(guard.get(&Key::from_raw(k2)).is_some()); + drop(guard); + + fail::remove("finish_proposing_transfer_cmd_after_proposing_locks"); + + // Verify there should be no duplicate locks returned. 
+ let scan_lock_resp = client.kv_scan_lock(&scan_lock_req.clone()).unwrap(); + assert!(!scan_lock_resp.has_region_error()); + assert_eq!(scan_lock_resp.locks.len(), 2); + assert_eq!(scan_lock_resp.locks.to_vec()[0].lock_version, start_ts); + assert_eq!(scan_lock_resp.locks.to_vec()[0].key, k1); + assert_eq!(scan_lock_resp.locks.to_vec()[1].lock_version, start_ts); + assert_eq!(scan_lock_resp.locks.to_vec()[1].key, k2); +} diff --git a/tests/failpoints/cases/test_server.rs b/tests/failpoints/cases/test_server.rs index dfbb883179c..c745442501e 100644 --- a/tests/failpoints/cases/test_server.rs +++ b/tests/failpoints/cases/test_server.rs @@ -156,3 +156,47 @@ fn test_serving_status() { thread::sleep(Duration::from_millis(200)); assert_eq!(check(), ServingStatus::Serving); } + +#[test] +fn test_raft_message_observer() { + let mut cluster = new_server_cluster(0, 3); + cluster.pd_client.disable_default_operator(); + let r1 = cluster.run_conf_change(); + + cluster.must_put(b"k1", b"v1"); + + fail::cfg("force_reject_raft_append_message", "return").unwrap(); + fail::cfg("force_reject_raft_snapshot_message", "return").unwrap(); + + cluster.pd_client.add_peer(r1, new_peer(2, 2)); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + must_get_none(&cluster.get_engine(2), b"k1"); + + fail::remove("force_reject_raft_append_message"); + fail::remove("force_reject_raft_snapshot_message"); + + cluster.pd_client.must_have_peer(r1, new_peer(2, 2)); + cluster.pd_client.must_add_peer(r1, new_peer(3, 3)); + + must_get_equal(&cluster.get_engine(2), b"k1", b"v1"); + must_get_equal(&cluster.get_engine(3), b"k1", b"v1"); + + fail::cfg("force_reject_raft_append_message", "return").unwrap(); + + let _ = cluster.async_put(b"k2", b"v2").unwrap(); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + must_get_none(&cluster.get_engine(2), b"k2"); + must_get_none(&cluster.get_engine(3), b"k2"); + + fail::remove("force_reject_raft_append_message"); + + cluster.must_put(b"k3", b"v3"); + for id in 1..=3 { + must_get_equal(&cluster.get_engine(id), b"k3", b"v3"); + } + cluster.shutdown(); +}
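Taken together, the `src/server/service/kv.rs` and `src/server/server.rs` hunks above replace the bare `reject_messages_on_memory_ratio` check with a pluggable `RaftGrpcMessageFilter` trait: `DefaultGrpcMessageFilter` preserves the old behaviour (reject `MsgAppend` under memory pressure, never reject snapshots outside the failpoint), the proxy supplies `TiFlashGrpcMessageFilter`, and `test_raft_message_observer` drives both rejection paths through the `force_reject_raft_append_message` / `force_reject_raft_snapshot_message` failpoints. The sketch below is a hypothetical third implementation, shown only to illustrate the trait's shape; the `StaticRejectFilter` name and its field are invented, and it assumes the re-exports added to `src/server/service/mod.rs` in this patch.

// Hypothetical filter (not part of the patch): reject appends and snapshot
// streams while an external component has flagged the process, e.g. during a
// controlled drain.
use std::sync::atomic::{AtomicBool, Ordering};

use kvproto::raft_serverpb::RaftMessage;
use tikv::server::service::RaftGrpcMessageFilter;

pub struct StaticRejectFilter {
    rejecting: AtomicBool,
}

impl StaticRejectFilter {
    pub fn new() -> Self {
        Self {
            rejecting: AtomicBool::new(false),
        }
    }

    // Flip the switch from an external controller.
    pub fn set_rejecting(&self, on: bool) {
        self.rejecting.store(on, Ordering::Relaxed);
    }
}

impl RaftGrpcMessageFilter for StaticRejectFilter {
    fn should_reject_raft_message(&self, _msg: &RaftMessage) -> bool {
        self.rejecting.load(Ordering::Relaxed)
    }

    fn should_reject_snapshot(&self) -> bool {
        self.rejecting.load(Ordering::Relaxed)
    }
}

An instance would be handed to `Server::new` wrapped in an `Arc`, in the same trailing position where the server, server2, proxy and test-cluster call sites above pass `Arc::new(DefaultGrpcMessageFilter::new(server_config.value().reject_messages_on_memory_ratio))`; rejected appends are still counted by `RAFT_APPEND_REJECTS`, and rejected snapshot streams by the new `RAFT_SNAPSHOT_REJECTS` counter.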