Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo committed Sep 15, 2022
1 parent 16b9415 commit 30f5313
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 62 deletions.
3 changes: 1 addition & 2 deletions components/engine_store_ffi/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl TiFlashObserver {
) -> Self {
let engine_store_server_helper =
gen_engine_store_server_helper(engine.engine_store_server_helper);
// TODO(tiflash) start thread pool
// start thread pool for pre handle snapshot
let snap_pool = Builder::new(tikv_util::thd_name!("region-task"))
.max_thread_count(snap_handle_pool_size)
.build_future_pool();
Expand Down Expand Up @@ -242,7 +242,6 @@ impl TiFlashObserver {

impl Coprocessor for TiFlashObserver {
fn stop(&self) {
// TODO(tiflash) remove this when pre apply merged
info!("shutdown tiflash observer"; "peer_id" => self.peer_id);
self.apply_snap_pool.as_ref().unwrap().shutdown();
}
Expand Down
2 changes: 1 addition & 1 deletion components/proxy_server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::{
collections::{hash_map::RandomState, HashSet},
iter::FromIterator,
path::{Path, PathBuf},
path::Path,
};

use itertools::Itertools;
Expand Down
2 changes: 1 addition & 1 deletion components/proxy_server/src/hacked_lock_mgr.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use tikv::{
server::{lock_manager::waiter_manager::Callback, Error, Result},
server::lock_manager::waiter_manager::Callback,
storage::{
lock_manager::{DiagnosticContext, Lock, LockManager as LockManagerTrait, WaitTimeout},
ProcessResult, StorageCallback,
Expand Down
2 changes: 0 additions & 2 deletions components/proxy_server/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

#![feature(proc_macro_hygiene)]
use std::{
ffi::CStr,
os::raw::{c_char, c_int},
Expand All @@ -10,7 +9,6 @@ use std::{

use clap::{App, Arg, ArgMatches};
use tikv::config::TiKvConfig;
use tikv_util::config::ReadableDuration;

use crate::{config::make_tikv_config, fatal, setup::overwrite_config_with_cmd_args};

Expand Down
14 changes: 4 additions & 10 deletions components/proxy_server/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,11 @@ use grpcio::{EnvBuilder, Environment};
use grpcio_health::HealthService;
use kvproto::{
debugpb::create_debug, diagnosticspb::create_diagnostics, import_sstpb::create_import_sst,
kvrpcpb::ApiVersion,
};
use pd_client::{PdClient, RpcClient};
use raft_log_engine::RaftLogEngine;
use raftstore::{
coprocessor::{
config::SplitCheckConfigManager, BoxConsistencyCheckObserver, ConsistencyCheckMethod,
CoprocessorHost, RawConsistencyCheckObserver, RegionInfoAccessor,
},
coprocessor::{config::SplitCheckConfigManager, CoprocessorHost, RegionInfoAccessor},
router::ServerRaftStoreRouter,
store::{
config::RaftstoreConfigManager,
Expand All @@ -75,7 +71,7 @@ use tikv::{
server::{
config::{Config as ServerConfig, ServerConfigManager},
create_raft_storage,
gc_worker::{AutoGcConfig, GcWorker},
gc_worker::GcWorker,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
Expand All @@ -84,8 +80,7 @@ use tikv::{
GRPC_THREAD_PREFIX,
},
storage::{
self, config_manager::StorageConfigManger, mvcc::MvccConsistencyCheckObserver,
txn::flow_controller::FlowController, Engine,
self, config_manager::StorageConfigManger, txn::flow_controller::FlowController, Engine,
},
};
use tikv_util::{
Expand Down Expand Up @@ -217,7 +212,7 @@ pub fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(
#[inline]
fn run_impl_only_for_decryption<CER: ConfiguredRaftEngine, F: KvFormat>(
config: TiKvConfig,
proxy_config: ProxyConfig,
_proxy_config: ProxyConfig,
engine_store_server_helper: &EngineStoreServerHelper,
) {
let encryption_key_manager =
Expand Down Expand Up @@ -927,7 +922,6 @@ impl<ER: RaftEngine> TiKvServer<ER> {
data_sink_reg_handle.clone(),
);
self.to_stop.push(single_target_worker);
let rsmeter_pubsub_service = resource_metering::PubSubService::new(data_sink_reg_handle);

let cfg_manager = resource_metering::ConfigManager::new(
self.config.resource_metering.clone(),
Expand Down
10 changes: 2 additions & 8 deletions components/proxy_server/src/setup.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::{
borrow::ToOwned,
io,
path::{Path, PathBuf},
sync::atomic::{AtomicBool, Ordering},
};

use chrono::Local;
use std::borrow::ToOwned;

use clap::ArgMatches;
use collections::HashMap;
pub use server::setup::initial_logger;
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1497,7 +1497,7 @@ where
);
}

let (mut response, mut exec_result) = match cmd_type {
let (mut response, exec_result) = match cmd_type {
AdminCmdType::ChangePeer => self.exec_change_peer(ctx, request),
AdminCmdType::ChangePeerV2 => self.exec_change_peer_v2(ctx, request),
AdminCmdType::Split => self.exec_split(ctx, request),
Expand Down
4 changes: 3 additions & 1 deletion engine_tiflash/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

// Disable warnings for unused engine_rocks's feature.
#![allow(dead_code)]
#![allow(unused_variables)]
use std::{
any::Any,
cell::RefCell,
Expand Down
56 changes: 27 additions & 29 deletions tests/proxy/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use clap::{App, Arg, ArgMatches};
use engine_store_ffi::{KVGetStatus, RaftStoreProxyFFI};
use engine_traits::{
Error, ExternalSstFileInfo, Iterable, Iterator, MiscExt, Mutable, Peekable, Result, SeekKey,
SstExt, SstReader, SstWriter, SstWriterBuilder, WriteBatch, WriteBatchExt, CF_DEFAULT, CF_LOCK,
CF_RAFT, CF_WRITE,
SstExt, WriteBatch, WriteBatchExt, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE,
};
use kvproto::{
import_sstpb::SstMeta,
Expand Down Expand Up @@ -45,10 +44,7 @@ use proxy_server::{
run::run_tikv_proxy,
};
use raft::eraftpb::MessageType;
use raftstore::{
coprocessor::{ConsistencyCheckMethod, Coprocessor},
store::util::find_peer,
};
use raftstore::{coprocessor::ConsistencyCheckMethod, store::util::find_peer};
use sst_importer::SstImporter;
pub use test_raftstore::{must_get_equal, must_get_none, new_peer};
use test_raftstore::{new_node_cluster, new_tikv_config};
Expand Down Expand Up @@ -154,7 +150,7 @@ mod region {

#[test]
fn test_get_region_local_state() {
let (mut cluster, pd_client) = new_mock_cluster(0, 3);
let (mut cluster, _pd_client) = new_mock_cluster(0, 3);

cluster.run();

Expand All @@ -169,7 +165,7 @@ mod region {
iter_ffi_helpers(
&cluster,
None,
&mut |id: u64, _, ffi_set: &mut FFIHelperSet| {
&mut |_id: u64, _, ffi_set: &mut FFIHelperSet| {
let f = ffi_set.proxy_helper.fn_get_region_local_state.unwrap();
let mut state = kvproto::raft_serverpb::RegionLocalState::default();
let mut error_msg = mock_engine_store::RawCppStringPtrGuard::default();
Expand Down Expand Up @@ -374,7 +370,7 @@ mod write {
fn test_interaction() {
// TODO Maybe we should pick this test to TiKV.
// This test is to check if empty entries can affect pre_exec and post_exec.
let (mut cluster, pd_client) = new_mock_cluster(0, 3);
let (mut cluster, _pd_client) = new_mock_cluster(0, 3);

fail::cfg("try_flush_data", "return(0)").unwrap();
let _ = cluster.run();
Expand Down Expand Up @@ -414,6 +410,9 @@ mod write {
}
std::thread::sleep(std::time::Duration::from_millis(100));
retry += 1;
if retry >= 30 {
panic!("states is not changed")
}
};

for i in prev_states.keys() {
Expand Down Expand Up @@ -461,7 +460,7 @@ mod write {

fn test_leadership_change_impl(filter: bool) {
// Test if a empty command can be observed when leadership changes.
let (mut cluster, pd_client) = new_mock_cluster(0, 3);
let (mut cluster, _pd_client) = new_mock_cluster(0, 3);

disable_auto_gen_compact_log(&mut cluster);

Expand Down Expand Up @@ -538,7 +537,7 @@ mod write {

#[test]
fn test_kv_write_always_persist() {
let (mut cluster, pd_client) = new_mock_cluster(0, 3);
let (mut cluster, _pd_client) = new_mock_cluster(0, 3);

let _ = cluster.run();

Expand Down Expand Up @@ -576,7 +575,7 @@ mod write {

#[test]
fn test_kv_write() {
let (mut cluster, pd_client) = new_mock_cluster(0, 3);
let (mut cluster, _pd_client) = new_mock_cluster(0, 3);

fail::cfg("on_post_exec_normal", "return(false)").unwrap();
fail::cfg("on_post_exec_admin", "return(false)").unwrap();
Expand Down Expand Up @@ -709,7 +708,7 @@ mod write {
#[test]
fn test_consistency_check() {
// ComputeHash and VerifyHash shall be filtered.
let (mut cluster, pd_client) = new_mock_cluster(0, 2);
let (mut cluster, _pd_client) = new_mock_cluster(0, 2);

cluster.run();

Expand Down Expand Up @@ -738,7 +737,7 @@ mod write {
// If we just return None for CompactLog, the region state in ApplyFsm will change.
// Because there is no rollback in new implementation.
// This is a ERROR state.
let (mut cluster, pd_client) = new_mock_cluster(0, 3);
let (mut cluster, _pd_client) = new_mock_cluster(0, 3);
cluster.run();

// We don't return Persist after handling CompactLog.
Expand All @@ -762,7 +761,7 @@ mod write {
let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term);
let req =
test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log);
let res = cluster
let _ = cluster
.call_command_on_leader(req, Duration::from_secs(3))
.unwrap();

Expand All @@ -789,7 +788,7 @@ mod write {

#[test]
fn test_compact_log() {
let (mut cluster, pd_client) = new_mock_cluster(0, 3);
let (mut cluster, _pd_client) = new_mock_cluster(0, 3);

disable_auto_gen_compact_log(&mut cluster);

Expand Down Expand Up @@ -891,7 +890,7 @@ mod write {
#[test]
fn test_empty_cmd() {
// Test if a empty command can be observed when leadership changes.
let (mut cluster, pd_client) = new_mock_cluster(0, 3);
let (mut cluster, _pd_client) = new_mock_cluster(0, 3);
// Disable compact log
cluster.cfg.raft_store.raft_log_gc_count_limit = Some(1000);
cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10000);
Expand Down Expand Up @@ -1004,14 +1003,14 @@ mod ingest {
// copy file to save dir.
let src = sst_path.clone();
let dst = file.get_import_path().save.to_str().unwrap();
std::fs::copy(src.clone(), dst);
let _ = std::fs::copy(src.clone(), dst);

(file.get_import_path().save.clone(), meta, sst_path)
}

#[test]
fn test_handle_ingest_sst() {
let (mut cluster, pd_client) = new_mock_cluster(0, 1);
let (mut cluster, _pd_client) = new_mock_cluster(0, 1);
let _ = cluster.run();

let key = "k";
Expand Down Expand Up @@ -1044,7 +1043,7 @@ mod ingest {

#[test]
fn test_invalid_ingest_sst() {
let (mut cluster, pd_client) = new_mock_cluster(0, 1);
let (mut cluster, _pd_client) = new_mock_cluster(0, 1);

let _ = cluster.run();

Expand Down Expand Up @@ -1080,7 +1079,7 @@ mod ingest {

#[test]
fn test_ingest_return_none() {
let (mut cluster, pd_client) = new_mock_cluster(0, 1);
let (mut cluster, _pd_client) = new_mock_cluster(0, 1);

disable_auto_gen_compact_log(&mut cluster);

Expand Down Expand Up @@ -1109,7 +1108,7 @@ mod ingest {
let req = new_ingest_sst_cmd(meta1);
let _ = cluster.request(b"k1", vec![req], false, Duration::from_secs(5), true);

let (file5, meta5, sst_path5) = make_sst(
let (file5, meta5, _sst_path5) = make_sst(
&cluster,
region5.get_id(),
region5.get_region_epoch().clone(),
Expand Down Expand Up @@ -1283,7 +1282,7 @@ mod restart {
#[test]
fn test_kv_restart() {
// Test if a empty command can be observed when leadership changes.
let (mut cluster, pd_client) = new_mock_cluster(0, 3);
let (mut cluster, _pd_client) = new_mock_cluster(0, 3);

// Disable AUTO generated compact log.
disable_auto_gen_compact_log(&mut cluster);
Expand All @@ -1306,7 +1305,7 @@ mod restart {
let req =
test_raftstore::new_admin_request(region_id, region.get_region_epoch(), compact_log);
fail::cfg("try_flush_data", "return(1)").unwrap();
let res = cluster
let _ = cluster
.call_command_on_leader(req, Duration::from_secs(3))
.unwrap();

Expand Down Expand Up @@ -1524,13 +1523,13 @@ mod snapshot {
.wl()
.add_recv_filter(3, Box::new(CollectSnapshotFilter::new(tx)));
pd_client.must_add_peer(r1, new_peer(3, 3));
let region = cluster.get_region(b"k1");
// Ensure the snapshot of range ("", "") is sent and piled in filter.
if let Err(e) = rx.recv_timeout(Duration::from_secs(1)) {
panic!("the snapshot is not sent before split, e: {:?}", e);
}

// Occasionally fails.
// let region1 = cluster.get_region(b"k1");
// // Split the region range and then there should be another snapshot for the split ranges.
// cluster.must_split(&region, b"k2");
// check_key(&cluster, b"k3", b"v3", None, Some(true), Some(vec![3]));
Expand All @@ -1551,7 +1550,7 @@ mod snapshot {
// Disable default max peer count check.
pd_client.disable_default_operator();

let r1 = cluster.run_conf_change();
let _ = cluster.run_conf_change();
for i in 0..count {
let k = format!("k{:0>4}", 2 * i + 1);
let v = format!("v{}", 2 * i + 1);
Expand All @@ -1565,7 +1564,6 @@ mod snapshot {
let region = cluster.get_region(k.as_bytes());
let sp = format!("k{:0>4}", 2 * i + 2);
cluster.must_split(&region, sp.as_bytes());
let region = cluster.get_region(k.as_bytes());
}

(cluster, pd_client)
Expand Down Expand Up @@ -1673,7 +1671,7 @@ mod snapshot {
});

pd_client.must_merge(r1_new.get_id(), r3_new.get_id());
let r1_new2 = cluster.get_region(b"k1");
let _r1_new2 = cluster.get_region(b"k1");
let r3_new2 = cluster.get_region(b"k3");

iter_ffi_helpers(&cluster, None, &mut |id: u64, _, ffi: &mut FFIHelperSet| {
Expand Down Expand Up @@ -1718,7 +1716,7 @@ mod snapshot {
// Disable default max peer count check.
pd_client.disable_default_operator();

let r1 = cluster.run_conf_change();
let _ = cluster.run_conf_change();
cluster.must_put(b"k1", b"v1");
cluster.must_put(b"k3", b"v3");

Expand Down
Loading

0 comments on commit 30f5313

Please sign in to comment.