Skip to content

Commit

Permalink
Refactor kvstore test
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <tshent@qq.com>
  • Loading branch information
JaySon-Huang committed Aug 23, 2022
1 parent 02b3138 commit 540c06a
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 89 deletions.
10 changes: 6 additions & 4 deletions dbms/src/Debug/MockSSTReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct MockSSTReader
Data() = default;
};

MockSSTReader(const Data & data_)
explicit MockSSTReader(const Data & data_)
: iter(data_.begin())
, end(data_.end())
, remained(iter != end)
Expand All @@ -70,16 +70,18 @@ struct MockSSTReader
};


class RegionMockTest
class RegionMockTest final
{
public:
RegionMockTest(KVStorePtr kvstore_, RegionPtr region_);
RegionMockTest(KVStore * kvstore_, RegionPtr region_);
~RegionMockTest();

DISALLOW_COPY_AND_MOVE(RegionMockTest);

private:
TiFlashRaftProxyHelper mock_proxy_helper{};
const TiFlashRaftProxyHelper * ori_proxy_helper{};
KVStorePtr kvstore;
KVStore * kvstore;
RegionPtr region;
};
} // namespace DB
8 changes: 4 additions & 4 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ void fn_gc(SSTReaderPtr ptr, ColumnFamilyType)
delete reader;
}

RegionMockTest::RegionMockTest(KVStorePtr kvstore_, RegionPtr region_)
RegionMockTest::RegionMockTest(KVStore * kvstore_, RegionPtr region_)
: kvstore(kvstore_)
, region(region_)
{
Expand Down Expand Up @@ -465,7 +465,7 @@ void MockRaftCommand::dbgFuncIngestSST(Context & context, const ASTs & args, DBG

FailPointHelper::enableFailPoint(FailPoints::force_set_sst_decode_rand);
// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore, region);
RegionMockTest mock_test(kvstore.get(), region);

{
// Mocking ingest a SST for column family "Write"
Expand Down Expand Up @@ -646,7 +646,7 @@ void MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFiles(Context & context, c
RegionPtr new_region = RegionBench::createRegion(table->id(), region_id, start_handle, end_handle + 10000, index);

// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore, new_region);
RegionMockTest mock_test(kvstore.get(), new_region);

std::vector<SSTView> sst_views;
{
Expand Down Expand Up @@ -743,7 +743,7 @@ void MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFilesWithHandles(Context &
RegionPtr new_region = RegionBench::createRegion(table->id(), region_id, region_start_handle, region_end_handle, index);

// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore, new_region);
RegionMockTest mock_test(kvstore.get(), new_region);

std::vector<SSTView> sst_views;
{
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1308,14 +1308,15 @@ int Server::main(const std::vector<std::string> & /*args*/)
assert(tiflash_instance_wrap.proxy_helper->getProxyStatus() == RaftProxyStatus::Running);
LOG_FMT_INFO(log, "store {}, tiflash proxy is ready to serve, try to wake up all regions' leader", tmt_context.getKVStore()->getStoreID(std::memory_order_seq_cst));
size_t runner_cnt = config().getUInt("flash.read_index_runner_count", 1); // if set 0, DO NOT enable read-index worker
tmt_context.getKVStore()->initReadIndexWorkers(
auto & kvstore_ptr = tmt_context.getKVStore();
kvstore_ptr->initReadIndexWorkers(
[&]() {
// get from tmt context
return std::chrono::milliseconds(tmt_context.readIndexWorkerTick());
},
/*running thread count*/ runner_cnt);
tmt_context.getKVStore()->asyncRunReadIndexWorkers();
WaitCheckRegionReady(tmt_context, terminate_signals_counter);
WaitCheckRegionReady(tmt_context, *kvstore_ptr, terminate_signals_counter);
}
SCOPE_EXIT({
if (proxy_conf.is_proxy_runnable && tiflash_instance_wrap.status != EngineStoreServerStatus::Running)
Expand Down
15 changes: 8 additions & 7 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ

void WaitCheckRegionReady(
const TMTContext & tmt,
KVStore & kvstore,
const std::atomic_size_t & terminate_signals_counter,
double wait_tick_time,
double max_wait_tick_time,
Expand All @@ -644,7 +645,7 @@ void WaitCheckRegionReady(
Stopwatch region_check_watch;
size_t total_regions_cnt = 0;
{
tmt.getKVStore()->traverseRegions([&remain_regions](RegionID region_id, const RegionPtr &) { remain_regions.emplace(region_id); });
kvstore.traverseRegions([&remain_regions](RegionID region_id, const RegionPtr &) { remain_regions.emplace(region_id); });
total_regions_cnt = remain_regions.size();
}
while (region_check_watch.elapsedSeconds() < get_wait_region_ready_timeout_sec * batch_read_index_time_rate
Expand All @@ -654,7 +655,7 @@ void WaitCheckRegionReady(
for (auto it = remain_regions.begin(); it != remain_regions.end();)
{
auto region_id = *it;
if (auto region = tmt.getKVStore()->getRegion(region_id); region)
if (auto region = kvstore.getRegion(region_id); region)
{
batch_read_index_req.emplace_back(GenRegionReadIndexReq(*region));
it++;
Expand All @@ -664,7 +665,7 @@ void WaitCheckRegionReady(
it = remain_regions.erase(it);
}
}
auto read_index_res = tmt.getKVStore()->batchReadIndex(batch_read_index_req, tmt.batchReadIndexTimeout());
auto read_index_res = kvstore.batchReadIndex(batch_read_index_req, tmt.batchReadIndexTimeout());
for (auto && [resp, region_id] : read_index_res)
{
bool need_retry = resp.read_index() == 0;
Expand Down Expand Up @@ -716,7 +717,7 @@ void WaitCheckRegionReady(
for (auto it = regions_to_check.begin(); it != regions_to_check.end();)
{
auto [region_id, latest_index] = *it;
if (auto region = tmt.getKVStore()->getRegion(region_id); region)
if (auto region = kvstore.getRegion(region_id); region)
{
if (region->appliedIndex() >= latest_index)
{
Expand Down Expand Up @@ -752,7 +753,7 @@ void WaitCheckRegionReady(
regions_to_check.begin(),
regions_to_check.end(),
[&](const auto & e, FmtBuffer & b) {
if (auto r = tmt.getKVStore()->getRegion(e.first); r)
if (auto r = kvstore.getRegion(e.first); r)
{
b.fmtAppend("{},{},{}", e.first, e.second, r->appliedIndex());
}
Expand All @@ -771,14 +772,14 @@ void WaitCheckRegionReady(
region_check_watch.elapsedSeconds());
}

void WaitCheckRegionReady(const TMTContext & tmt, const std::atomic_size_t & terminate_signals_counter)
void WaitCheckRegionReady(const TMTContext & tmt, KVStore & kvstore, const std::atomic_size_t & terminate_signals_counter)
{
// wait interval to check region ready, not recommended to modify only if for tesing
auto wait_region_ready_tick = tmt.getContext().getConfigRef().getUInt64("flash.wait_region_ready_tick", 0);
auto wait_region_ready_timeout_sec = static_cast<double>(tmt.waitRegionReadyTimeout());
const double max_wait_tick_time = 0 == wait_region_ready_tick ? 20.0 : wait_region_ready_timeout_sec;
double min_wait_tick_time = 0 == wait_region_ready_tick ? 2.5 : static_cast<double>(wait_region_ready_tick); // default tick in TiKV is about 2s (without hibernate-region)
return WaitCheckRegionReady(tmt, terminate_signals_counter, min_wait_tick_time, max_wait_tick_time, wait_region_ready_timeout_sec);
return WaitCheckRegionReady(tmt, kvstore, terminate_signals_counter, min_wait_tick_time, max_wait_tick_time, wait_region_ready_timeout_sec);
}

void KVStore::setStore(metapb::Store store_)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class KVStoreTaskLock : private boost::noncopyable
std::lock_guard<std::mutex> lock;
};

void WaitCheckRegionReady(const TMTContext &, const std::atomic_size_t & terminate_signals_counter);
void WaitCheckRegionReady(const TMTContext &, const std::atomic_size_t &, double, double, double);
void WaitCheckRegionReady(const TMTContext &, KVStore & kvstore, const std::atomic_size_t & terminate_signals_counter);
void WaitCheckRegionReady(const TMTContext &, KVStore & kvstore, const std::atomic_size_t &, double, double, double);

} // namespace DB
Loading

0 comments on commit 540c06a

Please sign in to comment.