Skip to content

Commit

Permalink
Support pre_exec in Proxy (#5386)
Browse files Browse the repository at this point in the history
ref #5170
  • Loading branch information
CalvinNeo authored Jul 19, 2022
1 parent 21dea59 commit e737618
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 21 deletions.
18 changes: 5 additions & 13 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,20 +410,12 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(

curr_region.handleWriteRaftCmd({}, index, term, tmt);

const auto check_sync_log = [&]() {
if (cmd_type != raft_cmdpb::AdminCmdType::CompactLog)
{
// ignore ComputeHash, VerifyHash or other useless cmd.
return false;
}
else
{
return canFlushRegionDataImpl(curr_region_ptr, true, /* try_until_succeed */ false, tmt, region_task_lock);
}
};

if (check_sync_log())
if (cmd_type == raft_cmdpb::AdminCmdType::CompactLog)
{
// Before CompactLog, we ought to make sure all data of this region are persisted.
// So proxy will firstly call an FFI `fn_try_flush_data` to trigger a attempt to flush data on TiFlash's side.
// If the attempt fails, Proxy will filter execution of this CompactLog, which means every CompactLog observed by TiFlash can ALWAYS succeed now.
// ref. https://github.com/pingcap/tidb-engine-ext/blob/e83a37d2d8d8ae1778fe279c5f06a851f8c9e56a/components/raftstore/src/engine_store_ffi/observer.rs#L175
return EngineStoreApplyRes::Persist;
}
return EngineStoreApplyRes::None;
Expand Down
79 changes: 72 additions & 7 deletions dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class RegionKVStoreTest : public ::testing::Test
static void testKVStore();
static void testRegion();
static void testReadIndex();
static void testNewProxy();

private:
static void testRaftSplit(KVStore & kvs, TMTContext & tmt);
Expand All @@ -54,6 +55,66 @@ class RegionKVStoreTest : public ::testing::Test
static void testRaftMergeRollback(KVStore & kvs, TMTContext & tmt);
};

void RegionKVStoreTest::testNewProxy()
{
std::string path = TiFlashTestEnv::getTemporaryPath("/region_kvs_tmp") + "/basic";

Poco::File file(path);
if (file.exists())
file.remove(true);
file.createDirectories();

auto ctx = TiFlashTestEnv::getContext(
DB::Settings(),
Strings{
path,
});
KVStore & kvs = *ctx.getTMTContext().getKVStore();
MockRaftStoreProxy proxy_instance;
TiFlashRaftProxyHelper proxy_helper;
{
proxy_helper = MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(RaftStoreProxyPtr{&proxy_instance});
proxy_instance.init(100);
}
kvs.restore(&proxy_helper);
{
auto store = metapb::Store{};
store.set_id(1234);
kvs.setStore(store);
ASSERT_EQ(kvs.getStoreID(), store.id());
}
{
ASSERT_EQ(kvs.getRegion(0), nullptr);
auto task_lock = kvs.genTaskLock();
auto lock = kvs.genRegionWriteLock(task_lock);
{
auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10));
lock.regions.emplace(1, region);
lock.index.add(region);
}
}
{
kvs.tryPersist(1);
kvs.gcRegionPersistedCache(Seconds{0});
}
{
// test CompactLog
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
auto region = kvs.getRegion(1);
region->markCompactLog();
kvs.setRegionCompactLogConfig(100000, 1000, 1000);
request.mutable_compact_log();
request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog);
// CompactLog always returns true now, even if we can't do a flush.
// We use a tryFlushData to pre-filter.
ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(request), std::move(response), 1, 5, 1, ctx.getTMTContext()), EngineStoreApplyRes::Persist);

// Filter
ASSERT_EQ(kvs.tryFlushRegionData(1, false, ctx.getTMTContext()), false);
}
}

void RegionKVStoreTest::testReadIndex()
{
std::string path = TiFlashTestEnv::getTemporaryPath("/region_kvs_tmp") + "/basic";
Expand Down Expand Up @@ -968,20 +1029,17 @@ void RegionKVStoreTest::testKVStore()

request.mutable_compact_log();
request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog);

raft_cmdpb::AdminRequest first_request = request;
raft_cmdpb::AdminResponse first_response = response;

ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(first_request), std::move(first_response), 7, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist);
ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(first_response), 7, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist);

raft_cmdpb::AdminResponse second_response = response;
ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(second_response), 7, 23, 6, ctx.getTMTContext()), EngineStoreApplyRes::None);
request.set_cmd_type(::raft_cmdpb::AdminCmdType::ComputeHash);
ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(second_response), 7, 23, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist);

request.set_cmd_type(::raft_cmdpb::AdminCmdType::ComputeHash);
raft_cmdpb::AdminResponse third_response = response;
ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(third_response), 7, 24, 6, ctx.getTMTContext()), EngineStoreApplyRes::None);
request.set_cmd_type(::raft_cmdpb::AdminCmdType::VerifyHash);

request.set_cmd_type(::raft_cmdpb::AdminCmdType::VerifyHash);
raft_cmdpb::AdminResponse fourth_response = response;
ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(fourth_response), 7, 25, 6, ctx.getTMTContext()), EngineStoreApplyRes::None);

Expand Down Expand Up @@ -1422,5 +1480,12 @@ try
}
CATCH

TEST_F(RegionKVStoreTest, NewProxy)
try
{
testNewProxy();
}
CATCH

} // namespace tests
} // namespace DB

0 comments on commit e737618

Please sign in to comment.