Skip to content

Commit

Permalink
Merge remote-tracking branch 'tikv/master' into u-mem
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinNeo committed Jan 20, 2025
2 parents 6b553bd + c88260c commit 8c7ab2d
Show file tree
Hide file tree
Showing 32 changed files with 597 additions and 58 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ testexport = [
"engine_rocks/testexport",
"engine_panic/testexport",
"encryption/testexport",
"file_system/testexport"
"file_system/testexport",
]
test-engine-kv-rocksdb = ["engine_test/test-engine-kv-rocksdb"]
test-engine-raft-raft-engine = ["engine_test/test-engine-raft-raft-engine"]
Expand Down
1 change: 0 additions & 1 deletion components/batch-system/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,6 @@ where
let t = thread::Builder::new()
.name(name)
.spawn_wrapper(move || {
tikv_alloc::thread_allocate_exclusive_arena().unwrap();
tikv_util::thread_group::set_properties(props);
set_io_type(IoType::ForegroundWrite);
poller.poll();
Expand Down
3 changes: 0 additions & 3 deletions components/engine_rocks/src/import.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

#![allow(unused_variables)]
#![allow(unreachable_code)]

use engine_traits::{ImportExt, IngestExternalFileOptions, Result};
use rocksdb::IngestExternalFileOptions as RawIngestExternalFileOptions;
use tikv_util::time::Instant;
Expand Down
4 changes: 4 additions & 0 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3820,6 +3820,10 @@ where
None => {
self.propose_pending_batch_raft_command();
if self.propose_locks_before_transfer_leader(msg) {
fail_point!(
"finish_proposing_transfer_cmd_after_proposing_locks",
|_| {}
);
// If some pessimistic locks are just proposed, we propose another
// TransferLeader command instead of transferring leader immediately.
info!("propose transfer leader command";
Expand Down
2 changes: 1 addition & 1 deletion components/resource_control/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ lazy_static! {
"tikv_resource_control_priority_wait_duration",
"Histogram of wait duration cause by priority quota limiter",
&["priority"],
exponential_buckets(1e-5, 2.0, 18).unwrap() // 10us ~ 2.5s
exponential_buckets(1e-5, 2.0, 22).unwrap() // 10us ~ 42s
)
.unwrap();

Expand Down
11 changes: 8 additions & 3 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ use tikv::{
lock_manager::LockManager,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
service::{DebugService, DefaultGrpcMessageFilter, DiagnosticsService},
status_server::StatusServer,
tablet_snap::NoSnapshotCache,
ttl::TtlChecker,
Expand All @@ -122,7 +122,9 @@ use tikv::{
Engine, Storage,
},
};
use tikv_alloc::{add_thread_memory_accessor, remove_thread_memory_accessor};
use tikv_alloc::{
add_thread_memory_accessor, remove_thread_memory_accessor, thread_allocate_exclusive_arena,
};
use tikv_util::{
check_environment_variables,
config::VersionTrack,
Expand Down Expand Up @@ -330,7 +332,7 @@ where

// SAFETY: we will call `remove_thread_memory_accessor` at before_stop.
unsafe { add_thread_memory_accessor() };
tikv_alloc::thread_allocate_exclusive_arena().unwrap();
thread_allocate_exclusive_arena().unwrap();
})
.before_stop(|| {
remove_thread_memory_accessor();
Expand Down Expand Up @@ -889,6 +891,9 @@ where
debug_thread_pool,
health_controller,
self.resource_manager.clone(),
Arc::new(DefaultGrpcMessageFilter::new(
server_config.value().reject_messages_on_memory_ratio,
)),
)
.unwrap_or_else(|e| fatal!("failed to create server: {}", e));
cfg_controller.register(
Expand Down
10 changes: 8 additions & 2 deletions components/server/src/server2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ use tikv::{
lock_manager::LockManager,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
service::{DebugService, DefaultGrpcMessageFilter, DiagnosticsService},
status_server::StatusServer,
KvEngineFactoryBuilder, NodeV2, RaftKv2, Server, CPU_CORES_QUOTA_GAUGE, GRPC_THREAD_PREFIX,
MEMORY_LIMIT_GAUGE,
Expand All @@ -109,7 +109,9 @@ use tikv::{
Engine, Storage,
},
};
use tikv_alloc::{add_thread_memory_accessor, remove_thread_memory_accessor};
use tikv_alloc::{
add_thread_memory_accessor, remove_thread_memory_accessor, thread_allocate_exclusive_arena,
};
use tikv_util::{
check_environment_variables,
config::VersionTrack,
Expand Down Expand Up @@ -301,6 +303,7 @@ where

// SAFETY: we will call `remove_thread_memory_accessor` at before_stop.
unsafe { add_thread_memory_accessor() };
thread_allocate_exclusive_arena().unwrap();
})
.before_stop(|| {
remove_thread_memory_accessor();
Expand Down Expand Up @@ -826,6 +829,9 @@ where
debug_thread_pool,
health_controller,
self.resource_manager.clone(),
Arc::new(DefaultGrpcMessageFilter::new(
server_config.value().reject_messages_on_memory_ratio,
)),
)
.unwrap_or_else(|e| fatal!("failed to create server: {}", e));
cfg_controller.register(
Expand Down
5 changes: 4 additions & 1 deletion components/test_raftstore-v2/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use tikv::{
lock_manager::LockManager,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
service::{DebugService, DefaultGrpcMessageFilter, DiagnosticsService},
ConnectionBuilder, Error, Extension, NodeV2, PdStoreAddrResolver, RaftClient, RaftKv2,
Result as ServerResult, Server, ServerTransport,
},
Expand Down Expand Up @@ -644,6 +644,9 @@ impl<EK: KvEngine> ServerCluster<EK> {
debug_thread_pool.clone(),
health_controller.clone(),
resource_manager.clone(),
Arc::new(DefaultGrpcMessageFilter::new(
server_cfg.value().reject_messages_on_memory_ratio,
)),
)
.unwrap();
svr.register_service(create_diagnostics(diag_service.clone()));
Expand Down
11 changes: 11 additions & 0 deletions components/test_raftstore/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,17 @@ impl<T: Simulator> Cluster<T> {
.unwrap()
}

pub fn try_transfer_leader_with_timeout(
&mut self,
region_id: u64,
leader: metapb::Peer,
timeout: Duration,
) -> Result<RaftCmdResponse> {
let epoch = self.get_region_epoch(region_id);
let transfer_leader = new_admin_request(region_id, &epoch, new_transfer_leader_cmd(leader));
self.call_command_on_leader(transfer_leader, timeout)
}

pub fn get_snap_dir(&self, node_id: u64) -> String {
self.sim.rl().get_snap_dir(node_id)
}
Expand Down
5 changes: 4 additions & 1 deletion components/test_raftstore/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use tikv::{
lock_manager::LockManager,
raftkv::ReplicaReadLockChecker,
resolve::{self, StoreAddrResolver},
service::DebugService,
service::{DebugService, DefaultGrpcMessageFilter},
tablet_snap::NoSnapshotCache,
ConnectionBuilder, Error, MultiRaftServer, PdStoreAddrResolver, RaftClient, RaftKv,
Result as ServerResult, Server, ServerTransport,
Expand Down Expand Up @@ -617,6 +617,9 @@ impl ServerCluster {
debug_thread_pool.clone(),
health_controller.clone(),
resource_manager.clone(),
Arc::new(DefaultGrpcMessageFilter::new(
server_cfg.value().reject_messages_on_memory_ratio,
)),
)
.unwrap();
svr.register_service(create_import_sst(import_service.clone()));
Expand Down
9 changes: 7 additions & 2 deletions components/test_raftstore/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1305,8 +1305,13 @@ pub fn kv_pessimistic_lock_with_ttl(
}

pub fn must_kv_pessimistic_lock(client: &TikvClient, ctx: Context, key: Vec<u8>, ts: u64) {
let resp = kv_pessimistic_lock(client, ctx, vec![key], ts, ts, false);
assert!(!resp.has_region_error(), "{:?}", resp.get_region_error());
let resp = kv_pessimistic_lock(client, ctx.clone(), vec![key], ts, ts, false);
assert!(
!resp.has_region_error(),
"{:?}, ctx:{:?}",
resp.get_region_error(),
ctx
);
assert!(resp.errors.is_empty(), "{:?}", resp.get_errors());
}

Expand Down
13 changes: 13 additions & 0 deletions components/tidb_query_datatype/src/codec/mysql/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,19 @@ impl Time {
Time::try_from_chrono_datetime(ctx, timestamp.naive_local(), time_type, fsp as i8)
}

pub fn from_unixtime(
ctx: &mut EvalContext,
seconds: i64,
nanos: u32,
time_type: TimeType,
fsp: i8,
) -> Result<Self> {
let timestamp = Utc.timestamp(seconds, nanos);
let timestamp = ctx.cfg.tz.from_utc_datetime(&timestamp.naive_utc());
let timestamp = timestamp.round_subsecs(fsp as u16);
Time::try_from_chrono_datetime(ctx, timestamp.naive_local(), time_type, fsp)
}

pub fn from_year(
ctx: &mut EvalContext,
year: u32,
Expand Down
Loading

0 comments on commit 8c7ab2d

Please sign in to comment.