Decouple apply snapshot by introducing pre(post)_apply_snapshot #146

Merged: 16 commits, Aug 31, 2022
Changes from 12 commits
8 changes: 7 additions & 1 deletion .github/workflows/pr-ci.yml
@@ -73,6 +73,12 @@ jobs:
cargo test --features compat_old_proxy --package tests --test failpoints cases::test_import_service
cargo test --features compat_old_proxy --package tests --test failpoints cases::test_proxy_replica_read
# tests based on new-mock-engine-store, with compat for new proxy
cargo test --package tests --test proxy normal
cargo test --package tests --test proxy normal::store
cargo test --package tests --test proxy normal::region
cargo test --package tests --test proxy normal::config
cargo test --package tests --test proxy normal::write
cargo test --package tests --test proxy normal::ingest
cargo test --package tests --test proxy normal::snapshot
cargo test --package tests --test proxy normal::restart
# tests based on new-mock-engine-store, for some tests not available for new proxy
cargo test --package tests --test proxy proxy
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion components/proxy_server/Cargo.toml
@@ -40,7 +40,6 @@ async-stream = "0.2"
backup = { path = "../backup", default-features = false }
backup-stream = { path = "../backup-stream", default-features = false }
causal_ts = { path = "../causal_ts" }
cdc = { path = "../cdc", default-features = false }
chrono = "0.4"
clap = "2.32"
collections = { path = "../collections" }
2 changes: 1 addition & 1 deletion components/proxy_server/src/config.rs
@@ -11,7 +11,7 @@ use online_config::OnlineConfig;
use serde_derive::{Deserialize, Serialize};
use serde_with::with_prefix;
use tikv::config::TiKvConfig;
use tikv_util::crit;
use tikv_util::{config::ReadableDuration, crit};

use crate::fatal;

6 changes: 5 additions & 1 deletion components/proxy_server/src/proxy.rs
@@ -10,6 +10,7 @@ use std::{

use clap::{App, Arg, ArgMatches};
use tikv::config::TiKvConfig;
use tikv_util::config::ReadableDuration;

use crate::{
fatal,
@@ -32,8 +33,12 @@ pub fn setup_default_tikv_config(default: &mut TiKvConfig) {
default.server.addr = TIFLASH_DEFAULT_LISTENING_ADDR.to_string();
default.server.status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
default.server.advertise_status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
default.raft_store.region_worker_tick_interval = 500;
let clean_stale_tick_max = (10_000 / default.raft_store.region_worker_tick_interval) as usize;
default.raft_store.clean_stale_tick_max = clean_stale_tick_max;
}

/// Generate default TiKvConfig, but with some Proxy's default values.
pub fn gen_tikv_config(
matches: &ArgMatches,
is_config_check: bool,
@@ -296,7 +301,6 @@ pub unsafe fn run_proxy(
overwrite_config_with_cmd_args(&mut config, &mut proxy_config, &matches);
config.logger_compatible_adjust();

// TODO(tiflash) We should later use ProxyConfig for proxy's own settings like `snap_handle_pool_size`
if is_config_check {
validate_and_persist_config(&mut config, false);
match crate::config::ensure_no_common_unrecognized_keys(
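The proxy defaults above shorten the region worker tick to 500 ms and derive `clean_stale_tick_max` from that interval, so the periodic stale cleanup still covers roughly 10 seconds regardless of the tick length. A minimal standalone sketch of the arithmetic (the helper below is illustrative, not part of `TiKvConfig`):

```rust
/// Illustrative helper (not part of this PR): given the region worker tick
/// interval in milliseconds, derive how many ticks make up the ~10 s stale
/// cleanup budget, mirroring the computation in setup_default_tikv_config.
fn clean_stale_tick_max(region_worker_tick_interval_ms: u64) -> usize {
    (10_000 / region_worker_tick_interval_ms) as usize
}

fn main() {
    // Proxy default: 500 ms per tick -> cleanup every 20 ticks (~10 s).
    assert_eq!(clean_stale_tick_max(500), 20);
    // A hypothetical coarser 1 s tick would yield 10 ticks instead.
    assert_eq!(clean_stale_tick_max(1_000), 10);
}
```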
12 changes: 6 additions & 6 deletions components/proxy_server/src/run.rs
@@ -17,8 +17,6 @@ use std::{
};

use api_version::{dispatch_api_version, KvFormat};
use backup_stream::{config::BackupStreamConfigManager, observer::BackupStreamObserver};
use cdc::{CdcConfigManager, MemoryQuota};
use concurrency_manager::ConcurrencyManager;
use encryption_export::{data_key_manager_from_config, DataKeyManager};
use engine_rocks::{
@@ -40,9 +38,8 @@ use futures::executor::block_on;
use grpcio::{EnvBuilder, Environment};
use grpcio_health::HealthService;
use kvproto::{
brpb::create_backup, cdcpb::create_change_data, deadlock::create_deadlock,
debugpb::create_debug, diagnosticspb::create_diagnostics, import_sstpb::create_import_sst,
kvrpcpb::ApiVersion, resource_usage_agent::create_resource_metering_pub_sub,
kvrpcpb::ApiVersion,
};
use pd_client::{PdClient, RpcClient};
use raft_log_engine::RaftLogEngine;
@@ -421,8 +418,11 @@ impl<CER: ConfiguredRaftEngine> TiKvServer<CER> {
});
// engine_tiflash::RocksEngine has engine_rocks::RocksEngine inside
let mut kv_engine = TiFlashEngine::from_rocks(kv_engine);
// TODO(tiflash) setup proxy_config
kv_engine.init(engine_store_server_helper, 2, Some(ffi_hub));
kv_engine.init(
engine_store_server_helper,
self.proxy_config.raft_store.snap_handle_pool_size,
Some(ffi_hub),
);

let engines = Engines::new(kv_engine, raft_engine);

99 changes: 98 additions & 1 deletion components/raftstore/src/coprocessor/dispatcher.rs
@@ -504,6 +504,51 @@ impl<E: KvEngine> CoprocessorHost<E> {
);
}

pub fn should_pre_apply_snapshot(&self) -> bool {
for observer in &self.registry.apply_snapshot_observers {
let observer = observer.observer.inner();
if observer.should_pre_apply_snapshot() {
return true;
}
}
false
}

pub fn pre_apply_snapshot(
&self,
region: &Region,
peer_id: u64,
snap_key: &crate::store::SnapKey,
snap: Option<&crate::store::Snapshot>,
) {
loop_ob!(
region,
&self.registry.apply_snapshot_observers,
pre_apply_snapshot,
peer_id,
snap_key,
snap,
);
}

pub fn post_apply_snapshot(
&self,
region: &Region,
peer_id: u64,
snap_key: &crate::store::SnapKey,
snap: Option<&crate::store::Snapshot>,
) -> Result<()> {
let mut ctx = ObserverContext::new(region);
for observer in &self.registry.apply_snapshot_observers {
let observer = observer.observer.inner();
let res = observer.post_apply_snapshot(&mut ctx, peer_id, snap_key, snap);
if res.is_err() {
Member Author commented: Need to cp from tikv
return res;
}
}
Ok(())
}

pub fn new_split_checker_host<'a>(
&'a self,
region: &Region,
@@ -646,7 +691,10 @@ mod tests {
};
use tikv_util::box_err;

use crate::coprocessor::*;
use crate::{
coprocessor::*,
store::{SnapKey, Snapshot},
};

#[derive(Clone, Default)]
struct TestCoprocessor {
@@ -673,6 +721,9 @@
PostExecQuery = 17,
PostExecAdmin = 18,
OnComputeEngineSize = 19,
PreApplySnapshot = 20,
PostApplySnapshot = 21,
ShouldPreApplySnapshot = 22,
}

impl Coprocessor for TestCoprocessor {}
@@ -840,6 +891,39 @@ mod tests {
.fetch_add(ObserverIndex::ApplySst as usize, Ordering::SeqCst);
ctx.bypass = self.bypass.load(Ordering::SeqCst);
}

fn pre_apply_snapshot(
&self,
ctx: &mut ObserverContext<'_>,
_: u64,
_: &SnapKey,
_: Option<&Snapshot>,
) {
self.called
.fetch_add(ObserverIndex::PreApplySnapshot as usize, Ordering::SeqCst);
ctx.bypass = self.bypass.load(Ordering::SeqCst);
}

fn post_apply_snapshot(
&self,
ctx: &mut ObserverContext<'_>,
_: u64,
_: &crate::store::SnapKey,
_: Option<&Snapshot>,
) -> Result<()> {
self.called
.fetch_add(ObserverIndex::PostApplySnapshot as usize, Ordering::SeqCst);
ctx.bypass = self.bypass.load(Ordering::SeqCst);
Ok(())
}

fn should_pre_apply_snapshot(&self) -> bool {
self.called.fetch_add(
ObserverIndex::ShouldPreApplySnapshot as usize,
Ordering::SeqCst,
);
false
}
}

impl CmdObserver<PanicEngine> for TestCoprocessor {
@@ -984,6 +1068,19 @@ mod tests {
host.post_exec(&region, &cmd, &apply_state, &region_state, &mut info);
index += ObserverIndex::PostExecQuery as usize;
assert_all!([&ob.called], &[index]);

let key = SnapKey::new(region.get_id(), 1, 1);
host.pre_apply_snapshot(&region, 0, &key, None);
index += ObserverIndex::PreApplySnapshot as usize;
assert_all!([&ob.called], &[index]);

let _ = host.post_apply_snapshot(&region, 0, &key, None);
index += ObserverIndex::PostApplySnapshot as usize;
assert_all!([&ob.called], &[index]);

host.should_pre_apply_snapshot();
index += ObserverIndex::ShouldPreApplySnapshot as usize;
assert_all!([&ob.called], &[index]);
}

#[test]
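As context for the new `CoprocessorHost` entry points above, here is a hedged sketch of how a snapshot-apply path could drive them. The caller function and its arguments are illustrative assumptions, not the actual region-worker changes in this PR:

```rust
// Sketch only: assumes it lives inside the raftstore crate next to
// CoprocessorHost, and that `region`, `peer_id`, `snap_key` and `snap`
// come from the surrounding apply task.
fn apply_snapshot_with_observers<E: engine_traits::KvEngine>(
    host: &CoprocessorHost<E>,
    region: &kvproto::metapb::Region,
    peer_id: u64,
    snap_key: &crate::store::SnapKey,
    snap: &crate::store::Snapshot,
) -> crate::coprocessor::Result<()> {
    // Observers opt in to pre-handling (e.g. decoding the snapshot into an
    // external engine) through should_pre_apply_snapshot().
    if host.should_pre_apply_snapshot() {
        host.pre_apply_snapshot(region, peer_id, snap_key, Some(snap));
    }

    // ... ingest the snapshot into the KV engine here ...

    // Notify observers once the snapshot is fully applied; the first error
    // returned by an observer aborts the apply with that error.
    host.post_apply_snapshot(region, peer_id, snap_key, Some(snap))
}
```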
32 changes: 31 additions & 1 deletion components/raftstore/src/coprocessor/mod.rs
@@ -21,7 +21,7 @@ use raft::{eraftpb, StateRole};
pub mod config;
mod consistency_check;
pub mod dispatcher;
mod error;
pub mod error;
mod metrics;
pub mod region_info_accessor;
mod split_check;
@@ -176,6 +176,36 @@ pub trait ApplySnapshotObserver: Coprocessor {
/// Hook to call after applying sst file. Currently the content of the snapshot can't be
/// passed to the observer.
fn apply_sst(&self, _: &mut ObserverContext<'_>, _: CfName, _path: &str) {}

/// Hook called when a Task::Apply is received.
/// A valid snapshot should be passed in; the `Option` is only for testing.
/// Note that `pre_apply_snapshot` may be called for multiple snapshots at
/// the same time.
fn pre_apply_snapshot(
&self,
_: &mut ObserverContext<'_>,
_peer_id: u64,
_: &crate::store::SnapKey,
_: Option<&crate::store::Snapshot>,
) {
}

/// Hook called after the whole snapshot is applied.
/// A valid snapshot should be passed in; the `Option` is only for testing.
fn post_apply_snapshot(
&self,
_: &mut ObserverContext<'_>,
_: u64,
_: &crate::store::SnapKey,
_snapshot: Option<&crate::store::Snapshot>,
) -> Result<()> {
Ok(())
}

/// `pre_apply_snapshot` is called only when at least one observer returns true here.
fn should_pre_apply_snapshot(&self) -> bool {
false
}
}

/// SplitChecker is invoked during a split check scan, and decides to use
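To show the new trait surface end to end, here is a hedged sketch of an observer that opts in to snapshot pre-handling. The struct name and the registration priority are made up for the example, and the registration call assumes the pre-existing `register_apply_snapshot_observer` path in the raftstore registry:

```rust
// Sketch only: an engine-specific observer implementing the new hooks.
#[derive(Clone)]
struct SnapshotPreHandler;

impl Coprocessor for SnapshotPreHandler {}

impl ApplySnapshotObserver for SnapshotPreHandler {
    // Opt in so CoprocessorHost::pre_apply_snapshot is actually invoked.
    fn should_pre_apply_snapshot(&self) -> bool {
        true
    }

    fn pre_apply_snapshot(
        &self,
        _ctx: &mut ObserverContext<'_>,
        _peer_id: u64,
        _snap_key: &crate::store::SnapKey,
        _snap: Option<&crate::store::Snapshot>,
    ) {
        // Start engine-specific pre-handling of the incoming snapshot here,
        // before raftstore ingests it.
    }

    fn post_apply_snapshot(
        &self,
        _ctx: &mut ObserverContext<'_>,
        _peer_id: u64,
        _snap_key: &crate::store::SnapKey,
        _snap: Option<&crate::store::Snapshot>,
    ) -> Result<()> {
        // Returning an error here makes CoprocessorHost::post_apply_snapshot
        // stop and propagate it.
        Ok(())
    }
}

// Registration, assuming the pre-existing registry API for snapshot observers:
// host.registry
//     .register_apply_snapshot_observer(100, BoxApplySnapshotObserver::new(SnapshotPreHandler));
```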