Skip to content

Commit

Permalink
Move proxy_instance to shared
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <tshent@qq.com>
  • Loading branch information
JaySon-Huang committed Aug 22, 2022
1 parent 071aca5 commit a438a9a
Showing 1 changed file with 36 additions and 48 deletions.
84 changes: 36 additions & 48 deletions dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Debug/MockSSTReader.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/ProxyFFI.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionExecutionResult.h>
#include <Storages/Transaction/StorageEngineType.h>
Expand All @@ -41,6 +42,8 @@ extern void ChangeRegionStateRange(RegionState & region_state, bool source_at_le

namespace tests
{

// TODO: Use another way to workaround calling the private methods on KVStore
class RegionKVStoreTest : public ::testing::Test
{
public:
Expand All @@ -55,11 +58,21 @@ class RegionKVStoreTest : public ::testing::Test
{
// clean data and create path pool instance
path_pool = createCleanPathPool(test_path);

reloadKVSFromDisk();

proxy_instance = std::make_unique<MockRaftStoreProxy>();
proxy_helper = std::make_unique<TiFlashRaftProxyHelper>(MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(
RaftStoreProxyPtr{proxy_instance.get()}));
proxy_instance->init(100);

kvstore->restore(*path_pool, proxy_helper.get());
}

void TearDown() override
{
proxy_helper.reset();
proxy_instance.reset();
kvstore.reset();
path_pool.reset();
}
Expand All @@ -78,7 +91,8 @@ class RegionKVStoreTest : public ::testing::Test
kvstore.reset();
auto & global_ctx = TiFlashTestEnv::getGlobalContext();
kvstore = std::make_unique<KVStore>(global_ctx, TiDB::SnapshotApplyMethod::DTFile_Directory);
// TODO: Move kvstore.restore() into here
// TODO: Do we need to recreate proxy_instance and proxy_helper here?
kvstore->restore(*path_pool, proxy_helper.get());
return *kvstore;
}

Expand All @@ -105,26 +119,25 @@ class RegionKVStoreTest : public ::testing::Test
}

static std::string test_path;

static std::unique_ptr<PathPool> path_pool;
static std::unique_ptr<KVStore> kvstore;

static std::unique_ptr<MockRaftStoreProxy> proxy_instance;
static std::unique_ptr<TiFlashRaftProxyHelper> proxy_helper;
};

std::string RegionKVStoreTest::test_path;
std::unique_ptr<PathPool> RegionKVStoreTest::path_pool;
std::unique_ptr<KVStore> RegionKVStoreTest::kvstore;
std::unique_ptr<MockRaftStoreProxy> RegionKVStoreTest::proxy_instance;
std::unique_ptr<TiFlashRaftProxyHelper> RegionKVStoreTest::proxy_helper;

void RegionKVStoreTest::testNewProxy()
{
auto ctx = TiFlashTestEnv::getGlobalContext();

KVStore & kvs = getKVS();
MockRaftStoreProxy proxy_instance;
TiFlashRaftProxyHelper proxy_helper;
{
proxy_helper = MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(RaftStoreProxyPtr{&proxy_instance});
proxy_instance.init(100);
}
kvs.restore(*path_pool, &proxy_helper);
{
auto store = metapb::Store{};
store.set_id(1234);
Expand Down Expand Up @@ -167,22 +180,13 @@ void RegionKVStoreTest::testReadIndex()
{
auto ctx = TiFlashTestEnv::getGlobalContext();

MockRaftStoreProxy proxy_instance;
TiFlashRaftProxyHelper proxy_helper;
{
proxy_helper = MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(RaftStoreProxyPtr{&proxy_instance});
proxy_instance.init(10);
}
std::atomic_bool over{false};

// start mock proxy in other thread
std::atomic_bool over{false};
auto proxy_runner = std::thread([&]() {
proxy_instance.testRunNormal(over);
proxy_instance->testRunNormal(over);
});
KVStore & kvs = getKVS();
kvs.restore(*path_pool, &proxy_helper);
ASSERT_EQ(kvs.getProxyHelper(), &proxy_helper);

ASSERT_EQ(kvs.getProxyHelper(), proxy_helper.get());

{
ASSERT_EQ(kvs.getRegion(0), nullptr);
Expand Down Expand Up @@ -242,8 +246,8 @@ void RegionKVStoreTest::testReadIndex()
lock.index.add(region);
}
{
ASSERT_EQ(proxy_instance.regions.at(tar_region_id)->getLatestCommitIndex(), 5);
proxy_instance.regions.at(tar_region_id)->updateCommitIndex(66);
ASSERT_EQ(proxy_instance->regions.at(tar_region_id)->getLatestCommitIndex(), 5);
proxy_instance->regions.at(tar_region_id)->updateCommitIndex(66);
}

AsyncWaker::Notifier notifier;
Expand Down Expand Up @@ -271,8 +275,8 @@ void RegionKVStoreTest::testReadIndex()

auto tar_region_id = 9;
{
ASSERT_EQ(proxy_instance.regions.at(tar_region_id)->getLatestCommitIndex(), 66);
proxy_instance.unsafeInvokeForTest([&](MockRaftStoreProxy & p) {
ASSERT_EQ(proxy_instance->regions.at(tar_region_id)->getLatestCommitIndex(), 66);
proxy_instance->unsafeInvokeForTest([&](MockRaftStoreProxy & p) {
p.region_id_to_error.emplace(tar_region_id);
p.regions.at(2)->updateCommitIndex(6);
});
Expand Down Expand Up @@ -310,7 +314,7 @@ void RegionKVStoreTest::testReadIndex()
ASSERT_EQ(std::get<0>(r), WaitIndexResult::Terminated);
}
}
for (auto & r : proxy_instance.regions)
for (auto & r : proxy_instance->regions)
{
r.second->updateCommitIndex(667);
}
Expand All @@ -329,7 +333,7 @@ void RegionKVStoreTest::testReadIndex()
{
auto region = kvs.getRegion(2);
auto req = GenRegionReadIndexReq(*region, 5);
auto resp = proxy_helper.batchReadIndex({req}, 100); // v2
auto resp = proxy_helper->batchReadIndex({req}, 100); // v2
ASSERT_EQ(resp[0].first.read_index(), 667); // got latest
{
auto r = region->waitIndex(667 + 1, 2, []() { return true; });
Expand All @@ -354,7 +358,7 @@ void RegionKVStoreTest::testReadIndex()
kvs.stopReadIndexWorkers();
kvs.releaseReadIndexWorkers();
over = true;
proxy_instance.wake();
proxy_instance->wake();
proxy_runner.join();
ASSERT(GCMonitor::instance().checkClean());
ASSERT(!GCMonitor::instance().empty());
Expand Down Expand Up @@ -869,13 +873,6 @@ void RegionKVStoreTest::testKVStore()
auto ctx = TiFlashTestEnv::getGlobalContext();

KVStore & kvs = getKVS();
MockRaftStoreProxy proxy_instance;
TiFlashRaftProxyHelper proxy_helper;
{
proxy_helper = MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(RaftStoreProxyPtr{&proxy_instance});
proxy_instance.init(100);
}
kvs.restore(*path_pool, &proxy_helper);
{
// Run without read-index workers

Expand Down Expand Up @@ -1169,10 +1166,10 @@ void RegionKVStoreTest::testKVStore()
}

{
const auto * ori_ptr = proxy_helper.proxy_ptr.inner;
proxy_helper.proxy_ptr.inner = nullptr;
const auto * ori_ptr = proxy_helper->proxy_ptr.inner;
proxy_helper->proxy_ptr.inner = nullptr;
SCOPE_EXIT({
proxy_helper.proxy_ptr.inner = ori_ptr;
proxy_helper->proxy_ptr.inner = ori_ptr;
});

try
Expand All @@ -1194,7 +1191,7 @@ void RegionKVStoreTest::testKVStore()
}

{
proxy_instance.getRegion(22)->setSate(({
proxy_instance->getRegion(22)->setSate(({
raft_serverpb::RegionLocalState s;
s.set_state(::raft_serverpb::PeerState::Tombstone);
s;
Expand All @@ -1208,7 +1205,7 @@ void RegionKVStoreTest::testKVStore()
ctx.getTMTContext());
kvs.checkAndApplySnapshot<RegionPtrWithSnapshotFiles>(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, tombstone, remove previous one

auto state = proxy_helper.getRegionLocalState(8192);
auto state = proxy_helper->getRegionLocalState(8192);
ASSERT_EQ(state.state(), raft_serverpb::PeerState::Tombstone);
}

Expand Down Expand Up @@ -1285,16 +1282,8 @@ void RegionKVStoreTest::testKVStore()

void RegionKVStoreTest::testKVStoreRestore()
{
MockRaftStoreProxy proxy_instance;
TiFlashRaftProxyHelper proxy_helper;
{
proxy_helper = MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(RaftStoreProxyPtr{&proxy_instance});
proxy_instance.init(100);
}

{
KVStore & kvs = getKVS();
kvs.restore(*path_pool, &proxy_helper);
{
auto store = metapb::Store{};
store.set_id(1234);
Expand Down Expand Up @@ -1327,7 +1316,6 @@ void RegionKVStoreTest::testKVStoreRestore()
}
{
KVStore & kvs = reloadKVSFromDisk();
kvs.restore(*path_pool, &proxy_helper);
kvs.getRegion(1);
kvs.getRegion(2);
kvs.getRegion(3);
Expand Down

0 comments on commit a438a9a

Please sign in to comment.