raftstore: Implement coprocessor observer pre_persist (tikv#12957)
ref tikv#12849

Support coprocessor observer pre_persist

Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
CalvinNeo committed Sep 15, 2022
1 parent 30f5313 commit 0e3342a
Showing 11 changed files with 258 additions and 87 deletions.
1 change: 1 addition & 0 deletions ci_check.sh
@@ -31,6 +31,7 @@ elif [[ $M == "testnew" ]]; then
cargo test --package tests --test proxy normal::ingest
cargo test --package tests --test proxy normal::snapshot
cargo test --package tests --test proxy normal::restart
cargo test --package tests --test proxy normal::persist
# tests based on new-mock-engine-store, for some tests not available for new proxy
cargo test --package tests --test proxy proxy
elif [[ $M == "debug" ]]; then
47 changes: 40 additions & 7 deletions components/engine_store_ffi/src/observer.rs
@@ -9,17 +9,13 @@ use std::{

use collections::HashMap;
use engine_tiflash::FsStatsExt;
use engine_traits::{CfName, SstMetaInfo};
use engine_traits::SstMetaInfo;
use kvproto::{
import_sstpb::SstMeta,
metapb::Region,
raft_cmdpb::{
AdminCmdType, AdminRequest, AdminResponse, ChangePeerRequest, CmdType, CommitMergeRequest,
RaftCmdRequest, RaftCmdResponse, Request,
},
raft_cmdpb::{AdminCmdType, AdminRequest, AdminResponse, CmdType, RaftCmdRequest},
raft_serverpb::RaftApplyState,
};
use raft::{eraftpb, StateRole};
use raft::StateRole;
use raftstore::{
coprocessor,
coprocessor::{
@@ -583,6 +579,43 @@ impl RegionChangeObserver for TiFlashObserver {
.handle_destroy(ob_ctx.region().get_id());
}
}
fn pre_persist(
&self,
ob_ctx: &mut ObserverContext<'_>,
is_finished: bool,
cmd: Option<&RaftCmdRequest>,
) -> bool {
let should_persist = if is_finished {
fail::fail_point!("on_pre_persist_with_finish", |_| { true });
false
} else {
let cmd = cmd.unwrap();
if cmd.has_admin_request() {
match cmd.get_admin_request().get_cmd_type() {
// Merge needs to get the latest apply index.
AdminCmdType::CommitMerge | AdminCmdType::RollbackMerge => true,
_ => false,
}
} else {
false
}
};
if should_persist {
info!(
"observe pre_persist, persist";
"region_id" => ob_ctx.region().get_id(),
"peer_id" => self.peer_id,
);
} else {
debug!(
"observe pre_persist";
"region_id" => ob_ctx.region().get_id(),
"peer_id" => self.peer_id,
"is_finished" => is_finished,
);
};
should_persist
}
}

impl PdTaskObserver for TiFlashObserver {
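For readers skimming the diff: the new hook's decision rule is compact. A persist is only forced mid-apply for merge-related admin commands, which need the latest apply index on disk; the finish path declines by default, with the `on_pre_persist_with_finish` failpoint flipping it in tests. A minimal standalone sketch of that rule — `decide_persist` is a hypothetical helper for illustration, not part of the commit:

```rust
use kvproto::raft_cmdpb::{AdminCmdType, AdminRequest, RaftCmdRequest};

// Hypothetical helper mirroring the logic of TiFlashObserver::pre_persist above.
fn decide_persist(is_finished: bool, cmd: Option<&RaftCmdRequest>) -> bool {
    if is_finished {
        // Finish path: do not force an extra persist (failpoints aside).
        return false;
    }
    match cmd {
        // Merge needs the latest apply index persisted.
        Some(cmd) if cmd.has_admin_request() => matches!(
            cmd.get_admin_request().get_cmd_type(),
            AdminCmdType::CommitMerge | AdminCmdType::RollbackMerge
        ),
        _ => false,
    }
}

fn main() {
    let mut cmd = RaftCmdRequest::default();
    let mut admin = AdminRequest::default();
    admin.set_cmd_type(AdminCmdType::CommitMerge);
    cmd.set_admin_request(admin);
    assert!(decide_persist(false, Some(&cmd)));
    assert!(!decide_persist(true, None));
}
```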
20 changes: 20 additions & 0 deletions components/raftstore/src/coprocessor/dispatcher.rs
@@ -633,6 +633,26 @@ impl<E: KvEngine> CoprocessorHost<E> {
);
}

    /// `pre_persist` is called when we want to persist data or meta for a region.
    /// For example, `finish_for` calls `pre_persist` with `is_finished` set to
    /// true, while `commit` calls it with `is_finished` set to false.
    /// An observer rejects the persistence by returning false.
pub fn pre_persist(
&self,
region: &Region,
is_finished: bool,
cmd: Option<&RaftCmdRequest>,
) -> bool {
let mut ctx = ObserverContext::new(region);
for observer in &self.registry.region_change_observers {
let observer = observer.observer.inner();
if !observer.pre_persist(&mut ctx, is_finished, cmd) {
return false;
}
}
true
}

pub fn on_flush_applied_cmd_batch(
&self,
max_level: ObserveLevel,
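Note the dispatch semantics: the write is persisted only if every registered region-change observer consents, and the first `false` short-circuits the loop. A self-contained sketch of that conjunctive veto — the `Obs` trait and `Always` observer are stand-ins for illustration, not the real registry types:

```rust
// Stand-in for the pre_persist part of RegionChangeObserver.
trait Obs {
    fn pre_persist(&self, is_finished: bool) -> bool;
}

// An observer with a fixed answer.
struct Always(bool);
impl Obs for Always {
    fn pre_persist(&self, _is_finished: bool) -> bool {
        self.0
    }
}

// Mirrors CoprocessorHost::pre_persist: all observers must agree;
// `all` short-circuits on the first dissenter, like the loop above.
fn pre_persist(observers: &[Box<dyn Obs>], is_finished: bool) -> bool {
    observers.iter().all(|o| o.pre_persist(is_finished))
}

fn main() {
    let obs: Vec<Box<dyn Obs>> = vec![Box::new(Always(true)), Box::new(Always(false))];
    assert!(!pre_persist(&obs, true)); // one dissenter rejects the persistence
}
```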
11 changes: 11 additions & 0 deletions components/raftstore/src/coprocessor/mod.rs
@@ -307,6 +307,17 @@ pub enum RegionChangeEvent {
pub trait RegionChangeObserver: Coprocessor {
/// Hook to call when a region changed on this TiKV
fn on_region_changed(&self, _: &mut ObserverContext<'_>, _: RegionChangeEvent, _: StateRole) {}

    /// Should be called every time before we write a WriteBatch into
    /// KvEngine. Returns false if we can't commit at this time.
fn pre_persist(
&self,
_: &mut ObserverContext<'_>,
_is_finished: bool,
_cmd: Option<&RaftCmdRequest>,
) -> bool {
true
}
}

#[derive(Clone, Debug, Default)]
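Since the default implementation returns `true`, existing observers keep their behavior; only an observer that overrides `pre_persist` can defer persistence. A minimal sketch of such an override, in the spirit of the test `ApplyObserver` further down — `SkipPersistObserver` is hypothetical, and the surrounding `coprocessor` module's types (`Coprocessor`, `RegionChangeObserver`, `ObserverContext`, `RaftCmdRequest`) are assumed to be in scope:

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

// Hypothetical observer that rejects persistence while `skip` is set.
#[derive(Clone, Default)]
struct SkipPersistObserver {
    skip: Arc<AtomicBool>,
}

impl Coprocessor for SkipPersistObserver {} // start/stop keep their defaults

impl RegionChangeObserver for SkipPersistObserver {
    fn pre_persist(
        &self,
        _: &mut ObserverContext<'_>,
        _is_finished: bool,
        _cmd: Option<&RaftCmdRequest>,
    ) -> bool {
        !self.skip.load(Ordering::SeqCst)
    }
}
```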
68 changes: 58 additions & 10 deletions components/raftstore/src/store/fsm/apply.rs
@@ -594,15 +594,18 @@ where
delegate: &mut ApplyDelegate<EK>,
results: VecDeque<ExecResult<EK::Snapshot>>,
) {
#[cfg(any(test, feature = "testexport"))]
{
if cfg!(feature = "compat_old_proxy") {
if !delegate.pending_remove {
delegate.write_apply_state(self.kv_wb_mut());
}
if self.host.pre_persist(&delegate.region, true, None) {
if !delegate.pending_remove {
delegate.write_apply_state(self.kv_wb_mut());
}
self.commit_opt(delegate, false);
} else {
debug!("do not persist when finish_for";
"region" => ?delegate.region,
"tag" => &delegate.tag,
"apply_state" => ?delegate.apply_state,
);
}
self.commit_opt(delegate, false);
self.apply_res.push(ApplyRes {
region_id: delegate.region_id(),
apply_state: delegate.apply_state.clone(),
@@ -1096,8 +1099,15 @@ where
return ApplyResult::Yield;
}
}
if should_flush_to_engine(&cmd) {
apply_ctx.commit_opt(self, true);
let mut has_unflushed_data =
self.last_flush_applied_index != self.apply_state.get_applied_index();
if (has_unflushed_data && should_write_to_engine(&cmd)
|| apply_ctx.kv_wb().should_write_to_engine())
&& apply_ctx.host.pre_persist(&self.region, false, Some(&cmd))
{
            // TODO(tiflash): we may write the apply state twice here.
            // Originally only `commit_opt` was used.
apply_ctx.commit(self);
if let Some(start) = self.handle_start.as_ref() {
if start.saturating_elapsed() >= apply_ctx.yield_duration {
return ApplyResult::Yield;
@@ -4983,6 +4993,7 @@ mod tests {
cmd_sink: Option<Arc<Mutex<Sender<CmdBatch>>>>,
filter_compact_log: Arc<AtomicBool>,
filter_consistency_check: Arc<AtomicBool>,
skip_persist_when_pre_commit: Arc<AtomicBool>,
delay_remove_ssts: Arc<AtomicBool>,
last_delete_sst_count: Arc<AtomicU64>,
last_pending_delete_sst_count: Arc<AtomicU64>,
@@ -5106,6 +5117,17 @@
fn on_applied_current_term(&self, _: raft::StateRole, _: &Region) {}
}

impl RegionChangeObserver for ApplyObserver {
fn pre_persist(
&self,
_: &mut ObserverContext<'_>,
_is_finished: bool,
_cmd: Option<&RaftCmdRequest>,
) -> bool {
!self.skip_persist_when_pre_commit.load(Ordering::SeqCst)
}
}

#[test]
fn test_handle_raft_committed_entries() {
let (_path, engine) = create_tmp_engine("test-delegate");
@@ -5725,6 +5747,8 @@ mod tests {
let obs = ApplyObserver::default();
host.registry
.register_admin_observer(1, BoxAdminObserver::new(obs.clone()));
host.registry
.register_region_change_observer(1, BoxRegionChangeObserver::new(obs.clone()));
host.registry
.register_query_observer(1, BoxQueryObserver::new(obs.clone()));

@@ -5760,6 +5784,8 @@ mod tests {
reg.region.mut_region_epoch().set_version(3);
router.schedule_task(1, Msg::Registration(reg));

obs.skip_persist_when_pre_commit
.store(true, Ordering::SeqCst);
let mut index_id = 1;
let put_entry = EntryBuilder::new(index_id, 1)
.put(b"k1", b"v1")
@@ -5768,7 +5794,19 @@ mod tests {
.epoch(1, 3)
.build();
router.schedule_task(1, Msg::apply(apply(peer_id, 1, 1, vec![put_entry], vec![])));
fetch_apply_res(&rx);
let apply_res = fetch_apply_res(&rx);

// We don't persist at `finish_for`, since we disabled `pre_persist`.
let state: RaftApplyState = engine
.get_msg_cf(CF_RAFT, &keys::apply_state_key(1))
.unwrap()
.unwrap_or_default();
assert_eq!(
apply_res.apply_state.get_applied_index(),
state.get_applied_index() + 1
);
obs.skip_persist_when_pre_commit
.store(false, Ordering::SeqCst);

// Phase 1: we test if pre_exec will filter execution of commands correctly.
index_id += 1;
Expand All @@ -5790,6 +5828,16 @@ mod tests {
assert_eq!(apply_res.exec_res.len(), 0);
assert_eq!(apply_res.apply_state.get_truncated_state().get_index(), 0);

// We persist at `finish_for`, since we enabled `pre_persist`.
let state: RaftApplyState = engine
.get_msg_cf(CF_RAFT, &keys::apply_state_key(1))
.unwrap()
.unwrap_or_default();
assert_eq!(
apply_res.apply_state.get_applied_index(),
state.get_applied_index()
);

index_id += 1;
// Don't filter CompactLog
obs.filter_compact_log.store(false, Ordering::SeqCst);
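To summarize the two call sites touched above: `finish_for` consults the hook with `is_finished = true` and no command, while the per-command flush path passes `is_finished = false` plus the current `RaftCmdRequest`, and only flushes when there is pending data to write and the host consents. Tests can flip the finish-path default through the `on_pre_persist_with_finish` failpoint declared in `observer.rs`; a sketch of the usual `fail` crate workflow, assuming failpoints are compiled in:

```rust
// Sketch: make TiFlashObserver::pre_persist return true on the finish
// path by activating its failpoint, then restore the default.
fn with_forced_finish_persist(test_body: impl FnOnce()) {
    // "return(true)" routes fail_point!(..., |_| { true }) through the
    // closure branch, so pre_persist with is_finished == true returns true.
    fail::cfg("on_pre_persist_with_finish", "return(true)").unwrap();
    test_body();
    // Deactivate so later tests see the default (no persist) behavior.
    fail::remove("on_pre_persist_with_finish");
}
```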
1 change: 1 addition & 0 deletions components/test_raftstore/src/node.rs
@@ -262,6 +262,7 @@ impl Simulator for NodeCluster {
.max_total_size(cfg.server.snap_max_total_size.0)
.encryption_key_manager(key_manager)
.max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0)
.enable_multi_snapshot_files(true)
.build(tmp.path().to_str().unwrap());
(snap_mgr, Some(tmp))
} else {
1 change: 1 addition & 0 deletions components/test_raftstore/src/server.rs
@@ -429,6 +429,7 @@ impl ServerCluster {
.max_total_size(cfg.server.snap_max_total_size.0)
.encryption_key_manager(key_manager)
.max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0)
.enable_multi_snapshot_files(true)
.build(tmp_str);
self.snap_mgrs.insert(node_id, snap_mgr.clone());
let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone()));
7 changes: 2 additions & 5 deletions new-mock-engine-store/src/lib.rs
@@ -9,7 +9,6 @@ use std::{
time::Duration,
};

use engine_rocks::RocksEngine;
pub use engine_store_ffi::{
interfaces::root::DB as ffi_interfaces, EngineStoreServerHelper, RaftStoreProxyFFIHelper,
RawCppPtr, UnwrapExternCFunc,
@@ -18,10 +17,8 @@ use engine_traits::{
Engines, Iterable, Peekable, SyncMutable, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE,
};
use kvproto::{
raft_cmdpb::{AdminCmdType, AdminRequest},
raft_serverpb::{
MergeState, PeerState, RaftApplyState, RaftLocalState, RaftSnapshotData, RegionLocalState,
},
raft_cmdpb::AdminCmdType,
raft_serverpb::{RaftApplyState, RegionLocalState},
};
pub use mock_cluster::{Cluster, ProxyConfig, Simulator, TestPdClient, TiFlashEngine};
use protobuf::Message;
19 changes: 5 additions & 14 deletions new-mock-engine-store/src/mock_cluster.rs
@@ -3,11 +3,7 @@
#![feature(slice_take)]

use std::{
borrow::BorrowMut,
cell::RefCell,
collections::{hash_map::Entry as MapEntry, BTreeMap},
path::Path,
pin::Pin,
collections::hash_map::Entry as MapEntry,
result,
sync::{atomic::AtomicU8, Arc, Mutex, RwLock},
thread,
@@ -22,19 +18,17 @@ pub use engine_store_ffi::{
interfaces::root::DB as ffi_interfaces, EngineStoreServerHelper, RaftStoreProxyFFIHelper,
RawCppPtr, TiFlashEngine, UnwrapExternCFunc,
};
use engine_traits::{Engines, KvEngine, SyncMutable, CF_DEFAULT, CF_LOCK, CF_WRITE};
use engine_traits::{Engines, KvEngine, CF_DEFAULT};
use file_system::IORateLimiter;
use futures::executor::block_on;
use kvproto::{
errorpb::Error as PbError,
metapb::{self, Buckets, PeerRole, RegionEpoch, StoreLabel},
metapb::{self, PeerRole, RegionEpoch, StoreLabel},
raft_cmdpb::{RaftCmdRequest, RaftCmdResponse, Request, *},
raft_serverpb::RaftMessage,
};
use lazy_static::lazy_static;
use pd_client::PdClient;
pub use proxy_server::config::ProxyConfig;
use proxy_server::fatal;
use raftstore::{
store::{
bootstrap_store,
@@ -58,10 +52,7 @@ pub use test_raftstore::{
new_region_leader_cmd, new_request, new_status_request, new_store, new_tikv_config,
new_transfer_leader_cmd, sleep_ms, TestPdClient,
};
use tikv::{
config::TiKvConfig,
server::{Node, Result as ServerResult},
};
use tikv::{config::TiKvConfig, server::Result as ServerResult};
use tikv_util::{
crit, debug, error, info, safe_panic,
sys::SysQuota,
@@ -259,7 +250,7 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
key_manager: &Option<Arc<DataKeyManager>>,
router: &Option<RaftRouter<TiFlashEngine, engine_rocks::RocksEngine>>,
) {
let (mut ffi_helper_set, node_cfg) =
let (mut ffi_helper_set, _node_cfg) =
self.make_ffi_helper_set(0, engines, key_manager, router);

// We can not use moved or cloned engines any more.