Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor KVStore/RegionPersister test #5679

Merged
merged 6 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions dbms/src/Debug/MockSSTReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct MockSSTReader
Data() = default;
};

MockSSTReader(const Data & data_)
explicit MockSSTReader(const Data & data_)
: iter(data_.begin())
, end(data_.end())
, remained(iter != end)
Expand All @@ -70,16 +70,18 @@ struct MockSSTReader
};


class RegionMockTest
class RegionMockTest final
{
public:
RegionMockTest(KVStorePtr kvstore_, RegionPtr region_);
RegionMockTest(KVStore * kvstore_, RegionPtr region_);
~RegionMockTest();

DISALLOW_COPY_AND_MOVE(RegionMockTest);

private:
TiFlashRaftProxyHelper mock_proxy_helper{};
const TiFlashRaftProxyHelper * ori_proxy_helper{};
KVStorePtr kvstore;
KVStore * kvstore;
RegionPtr region;
};
} // namespace DB
8 changes: 4 additions & 4 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ void fn_gc(SSTReaderPtr ptr, ColumnFamilyType)
delete reader;
}

RegionMockTest::RegionMockTest(KVStorePtr kvstore_, RegionPtr region_)
RegionMockTest::RegionMockTest(KVStore * kvstore_, RegionPtr region_)
: kvstore(kvstore_)
, region(region_)
{
Expand Down Expand Up @@ -465,7 +465,7 @@ void MockRaftCommand::dbgFuncIngestSST(Context & context, const ASTs & args, DBG

FailPointHelper::enableFailPoint(FailPoints::force_set_sst_decode_rand);
// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore, region);
RegionMockTest mock_test(kvstore.get(), region);

{
// Mocking ingest a SST for column family "Write"
Expand Down Expand Up @@ -646,7 +646,7 @@ void MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFiles(Context & context, c
RegionPtr new_region = RegionBench::createRegion(table->id(), region_id, start_handle, end_handle + 10000, index);

// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore, new_region);
RegionMockTest mock_test(kvstore.get(), new_region);

std::vector<SSTView> sst_views;
{
Expand Down Expand Up @@ -743,7 +743,7 @@ void MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFilesWithHandles(Context &
RegionPtr new_region = RegionBench::createRegion(table->id(), region_id, region_start_handle, region_end_handle, index);

// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore, new_region);
RegionMockTest mock_test(kvstore.get(), new_region);

std::vector<SSTView> sst_views;
{
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Server/DTTool/DTTool.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class ImitativeEnv

global_context->setDeltaIndexManager(1024 * 1024 * 100 /*100MB*/);

global_context->getTMTContext().restore();
auto & path_pool = global_context->getPathPool();
global_context->getTMTContext().restore(path_pool);
return global_context;
}

Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1229,8 +1229,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
if (proxy_conf.is_proxy_runnable && !tiflash_instance_wrap.proxy_helper)
throw Exception("Raft Proxy Helper is not set, should not happen");
auto & path_pool = global_context->getPathPool();
/// initialize TMTContext
global_context->getTMTContext().restore(tiflash_instance_wrap.proxy_helper);
global_context->getTMTContext().restore(path_pool, tiflash_instance_wrap.proxy_helper);
}

/// setting up elastic thread pool
Expand Down Expand Up @@ -1307,14 +1308,15 @@ int Server::main(const std::vector<std::string> & /*args*/)
assert(tiflash_instance_wrap.proxy_helper->getProxyStatus() == RaftProxyStatus::Running);
LOG_FMT_INFO(log, "store {}, tiflash proxy is ready to serve, try to wake up all regions' leader", tmt_context.getKVStore()->getStoreID(std::memory_order_seq_cst));
size_t runner_cnt = config().getUInt("flash.read_index_runner_count", 1); // if set 0, DO NOT enable read-index worker
tmt_context.getKVStore()->initReadIndexWorkers(
auto & kvstore_ptr = tmt_context.getKVStore();
kvstore_ptr->initReadIndexWorkers(
[&]() {
// get from tmt context
return std::chrono::milliseconds(tmt_context.readIndexWorkerTick());
},
/*running thread count*/ runner_cnt);
tmt_context.getKVStore()->asyncRunReadIndexWorkers();
WaitCheckRegionReady(tmt_context, terminate_signals_counter);
WaitCheckRegionReady(tmt_context, *kvstore_ptr, terminate_signals_counter);
}
SCOPE_EXIT({
if (proxy_conf.is_proxy_runnable && tiflash_instance_wrap.status != EngineStoreServerStatus::Running)
Expand Down Expand Up @@ -1391,4 +1393,4 @@ int mainEntryClickHouseServer(int argc, char ** argv)
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
}
}
}
3 changes: 2 additions & 1 deletion dbms/src/Server/tests/gtest_server_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,10 @@ dt_open_file_max_idle_seconds = 20
dt_page_gc_low_write_prob = 0.2
)"};
auto & global_ctx = TiFlashTestEnv::getGlobalContext();
auto & global_path_pool = global_ctx.getPathPool();
RegionManager region_manager;
RegionPersister persister(global_ctx, region_manager);
persister.restore(nullptr, PageStorage::Config{});
persister.restore(global_path_pool, nullptr, PageStorage::Config{});

auto verify_persister_reload_config = [&global_ctx](RegionPersister & persister) {
DB::Settings & settings = global_ctx.getSettingsRef();
Expand Down
19 changes: 10 additions & 9 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ KVStore::KVStore(Context & context, TiDB::SnapshotApplyMethod snapshot_apply_met
// default config about compact-log: period 120s, rows 40k, bytes 32MB.
}

void KVStore::restore(const TiFlashRaftProxyHelper * proxy_helper)
void KVStore::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy_helper)
{
auto task_lock = genTaskLock();
auto manage_lock = genRegionWriteLock(task_lock);

this->proxy_helper = proxy_helper;
manage_lock.regions = region_persister->restore(proxy_helper);
manage_lock.regions = region_persister->restore(path_pool, proxy_helper);

LOG_FMT_INFO(log, "Restored {} regions", manage_lock.regions.size());

Expand Down Expand Up @@ -625,6 +625,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ

void WaitCheckRegionReady(
const TMTContext & tmt,
KVStore & kvstore,
const std::atomic_size_t & terminate_signals_counter,
double wait_tick_time,
double max_wait_tick_time,
Expand All @@ -644,7 +645,7 @@ void WaitCheckRegionReady(
Stopwatch region_check_watch;
size_t total_regions_cnt = 0;
{
tmt.getKVStore()->traverseRegions([&remain_regions](RegionID region_id, const RegionPtr &) { remain_regions.emplace(region_id); });
kvstore.traverseRegions([&remain_regions](RegionID region_id, const RegionPtr &) { remain_regions.emplace(region_id); });
total_regions_cnt = remain_regions.size();
}
while (region_check_watch.elapsedSeconds() < get_wait_region_ready_timeout_sec * batch_read_index_time_rate
Expand All @@ -654,7 +655,7 @@ void WaitCheckRegionReady(
for (auto it = remain_regions.begin(); it != remain_regions.end();)
{
auto region_id = *it;
if (auto region = tmt.getKVStore()->getRegion(region_id); region)
if (auto region = kvstore.getRegion(region_id); region)
{
batch_read_index_req.emplace_back(GenRegionReadIndexReq(*region));
it++;
Expand All @@ -664,7 +665,7 @@ void WaitCheckRegionReady(
it = remain_regions.erase(it);
}
}
auto read_index_res = tmt.getKVStore()->batchReadIndex(batch_read_index_req, tmt.batchReadIndexTimeout());
auto read_index_res = kvstore.batchReadIndex(batch_read_index_req, tmt.batchReadIndexTimeout());
for (auto && [resp, region_id] : read_index_res)
{
bool need_retry = resp.read_index() == 0;
Expand Down Expand Up @@ -716,7 +717,7 @@ void WaitCheckRegionReady(
for (auto it = regions_to_check.begin(); it != regions_to_check.end();)
{
auto [region_id, latest_index] = *it;
if (auto region = tmt.getKVStore()->getRegion(region_id); region)
if (auto region = kvstore.getRegion(region_id); region)
{
if (region->appliedIndex() >= latest_index)
{
Expand Down Expand Up @@ -752,7 +753,7 @@ void WaitCheckRegionReady(
regions_to_check.begin(),
regions_to_check.end(),
[&](const auto & e, FmtBuffer & b) {
if (auto r = tmt.getKVStore()->getRegion(e.first); r)
if (auto r = kvstore.getRegion(e.first); r)
{
b.fmtAppend("{},{},{}", e.first, e.second, r->appliedIndex());
}
Expand All @@ -771,14 +772,14 @@ void WaitCheckRegionReady(
region_check_watch.elapsedSeconds());
}

void WaitCheckRegionReady(const TMTContext & tmt, const std::atomic_size_t & terminate_signals_counter)
void WaitCheckRegionReady(const TMTContext & tmt, KVStore & kvstore, const std::atomic_size_t & terminate_signals_counter)
{
// wait interval to check region ready, not recommended to modify only if for tesing
auto wait_region_ready_tick = tmt.getContext().getConfigRef().getUInt64("flash.wait_region_ready_tick", 0);
auto wait_region_ready_timeout_sec = static_cast<double>(tmt.waitRegionReadyTimeout());
const double max_wait_tick_time = 0 == wait_region_ready_tick ? 20.0 : wait_region_ready_timeout_sec;
double min_wait_tick_time = 0 == wait_region_ready_tick ? 2.5 : static_cast<double>(wait_region_ready_tick); // default tick in TiKV is about 2s (without hibernate-region)
return WaitCheckRegionReady(tmt, terminate_signals_counter, min_wait_tick_time, max_wait_tick_time, wait_region_ready_timeout_sec);
return WaitCheckRegionReady(tmt, kvstore, terminate_signals_counter, min_wait_tick_time, max_wait_tick_time, wait_region_ready_timeout_sec);
}

void KVStore::setStore(metapb::Store store_)
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,15 @@ class ReadIndexWorkerManager;
using BatchReadIndexRes = std::vector<std::pair<kvrpcpb::ReadIndexResponse, uint64_t>>;
class ReadIndexStressTest;
struct FileUsageStatistics;
class PathPool;
class RegionPersister;

/// TODO: brief design document.
class KVStore final : private boost::noncopyable
{
public:
KVStore(Context & context, TiDB::SnapshotApplyMethod snapshot_apply_method_);
void restore(const TiFlashRaftProxyHelper *);
void restore(PathPool & path_pool, const TiFlashRaftProxyHelper *);

RegionPtr getRegion(RegionID region_id) const;

Expand Down Expand Up @@ -162,7 +163,9 @@ class KVStore final : private boost::noncopyable

FileUsageStatistics getFileUsageStatistics() const;

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
friend class MockTiDB;
friend struct MockTiDBTable;
friend struct MockRaftCommand;
Expand Down Expand Up @@ -231,7 +234,9 @@ class KVStore final : private boost::noncopyable
void releaseReadIndexWorkers();
void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
RegionManager region_manager;

std::unique_ptr<RegionPersister> region_persister;
Expand Down Expand Up @@ -275,7 +280,7 @@ class KVStoreTaskLock : private boost::noncopyable
std::lock_guard<std::mutex> lock;
};

void WaitCheckRegionReady(const TMTContext &, const std::atomic_size_t & terminate_signals_counter);
void WaitCheckRegionReady(const TMTContext &, const std::atomic_size_t &, double, double, double);
void WaitCheckRegionReady(const TMTContext &, KVStore & kvstore, const std::atomic_size_t & terminate_signals_counter);
void WaitCheckRegionReady(const TMTContext &, KVStore & kvstore, const std::atomic_size_t &, double, double, double);

} // namespace DB
8 changes: 4 additions & 4 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Region : public std::enable_shared_from_this<Region>
class CommittedScanner : private boost::noncopyable
{
public:
CommittedScanner(const RegionPtr & store_, bool use_lock = true)
explicit CommittedScanner(const RegionPtr & store_, bool use_lock = true)
: store(store_)
{
if (use_lock)
Expand Down Expand Up @@ -97,7 +97,7 @@ class Region : public std::enable_shared_from_this<Region>
class CommittedRemover : private boost::noncopyable
{
public:
CommittedRemover(const RegionPtr & store_, bool use_lock = true)
explicit CommittedRemover(const RegionPtr & store_, bool use_lock = true)
: store(store_)
{
if (use_lock)
Expand Down Expand Up @@ -245,11 +245,11 @@ class RegionRaftCommandDelegate : public Region
const RegionRangeKeys & getRange();
UInt64 appliedIndex();

RegionRaftCommandDelegate() = delete;

private:
friend class tests::RegionKVStoreTest;

RegionRaftCommandDelegate() = delete;

Regions execBatchSplit(
const raft_cmdpb::AdminRequest & request,
const raft_cmdpb::AdminResponse & response,
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/Transaction/RegionMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ class RegionMeta
metapb::Region getMetaRegion() const;
raft_serverpb::MergeState getMergeState() const;

private:
RegionMeta() = delete;

private:
friend class MetaRaftCommandDelegate;
friend class tests::RegionKVStoreTest;

Expand Down Expand Up @@ -157,8 +158,6 @@ class MetaRaftCommandDelegate
friend class RegionRaftCommandDelegate;
friend class tests::RegionKVStoreTest;

MetaRaftCommandDelegate() = delete;

const metapb::Peer & getPeer() const;
const raft_serverpb::RaftApplyState & applyState() const;
const RegionState & regionState() const;
Expand Down Expand Up @@ -192,6 +191,8 @@ class MetaRaftCommandDelegate
static RegionMergeResult computeRegionMergeResult(
const metapb::Region & source_region,
const metapb::Region & target_region);

MetaRaftCommandDelegate() = delete;
};

} // namespace DB
20 changes: 10 additions & 10 deletions dbms/src/Storages/Transaction/RegionPersister.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,15 @@ void RegionPersister::forceTransformKVStoreV2toV3()
page_writer->writeIntoV2(std::move(write_batch_del_v2), nullptr);
}

RegionMap RegionPersister::restore(const TiFlashRaftProxyHelper * proxy_helper, PageStorage::Config config)
RegionMap RegionPersister::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy_helper, PageStorage::Config config)
{
{
auto & path_pool = global_context.getPathPool();
auto delegator = path_pool.getPSDiskDelegatorRaft();
auto provider = global_context.getFileProvider();
auto run_mode = global_context.getPageStorageRunMode();
const auto global_run_mode = global_context.getPageStorageRunMode();
auto run_mode = global_run_mode;

switch (run_mode)
switch (global_run_mode)
{
case PageStorageRunMode::ONLY_V2:
{
Expand All @@ -245,8 +245,8 @@ RegionMap RegionPersister::restore(const TiFlashRaftProxyHelper * proxy_helper,
config,
provider);
page_storage_v2->restore();
page_writer = std::make_shared<PageWriter>(run_mode, page_storage_v2, /*storage_v3_*/ nullptr);
page_reader = std::make_shared<PageReader>(run_mode, ns_id, page_storage_v2, /*storage_v3_*/ nullptr, /*readlimiter*/ global_context.getReadLimiter());
page_writer = std::make_shared<PageWriter>(global_run_mode, page_storage_v2, /*storage_v3_*/ nullptr);
page_reader = std::make_shared<PageReader>(global_run_mode, ns_id, page_storage_v2, /*storage_v3_*/ nullptr, /*readlimiter*/ global_context.getReadLimiter());
}
else
{
Expand All @@ -270,8 +270,8 @@ RegionMap RegionPersister::restore(const TiFlashRaftProxyHelper * proxy_helper,
config,
provider);
page_storage_v3->restore();
page_writer = std::make_shared<PageWriter>(run_mode, /*storage_v2_*/ nullptr, page_storage_v3);
page_reader = std::make_shared<PageReader>(run_mode, ns_id, /*storage_v2_*/ nullptr, page_storage_v3, global_context.getReadLimiter());
page_writer = std::make_shared<PageWriter>(global_run_mode, /*storage_v2_*/ nullptr, page_storage_v3);
page_reader = std::make_shared<PageReader>(global_run_mode, ns_id, /*storage_v2_*/ nullptr, page_storage_v3, global_context.getReadLimiter());
break;
}
case PageStorageRunMode::MIX_MODE:
Expand All @@ -296,8 +296,8 @@ RegionMap RegionPersister::restore(const TiFlashRaftProxyHelper * proxy_helper,

if (const auto & kvstore_remain_pages = page_storage_v2->getNumberOfPages(); kvstore_remain_pages != 0)
{
page_writer = std::make_shared<PageWriter>(run_mode, page_storage_v2, page_storage_v3);
page_reader = std::make_shared<PageReader>(run_mode, ns_id, page_storage_v2, page_storage_v3, global_context.getReadLimiter());
page_writer = std::make_shared<PageWriter>(global_run_mode, page_storage_v2, page_storage_v3);
page_reader = std::make_shared<PageReader>(global_run_mode, ns_id, page_storage_v2, page_storage_v3, global_context.getReadLimiter());

LOG_FMT_INFO(log, "Current kvstore transform to V3 begin [pages_before_transform={}]", kvstore_remain_pages);
forceTransformKVStoreV2toV3();
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Transaction/RegionPersister.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace DB
{
class Context;

class PathPool;
class Region;
using RegionPtr = std::shared_ptr<Region>;
using RegionMap = std::unordered_map<RegionID, RegionPtr>;
Expand All @@ -50,7 +51,7 @@ class RegionPersister final : private boost::noncopyable
void drop(RegionID region_id, const RegionTaskLock &);
void persist(const Region & region);
void persist(const Region & region, const RegionTaskLock & lock);
RegionMap restore(const TiFlashRaftProxyHelper * proxy_helper = nullptr, PageStorage::Config config = PageStorage::Config{});
RegionMap restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy_helper = nullptr, PageStorage::Config config = PageStorage::Config{});
bool gc();

using RegionCacheWriteElement = std::tuple<RegionID, MemoryWriteBuffer, size_t, UInt64>;
Expand Down
Loading