From f2b91f0d336175761e5b4a57f92b5184911f387d Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 1 Sep 2022 13:28:30 +0800 Subject: [PATCH 1/4] independent handle Signed-off-by: CalvinNeo --- .../raftstore/src/engine_store_ffi/read_index_helper.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/components/raftstore/src/engine_store_ffi/read_index_helper.rs b/components/raftstore/src/engine_store_ffi/read_index_helper.rs index fcfe5b4a6e0..bd9d2650f3a 100644 --- a/components/raftstore/src/engine_store_ffi/read_index_helper.rs +++ b/components/raftstore/src/engine_store_ffi/read_index_helper.rs @@ -20,6 +20,10 @@ use crate::{ store::{Callback, RaftCmdExtraOpts, RaftRouter, ReadResponse}, }; +lazy_static! { + pub static ref PROXY_TIMER_HANDLE: Handle = start_global_timer_with_name("proxy-timer"); +} + pub trait ReadIndex: Sync + Send { // To remove fn batch_read_index( @@ -153,7 +157,7 @@ impl ReadIndex for ReadIndexClient { futures::pin_mut!(read_index_fut); let deadline = Instant::now() + timeout; - let delay = tikv_util::timer::PROXY_TIMER_HANDLE + let delay = PROXY_TIMER_HANDLE .delay(deadline) .compat(); let ret = futures::future::select(read_index_fut, delay); From 86b0f0ba942ef89feef2919f1b3ca5e9f6556249 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 1 Sep 2022 13:28:48 +0800 Subject: [PATCH 2/4] f Signed-off-by: CalvinNeo --- components/tikv_util/src/metrics/mod.rs | 2 +- .../tikv_util/src/metrics/threads_dummy.rs | 4 - .../tikv_util/src/metrics/threads_linux.rs | 109 +++++------------- components/tikv_util/src/mpsc/mod.rs | 7 +- components/tikv_util/src/quota_limiter.rs | 2 - components/tikv_util/src/timer.rs | 10 +- components/tikv_util/src/worker/pool.rs | 47 +------- 7 files changed, 38 insertions(+), 143 deletions(-) diff --git a/components/tikv_util/src/metrics/mod.rs b/components/tikv_util/src/metrics/mod.rs index a910e7c547e..e9eb4594be9 100644 --- a/components/tikv_util/src/metrics/mod.rs +++ b/components/tikv_util/src/metrics/mod.rs @@ -7,7 +7,7 @@ use prometheus_static_metric::*; #[cfg(target_os = "linux")] mod threads_linux; #[cfg(target_os = "linux")] -pub use self::threads_linux::{cpu_total, get_thread_ids, monitor_threads, ThreadInfoStatistics}; +pub use self::threads_linux::{monitor_threads, ThreadInfoStatistics}; #[cfg(target_os = "linux")] mod process_linux; diff --git a/components/tikv_util/src/metrics/threads_dummy.rs b/components/tikv_util/src/metrics/threads_dummy.rs index 40613d9ce11..3bc60a4f5d4 100644 --- a/components/tikv_util/src/metrics/threads_dummy.rs +++ b/components/tikv_util/src/metrics/threads_dummy.rs @@ -37,10 +37,6 @@ impl ThreadInfoStatistics { } } -pub fn dump_thread_stats() -> String { - "only support linux".into() -} - impl Default for ThreadInfoStatistics { fn default() -> Self { Self::new() diff --git a/components/tikv_util/src/metrics/threads_linux.rs b/components/tikv_util/src/metrics/threads_linux.rs index 456353165c7..8ee9aed05f5 100644 --- a/components/tikv_util/src/metrics/threads_linux.rs +++ b/components/tikv_util/src/metrics/threads_linux.rs @@ -1,15 +1,12 @@ // Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0. use std::{ - fs, io::{Error, ErrorKind, Result}, sync::Mutex, time::Duration, }; use collections::HashMap; -use lazy_static::lazy_static; -use libc::{self, pid_t}; use procinfo::pid; use prometheus::{ self, @@ -24,7 +21,7 @@ use crate::{ /// Monitors threads of the current process. pub fn monitor_threads>(namespace: S) -> Result<()> { - let pid = unsafe { libc::getpid() }; + let pid = thread::process_id(); let tc = ThreadsCollector::new(pid, namespace); prometheus::register(Box::new(tc)).map_err(|e| to_io_err(format!("{:?}", e))) } @@ -62,7 +59,7 @@ impl Metrics { ).namespace(ns.clone()), &["name", "tid", "io"], ) - .unwrap(); + .unwrap(); let voluntary_ctxt_switches = IntGaugeVec::new( Opts::new( "thread_voluntary_context_switches", @@ -113,14 +110,14 @@ impl Metrics { /// A collector to collect threads metrics, including CPU usage /// and threads state. struct ThreadsCollector { - pid: pid_t, + pid: Pid, descs: Vec, metrics: Mutex, tid_retriever: Mutex, } impl ThreadsCollector { - fn new>(pid: pid_t, namespace: S) -> ThreadsCollector { + fn new>(pid: Pid, namespace: S) -> ThreadsCollector { let metrics = Metrics::new(namespace); ThreadsCollector { pid, @@ -149,9 +146,9 @@ impl Collector for ThreadsCollector { } for tid in tids { let tid = *tid; - if let Ok(stat) = pid::stat_task(self.pid, tid) { + if let Ok(stat) = thread::full_thread_stat(self.pid, tid) { // Threads CPU time. - let total = cpu_total(&stat); + let total = thread::linux::cpu_total(&stat); // sanitize thread name before push metrics. let name = sanitize_thread_name(tid, &stat.command); let cpu_total = metrics @@ -212,38 +209,6 @@ impl Collector for ThreadsCollector { } } -/// Gets thread ids of the given process id. -/// WARN: Don't call this function frequently. Otherwise there will be a lot of memory fragments. -pub fn get_thread_ids(pid: pid_t) -> Result> { - let mut tids: Vec = fs::read_dir(format!("/proc/{}/task", pid))? - .filter_map(|task| { - let file_name = match task { - Ok(t) => t.file_name(), - Err(e) => { - error!("read task failed"; "pid" => pid, "err" => ?e); - return None; - } - }; - - match file_name.to_str() { - Some(tid) => match tid.parse() { - Ok(tid) => Some(tid), - Err(e) => { - error!("read task failed"; "pid" => pid, "err" => ?e); - None - } - }, - None => { - error!("read task failed"; "pid" => pid); - None - } - } - }) - .collect(); - tids.sort_unstable(); - Ok(tids) -} - /// Sanitizes the thread name. Keeps `a-zA-Z0-9_:`, replaces `-` and ` ` with `_`, and drops the others. /// /// Examples: @@ -256,7 +221,7 @@ pub fn get_thread_ids(pid: pid_t) -> Result> { /// assert_eq!(sanitize_thread_name(1, "@123"), "123"); /// assert_eq!(sanitize_thread_name(1, "@@@@"), "1"); /// ``` -fn sanitize_thread_name(tid: pid_t, raw: &str) -> String { +fn sanitize_thread_name(tid: Pid, raw: &str) -> String { let mut name = String::with_capacity(raw.len()); // sanitize thread name. for c in raw.chars() { @@ -293,23 +258,10 @@ fn state_to_str(state: &pid::State) -> &str { } } -pub fn cpu_total(state: &pid::Stat) -> f64 { - (state.utime + state.stime) as f64 / *CLK_TCK -} - fn to_io_err(s: String) -> Error { Error::new(ErrorKind::Other, s) } -lazy_static! { - // getconf CLK_TCK - static ref CLK_TCK: f64 = { - unsafe { - libc::sysconf(libc::_SC_CLK_TCK) as f64 - } - }; -} - #[inline] fn get_name(command: &str) -> String { if !command.is_empty() { @@ -367,7 +319,7 @@ impl ThreadMetrics { /// Use to collect cpu usages and disk I/O rates pub struct ThreadInfoStatistics { - pid: pid_t, + pid: Pid, last_instant: Instant, tid_names: HashMap, tid_retriever: TidRetriever, @@ -377,7 +329,7 @@ pub struct ThreadInfoStatistics { impl ThreadInfoStatistics { pub fn new() -> Self { - let pid = unsafe { libc::getpid() }; + let pid = thread::process_id(); let mut thread_stats = Self { pid, @@ -406,13 +358,13 @@ impl ThreadInfoStatistics { for tid in tids { let tid = *tid; - if let Ok(stat) = pid::stat_task(self.pid, tid) { + if let Ok(stat) = thread::full_thread_stat(self.pid, tid) { let name = get_name(&stat.command); self.tid_names.entry(tid).or_insert(name); // To get a percentage result, // we pre-multiply `cpu_time` by 100 here rather than inside the `update_metric`. - let cpu_time = cpu_total(&stat) * 100.0; + let cpu_time = thread::linux::cpu_total(&stat) * 100.0; update_metric( &mut self.metrics_total.cpu_times, &mut self.metrics_rate.cpu_times, @@ -472,28 +424,31 @@ const TID_MAX_UPDATE_INTERVAL: Duration = Duration::from_secs(10 * 60); /// A helper that buffers the thread id list internally. struct TidRetriever { - pid: pid_t, + pid: Pid, tid_buffer: Vec, tid_buffer_last_update: Instant, tid_buffer_update_interval: Duration, } impl TidRetriever { - pub fn new(pid: pid_t) -> Self { + pub fn new(pid: Pid) -> Self { + let mut tid_buffer: Vec<_> = thread::thread_ids(pid).unwrap(); + tid_buffer.sort_unstable(); Self { pid, - tid_buffer: get_thread_ids(pid).unwrap(), + tid_buffer, tid_buffer_last_update: Instant::now(), tid_buffer_update_interval: TID_MIN_UPDATE_INTERVAL, } } - pub fn get_tids(&mut self) -> (&[pid_t], bool) { + pub fn get_tids(&mut self) -> (&[Pid], bool) { // Update the tid list according to tid_buffer_update_interval. // If tid is not changed, update the tid list less frequently. let mut updated = false; if self.tid_buffer_last_update.saturating_elapsed() >= self.tid_buffer_update_interval { - let new_tid_buffer = get_thread_ids(self.pid).unwrap(); + let mut new_tid_buffer: Vec<_> = thread::thread_ids(self.pid).unwrap(); + new_tid_buffer.sort_unstable(); if new_tid_buffer == self.tid_buffer { self.tid_buffer_update_interval *= 2; if self.tid_buffer_update_interval > TID_MAX_UPDATE_INTERVAL { @@ -522,7 +477,7 @@ mod tests { let name = "testthreadio"; let (tx, rx) = sync::mpsc::channel(); let (tx1, rx1) = sync::mpsc::channel(); - let h = thread::Builder::new() + let h = std::thread::Builder::new() .name(name.to_owned()) .spawn(move || { // Make `io::write_bytes` > 0 @@ -541,13 +496,13 @@ mod tests { rx1.recv().unwrap(); let page_size = unsafe { libc::sysconf(libc::_SC_PAGE_SIZE) as usize }; - let pid = unsafe { libc::getpid() }; - let tids = get_thread_ids(pid).unwrap(); + let pid = thread::process_id(); + let tids: Vec<_> = thread::thread_ids(pid).unwrap(); assert!(tids.len() >= 2); tids.iter() .find(|t| { - pid::stat_task(pid, **t) + thread::full_thread_stat(pid, **t) .map(|stat| stat.command == name) .unwrap_or(false) }) @@ -571,7 +526,7 @@ mod tests { ) -> (sync::mpsc::Sender<()>, sync::mpsc::Receiver<()>) { let (tx, rx) = sync::mpsc::channel(); let (tx1, rx1) = sync::mpsc::channel(); - thread::Builder::new() + std::thread::Builder::new() .name(str1.to_owned()) .spawn(move || { tx1.send(()).unwrap(); @@ -613,10 +568,10 @@ mod tests { let mut thread_info = ThreadInfoStatistics::new(); let page_size = unsafe { libc::sysconf(libc::_SC_PAGE_SIZE) as u64 }; - let pid = unsafe { libc::getpid() }; - let tids = get_thread_ids(pid).unwrap(); + let pid = thread::process_id(); + let tids: Vec<_> = thread::thread_ids(pid).unwrap(); for tid in tids { - if let Ok(stat) = pid::stat_task(pid, tid) { + if let Ok(stat) = thread::full_thread_stat(pid, tid) { if stat.command.starts_with(s1) { rx1.recv().unwrap(); thread_info.record(); @@ -657,7 +612,7 @@ mod tests { ) -> (sync::mpsc::Sender<()>, sync::mpsc::Receiver<()>) { let (tx, rx) = sync::mpsc::channel(); let (tx1, rx1) = sync::mpsc::channel(); - thread::Builder::new() + std::thread::Builder::new() .name(name) .spawn(move || { tx1.send(()).unwrap(); @@ -687,10 +642,10 @@ mod tests { let mut thread_info = ThreadInfoStatistics::new(); - let pid = unsafe { libc::getpid() }; - let tids = get_thread_ids(pid).unwrap(); + let pid = thread::process_id(); + let tids: Vec<_> = thread::thread_ids(pid).unwrap(); for tid in tids { - if let Ok(stat) = pid::stat_task(pid, tid) { + if let Ok(stat) = thread::full_thread_stat(pid, tid) { if stat.command.starts_with(tn) { rx.recv().unwrap(); thread_info.record(); @@ -748,7 +703,7 @@ mod tests { #[test] fn test_smoke() { - let pid = unsafe { libc::getpid() }; + let pid = thread::process_id(); let tc = ThreadsCollector::new(pid, "smoke"); tc.collect(); tc.desc(); diff --git a/components/tikv_util/src/mpsc/mod.rs b/components/tikv_util/src/mpsc/mod.rs index 405d5c231cc..99dd6b3e5d0 100644 --- a/components/tikv_util/src/mpsc/mod.rs +++ b/components/tikv_util/src/mpsc/mod.rs @@ -21,7 +21,6 @@ use std::{ use crossbeam::channel::{ self, RecvError, RecvTimeoutError, SendError, TryRecvError, TrySendError, }; -use fail::fail_point; struct State { sender_cnt: AtomicIsize, @@ -240,11 +239,7 @@ impl LooseBoundedSender { #[inline] pub fn try_send(&self, t: T) -> Result<(), TrySendError> { let cnt = self.tried_cnt.get(); - let check_interval = || { - fail_point!("loose_bounded_sender_check_interval", |_| 0); - CHECK_INTERVAL - }; - if cnt < check_interval() { + if cnt < CHECK_INTERVAL { self.tried_cnt.set(cnt + 1); } else if self.len() < self.limit { self.tried_cnt.set(1); diff --git a/components/tikv_util/src/quota_limiter.rs b/components/tikv_util/src/quota_limiter.rs index 0853b5c2b01..6179ab75da6 100644 --- a/components/tikv_util/src/quota_limiter.rs +++ b/components/tikv_util/src/quota_limiter.rs @@ -280,7 +280,6 @@ mod tests { std::thread::sleep(Duration::from_millis(10)); let mut sample = quota_limiter.new_sample(); - sample.add_cpu_time(Duration::from_millis(20)); let should_delay = block_on(quota_limiter.async_consume(sample)); // should less 60+50+20 @@ -290,7 +289,6 @@ mod tests { sample.add_cpu_time(Duration::from_millis(200)); sample.add_write_bytes(256); let should_delay = block_on(quota_limiter.async_consume(sample)); - check_duration(should_delay, Duration::from_millis(250)); // ThreadTime elapsed time is not long. diff --git a/components/tikv_util/src/timer.rs b/components/tikv_util/src/timer.rs index 3b24d66624d..dc74dbb3b43 100644 --- a/components/tikv_util/src/timer.rs +++ b/components/tikv_util/src/timer.rs @@ -90,14 +90,14 @@ impl Ord for TimeoutTask { } lazy_static! { - pub static ref GLOBAL_TIMER_HANDLE: Handle = start_global_timer("timer"); + pub static ref GLOBAL_TIMER_HANDLE: Handle = start_global_timer(); } -fn start_global_timer(name: &str) -> Handle { +fn start_global_timer() -> Handle { let (tx, rx) = mpsc::channel(); let props = crate::thread_group::current_properties(); Builder::new() - .name(thd_name!(name)) + .name(thd_name!("timer")) .spawn(move || { crate::thread_group::set_properties(props); tikv_alloc::add_thread_memory_accessor(); @@ -273,7 +273,3 @@ mod tests { ); } } - -lazy_static! { - pub static ref PROXY_TIMER_HANDLE: Handle = start_global_timer("proxy-timer"); -} diff --git a/components/tikv_util/src/worker/pool.rs b/components/tikv_util/src/worker/pool.rs index ab5e19a6e3a..841a8a2229d 100644 --- a/components/tikv_util/src/worker/pool.rs +++ b/components/tikv_util/src/worker/pool.rs @@ -90,7 +90,6 @@ impl Drop for RunnableWrapper { enum Msg { Task(T), Timeout, - Poll(Arc), } /// Scheduler provides interface to schedule task to underlying workers. @@ -401,21 +400,13 @@ impl Worker { fn delay_notify(tx: UnboundedSender>, timeout: Duration) { let now = Instant::now(); - let x = tx.clone(); let f = GLOBAL_TIMER_HANDLE .delay(now + timeout) .compat() .map(move |_| { let _ = tx.unbounded_send(Msg::::Timeout); }); - let f: BoxFuture<'static, ()> = Box::pin(f); - let waker = Arc::new(NotifyWaker { - future: Mutex::new(Some(f)), - retry_fn: Box::new(move |waker: Arc| { - let _ = x.unbounded_send(Msg::Poll(waker)); - }), - }); - waker.wake(); + poll_future_notify(f); } pub fn lazy_build>( @@ -472,7 +463,6 @@ impl Worker { metrics_pending_task_count.dec(); } Msg::Timeout => (), - _ => (), } } }); @@ -504,47 +494,12 @@ impl Worker { let timeout = handle.inner.get_interval(); Self::delay_notify(tx.clone(), timeout); } - Msg::Poll(waker) => { - waker.wake(); - } } } }); } } -use futures::{ - future::BoxFuture, - task::{self, ArcWake, Context, Poll}, -}; - -struct NotifyWaker { - future: Mutex>>, - retry_fn: Box) + Send + Sync>, -} - -impl ArcWake for NotifyWaker { - fn wake_by_ref(arc_self: &Arc) { - match arc_self.future.try_lock() { - Ok(mut future_slot) => { - if let Some(mut future) = future_slot.take() { - let waker = task::waker_ref(&arc_self); - let cx = &mut Context::from_waker(&*waker); - match future.as_mut().poll(cx) { - Poll::Pending => { - *future_slot = Some(future); - } - Poll::Ready(()) => {} - } - } - } - Err(_) => { - let _ = (arc_self.retry_fn)(arc_self.clone()); - } - } - } -} - mod tests { use std::{ From 9620136d2d6c4af8d7d014a274c2ce4ecebb410a Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 1 Sep 2022 14:45:38 +0800 Subject: [PATCH 3/4] f Signed-off-by: CalvinNeo --- Cargo.lock | 1 + components/raftstore/Cargo.toml | 1 + components/raftstore/src/engine_store_ffi/mod.rs | 1 + .../src/engine_store_ffi/read_index_helper.rs | 8 +------- components/raftstore/src/engine_store_ffi/utils.rs | 13 +++++++++---- components/tikv_util/src/timer.rs | 9 +++++++-- 6 files changed, 20 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f1800aaec9f..1456311a65b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4330,6 +4330,7 @@ dependencies = [ "tikv_util", "time", "tokio", + "tokio-timer", "txn_types", "uuid", "yatp", diff --git a/components/raftstore/Cargo.toml b/components/raftstore/Cargo.toml index 86a2e51748e..7e1bbc548e5 100644 --- a/components/raftstore/Cargo.toml +++ b/components/raftstore/Cargo.toml @@ -90,6 +90,7 @@ tikv_alloc = { path = "../tikv_alloc" } tikv_util = { path = "../tikv_util", default-features = false } time = "0.1" tokio = { version = "1.5", features = ["sync", "rt-multi-thread"] } +tokio-timer = { git = "https://github.com/tikv/tokio", branch = "tokio-timer-hotfix" } txn_types = { path = "../txn_types", default-features = false } uuid = { version = "0.8.1", features = ["serde", "v4"] } yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" } diff --git a/components/raftstore/src/engine_store_ffi/mod.rs b/components/raftstore/src/engine_store_ffi/mod.rs index dd520a7b8f1..507c9430a7a 100644 --- a/components/raftstore/src/engine_store_ffi/mod.rs +++ b/components/raftstore/src/engine_store_ffi/mod.rs @@ -24,6 +24,7 @@ use engine_traits::{ SstReader, CF_DEFAULT, CF_LOCK, CF_WRITE, }; use kvproto::{kvrpcpb, metapb, raft_cmdpb}; +use lazy_static::lazy_static; use protobuf::Message; pub use read_index_helper::ReadIndexClient; diff --git a/components/raftstore/src/engine_store_ffi/read_index_helper.rs b/components/raftstore/src/engine_store_ffi/read_index_helper.rs index bd9d2650f3a..1faf4e76db9 100644 --- a/components/raftstore/src/engine_store_ffi/read_index_helper.rs +++ b/components/raftstore/src/engine_store_ffi/read_index_helper.rs @@ -20,10 +20,6 @@ use crate::{ store::{Callback, RaftCmdExtraOpts, RaftRouter, ReadResponse}, }; -lazy_static! { - pub static ref PROXY_TIMER_HANDLE: Handle = start_global_timer_with_name("proxy-timer"); -} - pub trait ReadIndex: Sync + Send { // To remove fn batch_read_index( @@ -157,9 +153,7 @@ impl ReadIndex for ReadIndexClient { futures::pin_mut!(read_index_fut); let deadline = Instant::now() + timeout; - let delay = PROXY_TIMER_HANDLE - .delay(deadline) - .compat(); + let delay = super::utils::PROXY_TIMER_HANDLE.delay(deadline).compat(); let ret = futures::future::select(read_index_fut, delay); match block_on(ret) { futures::future::Either::Left(_) => true, diff --git a/components/raftstore/src/engine_store_ffi/utils.rs b/components/raftstore/src/engine_store_ffi/utils.rs index 7f5394133ae..dda0627526e 100644 --- a/components/raftstore/src/engine_store_ffi/utils.rs +++ b/components/raftstore/src/engine_store_ffi/utils.rs @@ -3,6 +3,14 @@ use std::time; use futures_util::{compat::Future01CompatExt, future::BoxFuture, FutureExt}; +use tikv_util::timer::start_global_timer_with_name; +use tokio_timer::timer::Handle; + +use crate::engine_store_ffi::lazy_static; + +lazy_static! { + pub static ref PROXY_TIMER_HANDLE: Handle = start_global_timer_with_name("proxy-timer"); +} pub type ArcNotifyWaker = std::sync::Arc; @@ -22,10 +30,7 @@ pub struct TimerTask { pub fn make_timer_task(millis: u64) -> TimerTask { let deadline = time::Instant::now() + time::Duration::from_millis(millis); - let delay = tikv_util::timer::PROXY_TIMER_HANDLE - .delay(deadline) - .compat() - .map(|_| {}); + let delay = PROXY_TIMER_HANDLE.delay(deadline).compat().map(|_| {}); TimerTask { future: Box::pin(delay), } diff --git a/components/tikv_util/src/timer.rs b/components/tikv_util/src/timer.rs index dc74dbb3b43..8b9f7118261 100644 --- a/components/tikv_util/src/timer.rs +++ b/components/tikv_util/src/timer.rs @@ -93,11 +93,12 @@ lazy_static! { pub static ref GLOBAL_TIMER_HANDLE: Handle = start_global_timer(); } -fn start_global_timer() -> Handle { +/// Create a global timer with specific thread name. +pub fn start_global_timer_with_name(name: &str) -> Handle { let (tx, rx) = mpsc::channel(); let props = crate::thread_group::current_properties(); Builder::new() - .name(thd_name!("timer")) + .name(thd_name!(name)) .spawn(move || { crate::thread_group::set_properties(props); tikv_alloc::add_thread_memory_accessor(); @@ -111,6 +112,10 @@ fn start_global_timer() -> Handle { rx.recv().unwrap() } +fn start_global_timer() -> Handle { + start_global_timer_with_name("timer") +} + /// A struct that marks the *zero* time. /// /// A *zero* time can be any time, as what it represents is `Instant`, From 933ef41506f19f0804a21bc3241d3066f2237962 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 6 Sep 2022 10:40:57 +0800 Subject: [PATCH 4/4] f Signed-off-by: CalvinNeo --- components/raftstore/src/engine_store_ffi/utils.rs | 4 ++-- components/tikv_util/src/timer.rs | 8 ++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/components/raftstore/src/engine_store_ffi/utils.rs b/components/raftstore/src/engine_store_ffi/utils.rs index dda0627526e..90775a4800e 100644 --- a/components/raftstore/src/engine_store_ffi/utils.rs +++ b/components/raftstore/src/engine_store_ffi/utils.rs @@ -3,13 +3,13 @@ use std::time; use futures_util::{compat::Future01CompatExt, future::BoxFuture, FutureExt}; -use tikv_util::timer::start_global_timer_with_name; +use tikv_util::timer::start_global_timer; use tokio_timer::timer::Handle; use crate::engine_store_ffi::lazy_static; lazy_static! { - pub static ref PROXY_TIMER_HANDLE: Handle = start_global_timer_with_name("proxy-timer"); + pub static ref PROXY_TIMER_HANDLE: Handle = start_global_timer("proxy-timer"); } pub type ArcNotifyWaker = std::sync::Arc; diff --git a/components/tikv_util/src/timer.rs b/components/tikv_util/src/timer.rs index 8b9f7118261..a792975951e 100644 --- a/components/tikv_util/src/timer.rs +++ b/components/tikv_util/src/timer.rs @@ -90,11 +90,11 @@ impl Ord for TimeoutTask { } lazy_static! { - pub static ref GLOBAL_TIMER_HANDLE: Handle = start_global_timer(); + pub static ref GLOBAL_TIMER_HANDLE: Handle = start_global_timer("timer"); } /// Create a global timer with specific thread name. -pub fn start_global_timer_with_name(name: &str) -> Handle { +pub fn start_global_timer(name: &str) -> Handle { let (tx, rx) = mpsc::channel(); let props = crate::thread_group::current_properties(); Builder::new() @@ -112,10 +112,6 @@ pub fn start_global_timer_with_name(name: &str) -> Handle { rx.recv().unwrap() } -fn start_global_timer() -> Handle { - start_global_timer_with_name("timer") -} - /// A struct that marks the *zero* time. /// /// A *zero* time can be any time, as what it represents is `Instant`,