Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support post_exec on Raftstore Proxy #140

Merged
merged 60 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
0b7bf7f
f
CalvinNeo Jul 4, 2022
fea5a4a
fix
CalvinNeo Jul 4, 2022
67c47da
add engine_tiflash with this version of engine_rocks
CalvinNeo Jul 5, 2022
c96c0c3
fmt
CalvinNeo Jul 5, 2022
a3d6722
Merge remote-tracking branch 'upstream/raftstore-proxy-6.2' into try-…
CalvinNeo Jul 5, 2022
e3a716b
add engine_tiflash actually
CalvinNeo Jul 5, 2022
9bfec36
fix
CalvinNeo Jul 6, 2022
5afb804
fix some tests
CalvinNeo Jul 6, 2022
0f38d9d
f
CalvinNeo Jul 6, 2022
ed1708b
it runs if we do_write everytime
CalvinNeo Jul 6, 2022
8ab1acb
add ensure_no_common_unrecognized_keys to config checker
CalvinNeo Jul 7, 2022
5237c17
reorg tests
CalvinNeo Jul 8, 2022
4d4c583
raftstore: Implement coprocessor observer on_empty_cmd (#12851)
CalvinNeo Jun 22, 2022
b147018
fix tests
CalvinNeo Jul 8, 2022
3d50bcc
move proxy-related code in component/server into proxy_server
CalvinNeo Jul 11, 2022
8833251
Merge remote-tracking branch 'upstream/raftstore-proxy-6.2' into try-…
CalvinNeo Jul 11, 2022
f0ad93d
move codes in component/server/src/setup.rs into proxy_server
CalvinNeo Jul 11, 2022
453eb87
support new ffis in mock-engine-store and new-mock-engine-store
CalvinNeo Jul 11, 2022
573efc6
add disclaimer
CalvinNeo Jul 11, 2022
af8b448
enlength some test time
CalvinNeo Jul 12, 2022
cb5bc33
Remove pub fields of PdCluster, add feature to origin import
CalvinNeo Jul 14, 2022
e8e1851
raftstore: Implement coprocessor observer pre_exec_admin(query) (#12868)
CalvinNeo Jul 13, 2022
3dae81c
add
CalvinNeo Jul 14, 2022
731c57d
pf
CalvinNeo Jul 14, 2022
39c1db6
Merge remote-tracking branch 'upstream/raftstore-proxy-6.2' into migr…
CalvinNeo Jul 15, 2022
c301334
remove logs
CalvinNeo Jul 15, 2022
f1bba4e
remove hack: computehash/verifyhash/transferhash
CalvinNeo Jul 18, 2022
b688e40
raftstore: Implement coprocessor observer post_exec_admin(query) (#12…
CalvinNeo Jul 18, 2022
e83a37d
remove rollback of CompactLog
CalvinNeo Jul 18, 2022
a41861e
Merge branch 'migrate-pre-exec' into migrate-post-exec
CalvinNeo Jul 18, 2022
3f046f6
support post_exec
CalvinNeo Jul 18, 2022
b727d5e
raftstore: pub `check_sst_for_ingestion` (#13040)
CalvinNeo Jul 18, 2022
661767b
remove self-defined check_sst_for_ingestion
CalvinNeo Jul 18, 2022
7a78bd4
fix tests
CalvinNeo Jul 19, 2022
4eb4ef4
fix some tests and mock
CalvinNeo Jul 19, 2022
f93498e
Merge branch 'migrate-pre-exec' into migrate-post-exec
CalvinNeo Jul 19, 2022
f43c37d
add ingest tests
CalvinNeo Jul 19, 2022
951bcd1
Merge remote-tracking branch 'upstream/raftstore-proxy-6.2' into migr…
CalvinNeo Aug 9, 2022
3faedb0
remove hack, add back test
CalvinNeo Aug 9, 2022
b7fe8b7
f
CalvinNeo Aug 9, 2022
de8aa46
f
CalvinNeo Aug 9, 2022
b694fc7
fix old tests by default enable new proxy
CalvinNeo Aug 10, 2022
af80e62
raise log level
CalvinNeo Aug 11, 2022
34b3fe7
disable test_early_apply_yield_followed_with_many_entries
CalvinNeo Aug 11, 2022
e44a3d1
fix tests by enabling persist on old
CalvinNeo Aug 11, 2022
1f6bc02
fmt
CalvinNeo Aug 11, 2022
9adabd8
add back ingest sst with compat_old_proxy, to fix some bugs
CalvinNeo Aug 12, 2022
b71dcc9
fix
CalvinNeo Aug 12, 2022
8b72794
add comment
CalvinNeo Aug 15, 2022
e529489
f
CalvinNeo Aug 15, 2022
608dea5
Merge branch 'raftstore-proxy-6.2' into migrate-post-exec
CalvinNeo Aug 15, 2022
9d86bd1
raftstore: allow exec observers delay deletion of applied ssts (#13061)
CalvinNeo Aug 16, 2022
3b0569e
support pending delete ssts
CalvinNeo Aug 16, 2022
2e65d9b
Merge branch 'migrate-post-exec' of github.com:CalvinNeo/tidb-engine-…
CalvinNeo Aug 16, 2022
f34b8f8
f
CalvinNeo Aug 16, 2022
d56fa5d
add back test_empty_cmd
CalvinNeo Aug 17, 2022
5afbb35
Merge branch 'raftstore-proxy-6.2' into migrate-post-exec
CalvinNeo Aug 17, 2022
b2958e8
Merge branch 'migrate-post-exec' of github.com:CalvinNeo/tidb-engine-…
CalvinNeo Aug 18, 2022
3bae7d3
f
CalvinNeo Aug 18, 2022
c6bcbca
f
CalvinNeo Aug 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/pr-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,7 @@ jobs:
cargo test --package tests --test failpoints cases::test_merge
cargo test --package tests --test failpoints cases::test_import_service
cargo test --package tests --test failpoints cases::test_proxy_replica_read
# tests based on new-mock-engine-store, with compaction for new proxy
cargo test --features compat_new_proxy --package tests --test proxy normal
# tests based on new-mock-engine-store, for some tests not available for new proxy
cargo test --package tests --test proxy proxy
1 change: 1 addition & 0 deletions components/cdc/src/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ impl Delegate {
for cmd in batch.into_iter(self.region_id) {
let Cmd {
index,
term: _,
mut request,
mut response,
} = cmd;
Expand Down
39 changes: 35 additions & 4 deletions components/raftstore/src/coprocessor/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,38 @@ impl<E: KvEngine> CoprocessorHost<E> {
let admin = cmd.get_admin_request();
for observer in &self.registry.admin_observers {
let observer = observer.observer.inner();
if observer.pre_exec_admin(&mut ctx, admin, index, term) {
if observer.pre_exec_admin(&mut ctx, admin) {
return true;
}
}
false
}
}

/// `post_exec` should be called immediately after we executed one raft command.
/// It notifies observers side effects of this command before execution of the next command,
/// including req/resp, apply state, modified region state, etc.
/// Return true observers think a persistence is necessary.
pub fn post_exec(
&self,
region: &Region,
cmd: &Cmd,
apply_state: &RaftApplyState,
region_state: &RegionState,
) -> bool {
let mut ctx = ObserverContext::new(region);
if !cmd.response.has_admin_response() {
for observer in &self.registry.query_observers {
let observer = observer.observer.inner();
if observer.post_exec_query(&mut ctx, cmd, apply_state, region_state) {
return true;
}
}
false
} else {
for observer in &self.registry.admin_observers {
let observer = observer.observer.inner();
if observer.post_exec_admin(&mut ctx, cmd, apply_state, region_state) {
return true;
}
}
Expand Down Expand Up @@ -801,7 +832,7 @@ mod tests {
assert_all!([&ob.called], &[3]);
let mut admin_resp = RaftCmdResponse::default();
admin_resp.set_admin_response(AdminResponse::default());
host.post_apply(&region, &Cmd::new(0, admin_req, admin_resp));
host.post_apply(&region, &Cmd::new(0, 0, admin_req, admin_resp));
assert_all!([&ob.called], &[6]);

let mut query_req = RaftCmdRequest::default();
Expand All @@ -811,7 +842,7 @@ mod tests {
host.pre_apply(&region, &query_req);
assert_all!([&ob.called], &[15]);
let query_resp = RaftCmdResponse::default();
host.post_apply(&region, &Cmd::new(0, query_req, query_resp));
host.post_apply(&region, &Cmd::new(0, 0, query_req, query_resp));
assert_all!([&ob.called], &[21]);

host.on_role_change(&region, RoleChange::new(StateRole::Leader));
Expand Down Expand Up @@ -893,7 +924,7 @@ mod tests {
host.pre_apply(&region, &req);
assert_all!([&ob1.called, &ob2.called], &[0, base_score * 2 + 3]);

host.post_apply(&region, &Cmd::new(0, req.clone(), resp.clone()));
host.post_apply(&region, &Cmd::new(0, 0, req.clone(), resp.clone()));
assert_all!([&ob1.called, &ob2.called], &[0, base_score * 3 + 6]);

set_all!(&[&ob2.bypass], false);
Expand Down
35 changes: 34 additions & 1 deletion components/raftstore/src/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use kvproto::{
metapb::Region,
pdpb::CheckPolicy,
raft_cmdpb::{AdminRequest, AdminResponse, RaftCmdRequest, RaftCmdResponse, Request},
raft_serverpb::RaftApplyState,
};
use raft::{eraftpb, StateRole};

Expand Down Expand Up @@ -74,6 +75,12 @@ impl<'a> ObserverContext<'a> {
}
}

pub struct RegionState {
pub peer_id: u64,
pub pending_remove: bool,
pub modified_region: Option<Region>,
}

pub trait AdminObserver: Coprocessor {
/// Hook to call before proposing admin request.
fn pre_propose_admin(&self, _: &mut ObserverContext<'_>, _: &mut AdminRequest) -> Result<()> {
Expand All @@ -87,6 +94,18 @@ pub trait AdminObserver: Coprocessor {
/// For now, the `region` in `ObserverContext` is an empty region.
fn post_apply_admin(&self, _: &mut ObserverContext<'_>, _: &AdminResponse) {}

/// Hook to call immediately after exec command
/// Will be a special persistence after this exec if a observer returns true.
fn post_exec_admin(
&self,
_: &mut ObserverContext<'_>,
_: &Cmd,
_: &RaftApplyState,
_: &RegionState,
) -> bool {
false
}

/// Hook before exec admin request, returns whether we should skip this
/// admin.
fn pre_exec_admin(
Expand Down Expand Up @@ -118,6 +137,18 @@ pub trait QueryObserver: Coprocessor {
/// For now, the `region` in `ObserverContext` is an empty region.
fn post_apply_query(&self, _: &mut ObserverContext<'_>, _: &Cmd) {}

/// Hook to call immediately after exec command.
/// Will be a special persistence after this exec if a observer returns true.
fn post_exec_query(
&self,
_: &mut ObserverContext<'_>,
_: &Cmd,
_: &RaftApplyState,
_: &RegionState,
) -> bool {
false
}

/// Hook before exec write request, returns whether we should skip this
/// write.
fn pre_exec_query(&self, _: &mut ObserverContext<'_>, _: &[Request], _: u64, _: u64) -> bool {
Expand Down Expand Up @@ -241,14 +272,16 @@ pub trait RegionChangeObserver: Coprocessor {
#[derive(Clone, Debug, Default)]
pub struct Cmd {
pub index: u64,
pub term: u64,
pub request: RaftCmdRequest,
pub response: RaftCmdResponse,
}

impl Cmd {
pub fn new(index: u64, request: RaftCmdRequest, response: RaftCmdResponse) -> Cmd {
pub fn new(index: u64, term: u64, request: RaftCmdRequest, response: RaftCmdResponse) -> Cmd {
Cmd {
index,
term,
request,
response,
}
Expand Down
Loading