From 9836c27a0105e89f00b41d979175198a33987e0c Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Fri, 27 Dec 2024 17:28:03 +0800 Subject: [PATCH] Storages: move ann_query_info to PushDownExecutor (#9741) ref pingcap/tiflash#6233 1. move `ann_query_info` to `PushDownFilter` 2. rename `PushDownFilter` to `PushDownExecutor` Signed-off-by: Lloyd-Pottiger --- dbms/src/Debug/MockStorage.cpp | 4 +- .../Coprocessor/DAGExpressionAnalyzer.cpp | 2 +- .../Flash/Coprocessor/DAGExpressionAnalyzer.h | 2 +- .../Flash/Coprocessor/InterpreterUtils.cpp | 4 +- .../src/Flash/tests/gtest_filter_executor.cpp | 4 +- .../src/Operators/DMSegmentThreadSourceOp.cpp | 6 +- dbms/src/Operators/DMSegmentThreadSourceOp.h | 4 +- ...olumnFileSetWithVectorIndexInputStream.cpp | 1 - .../ConcatSkippableBlockInputStream.cpp | 22 +-- .../ConcatSkippableBlockInputStream.h | 3 +- .../DeltaMerge/DMSegmentThreadInputStream.h | 4 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 14 +- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 8 +- ...ushDownFilter.cpp => PushDownExecutor.cpp} | 37 +++- .../{PushDownFilter.h => PushDownExecutor.h} | 26 ++- .../Storages/DeltaMerge/Filter/RSOperator.cpp | 32 +--- .../Storages/DeltaMerge/Filter/RSOperator.h | 6 - .../DeltaMerge/Filter/WithANNQueryInfo.h | 65 ------- .../DeltaMerge/Remote/RNSegmentInputStream.h | 2 +- .../Remote/RNWorkerPrepareStreams.h | 10 +- .../Storages/DeltaMerge/Remote/RNWorkers.cpp | 2 +- .../Storages/DeltaMerge/Remote/RNWorkers.h | 2 +- dbms/src/Storages/DeltaMerge/Segment.cpp | 162 ++++++++++++------ dbms/src/Storages/DeltaMerge/Segment.h | 24 ++- .../Storages/DeltaMerge/SegmentReadTask.cpp | 14 +- .../src/Storages/DeltaMerge/SegmentReadTask.h | 6 +- .../DeltaMerge/SegmentReadTaskPool.cpp | 2 +- .../Storages/DeltaMerge/SegmentReadTaskPool.h | 6 +- .../tests/gtest_dm_delta_merge_store.cpp | 12 +- ...test_dm_delta_merge_store_vector_index.cpp | 46 ++--- .../tests/gtest_dm_minmax_index.cpp | 2 +- .../tests/gtest_dm_vector_index.cpp | 6 +- .../tests/gtest_dm_vector_index_utils.h | 2 +- .../gtest_skippable_block_input_stream.cpp | 2 - .../tests/gtest_kvstore_fast_add_peer.cpp | 13 +- dbms/src/Storages/StorageDeltaMerge.cpp | 8 +- dbms/src/Storages/StorageDeltaMerge.h | 2 +- .../Storages/StorageDisaggregatedRemote.cpp | 17 +- .../tests/gtests_parse_push_down_filter.cpp | 85 ++++----- .../TiDB/Schema/tests/gtest_schema_sync.cpp | 4 +- 40 files changed, 351 insertions(+), 322 deletions(-) rename dbms/src/Storages/DeltaMerge/Filter/{PushDownFilter.cpp => PushDownExecutor.cpp} (85%) rename dbms/src/Storages/DeltaMerge/Filter/{PushDownFilter.h => PushDownExecutor.h} (79%) delete mode 100644 dbms/src/Storages/DeltaMerge/Filter/WithANNQueryInfo.h diff --git a/dbms/src/Debug/MockStorage.cpp b/dbms/src/Debug/MockStorage.cpp index 428e8c1f086..6e211406669 100644 --- a/dbms/src/Debug/MockStorage.cpp +++ b/dbms/src/Debug/MockStorage.cpp @@ -211,7 +211,7 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge( rf_max_wait_time_ms, context.getTimezoneInfo()); auto [before_where, filter_column_name, project_after_where] - = analyzer->buildPushDownFilter(filter_conditions->conditions); + = analyzer->buildPushDownExecutor(filter_conditions->conditions); BlockInputStreams ins = storage->read( column_names, query_info, @@ -273,7 +273,7 @@ void MockStorage::buildExecFromDeltaMerge( rf_max_wait_time_ms, context.getTimezoneInfo()); // Not using `auto [before_where, filter_column_name, project_after_where]` just to make the compiler happy. - auto build_ret = analyzer->buildPushDownFilter(filter_conditions->conditions); + auto build_ret = analyzer->buildPushDownExecutor(filter_conditions->conditions); storage->read( exec_context_, group_builder, diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp index f419a693ff0..f2e68b5e620 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp @@ -972,7 +972,7 @@ String DAGExpressionAnalyzer::buildFilterColumn( return filter_column_name; } -std::tuple DAGExpressionAnalyzer::buildPushDownFilter( +std::tuple DAGExpressionAnalyzer::buildPushDownExecutor( const google::protobuf::RepeatedPtrField & conditions, bool null_as_false) { diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h index 8ef4dbc0b78..bdc9c9dcbe3 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h @@ -148,7 +148,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable const google::protobuf::RepeatedPtrField & conditions, bool null_as_false = false); - std::tuple buildPushDownFilter( + std::tuple buildPushDownExecutor( const google::protobuf::RepeatedPtrField & conditions, bool null_as_false = false); diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index 91a7ca5db9b..993c560f10c 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -443,7 +443,7 @@ void executePushedDownFilter( DAGPipeline & pipeline) { auto [before_where, filter_column_name, project_after_where] - = analyzer.buildPushDownFilter(filter_conditions.conditions, true); + = analyzer.buildPushDownExecutor(filter_conditions.conditions, true); for (auto & stream : pipeline.streams) { @@ -464,7 +464,7 @@ void executePushedDownFilter( LoggerPtr log) { auto [before_where, filter_column_name, project_after_where] - = analyzer.buildPushDownFilter(filter_conditions.conditions, true); + = analyzer.buildPushDownExecutor(filter_conditions.conditions, true); auto input_header = group_builder.getCurrentHeader(); for (size_t i = 0; i < group_builder.concurrency(); ++i) diff --git a/dbms/src/Flash/tests/gtest_filter_executor.cpp b/dbms/src/Flash/tests/gtest_filter_executor.cpp index 244a0fb8163..5e11f65f6d0 100644 --- a/dbms/src/Flash/tests/gtest_filter_executor.cpp +++ b/dbms/src/Flash/tests/gtest_filter_executor.cpp @@ -235,7 +235,7 @@ try } CATCH -TEST_F(FilterExecutorTestRunner, convert_bool) +TEST_F(FilterExecutorTestRunner, convertBool) try { { @@ -282,7 +282,7 @@ try } CATCH -TEST_F(FilterExecutorTestRunner, PushDownFilter) +TEST_F(FilterExecutorTestRunner, PushDownExecutor) try { context.mockStorage()->setUseDeltaMerge(true); diff --git a/dbms/src/Operators/DMSegmentThreadSourceOp.cpp b/dbms/src/Operators/DMSegmentThreadSourceOp.cpp index 49e6ca0e19c..c6283d2f8d6 100644 --- a/dbms/src/Operators/DMSegmentThreadSourceOp.cpp +++ b/dbms/src/Operators/DMSegmentThreadSourceOp.cpp @@ -30,7 +30,7 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp( const DM::SegmentReadTaskPoolPtr & task_pool_, DM::AfterSegmentRead after_segment_read_, const DM::ColumnDefines & columns_to_read_, - const DM::PushDownFilterPtr & filter_, + const DM::PushDownExecutorPtr & executor_, UInt64 start_ts_, size_t expected_block_size_, DM::ReadMode read_mode_, @@ -40,7 +40,7 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp( , task_pool(task_pool_) , after_segment_read(after_segment_read_) , columns_to_read(columns_to_read_) - , filter(filter_) + , executor(executor_) , start_ts(start_ts_) , expected_block_size(expected_block_size_) , read_mode(read_mode_) @@ -100,7 +100,7 @@ OperatorStatus DMSegmentThreadSourceOp::executeIOImpl() columns_to_read, task->read_snapshot, task->ranges, - filter, + executor, start_ts, block_size); LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo()); diff --git a/dbms/src/Operators/DMSegmentThreadSourceOp.h b/dbms/src/Operators/DMSegmentThreadSourceOp.h index 0dc0fda15ec..fa1cbc21676 100644 --- a/dbms/src/Operators/DMSegmentThreadSourceOp.h +++ b/dbms/src/Operators/DMSegmentThreadSourceOp.h @@ -34,7 +34,7 @@ class DMSegmentThreadSourceOp : public SourceOp const DM::SegmentReadTaskPoolPtr & task_pool_, DM::AfterSegmentRead after_segment_read_, const DM::ColumnDefines & columns_to_read_, - const DM::PushDownFilterPtr & filter_, + const DM::PushDownExecutorPtr & executor_, UInt64 start_ts_, size_t expected_block_size_, DM::ReadMode read_mode_, @@ -56,7 +56,7 @@ class DMSegmentThreadSourceOp : public SourceOp DM::SegmentReadTaskPoolPtr task_pool; DM::AfterSegmentRead after_segment_read; DM::ColumnDefines columns_to_read; - DM::PushDownFilterPtr filter; + DM::PushDownExecutorPtr executor; const UInt64 start_ts; const size_t expected_block_size; const DM::ReadMode read_mode; diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp index 825477d5e67..054753cf2cb 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.cpp @@ -15,7 +15,6 @@ #include #include #include -#include namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp index 6cdc5e2a32f..f4b0f4133e8 100644 --- a/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp @@ -266,13 +266,12 @@ Block ConcatVectorIndexBlockInputStream::read() return block; } -SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build( +std::tuple ConcatVectorIndexBlockInputStream::build( const BitmapFilterPtr & bitmap_filter, std::shared_ptr> stream, const ANNQueryInfoPtr & ann_query_info) { - if (!ann_query_info) - return stream; + assert(ann_query_info != nullptr); bool has_vector_index_stream = false; std::vector index_streams; index_streams.reserve(stream->children.size()); @@ -287,13 +286,16 @@ SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build( index_streams.push_back(nullptr); } if (!has_vector_index_stream) - return stream; - - return std::make_shared( - bitmap_filter, - stream, - std::move(index_streams), - ann_query_info->top_k()); + return {stream, false}; + + return { + std::make_shared( + bitmap_filter, + stream, + std::move(index_streams), + ann_query_info->top_k()), + true, + }; } } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.h b/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.h index 48d988fe11e..8c6bdb883d3 100644 --- a/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.h @@ -77,7 +77,8 @@ class ConcatVectorIndexBlockInputStream : public SkippableBlockInputStream , bitmap_filter(bitmap_filter_) {} - static SkippableBlockInputStreamPtr build( + // Returns + static std::tuple build( const BitmapFilterPtr & bitmap_filter, std::shared_ptr> stream, const ANNQueryInfoPtr & ann_query_info); diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index 685f6b37e82..755fd7e3258 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -42,7 +42,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream const SegmentReadTaskPoolPtr & task_pool_, AfterSegmentRead after_segment_read_, const ColumnDefines & columns_to_read_, - const PushDownFilterPtr & filter_, + const PushDownExecutorPtr & filter_, UInt64 start_ts_, size_t expected_block_size_, ReadMode read_mode_, @@ -127,7 +127,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream SegmentReadTaskPoolPtr task_pool; AfterSegmentRead after_segment_read; ColumnDefines columns_to_read; - PushDownFilterPtr filter; + PushDownExecutorPtr filter; Block header; const UInt64 start_ts; const size_t expected_block_size; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 0916b4d8f15..17a3cc29df6 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -37,7 +37,7 @@ #include #include #include -#include +#include #include #include #include @@ -1228,12 +1228,12 @@ ReadMode DeltaMergeStore::getReadMode( const Context & db_context, bool is_fast_scan, bool keep_order, - const PushDownFilterPtr & filter) + const PushDownExecutorPtr & filter) { auto read_mode = getReadModeImpl(db_context, is_fast_scan, keep_order); RUNTIME_CHECK_MSG( !filter || !filter->before_where || read_mode == ReadMode::Bitmap, - "Push down filters needs bitmap, push down filters is empty: {}, read mode: {}", + "Push down executor needs bitmap, push down executor is empty: {}, read mode: {}", filter == nullptr || filter->before_where == nullptr, magic_enum::enum_name(read_mode)); return read_mode; @@ -1246,7 +1246,7 @@ BlockInputStreams DeltaMergeStore::read( const RowKeyRanges & sorted_ranges, size_t num_streams, UInt64 start_ts, - const PushDownFilterPtr & filter, + const PushDownExecutorPtr & filter, const RuntimeFilteList & runtime_filter_list, int rf_max_wait_time_ms, const String & tracing_id, @@ -1332,7 +1332,7 @@ BlockInputStreams DeltaMergeStore::read( LOG_INFO( tracing_logger, "Read create stream done, keep_order={} dt_enable_read_thread={} enable_read_thread={} " - "is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} " + "is_fast_scan={} is_push_down_executor_empty={} pool_id={} num_streams={} columns_to_read={} " "final_columns_to_read={}", keep_order, db_context.getSettingsRef().dt_enable_read_thread, @@ -1356,7 +1356,7 @@ void DeltaMergeStore::read( const RowKeyRanges & sorted_ranges, size_t num_streams, UInt64 start_ts, - const PushDownFilterPtr & filter, + const PushDownExecutorPtr & filter, const RuntimeFilteList & runtime_filter_list, int rf_max_wait_time_ms, const String & tracing_id, @@ -1452,7 +1452,7 @@ void DeltaMergeStore::read( LOG_INFO( tracing_logger, "Read create PipelineExec done, keep_order={} dt_enable_read_thread={} enable_read_thread={} " - "is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} " + "is_fast_scan={} is_push_down_executor_empty={} pool_id={} num_streams={} columns_to_read={} " "final_columns_to_read={}", keep_order, db_context.getSettingsRef().dt_enable_read_thread, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 1ec73eb377b..d72041946df 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -28,7 +28,7 @@ #include #include #include -#include +#include #include #include #include @@ -460,7 +460,7 @@ class DeltaMergeStore const RowKeyRanges & sorted_ranges, size_t num_streams, UInt64 start_ts, - const PushDownFilterPtr & filter, + const PushDownExecutorPtr & filter, const RuntimeFilteList & runtime_filter_list, int rf_max_wait_time_ms, const String & tracing_id, @@ -485,7 +485,7 @@ class DeltaMergeStore const RowKeyRanges & sorted_ranges, size_t num_streams, UInt64 start_ts, - const PushDownFilterPtr & filter, + const PushDownExecutorPtr & filter, const RuntimeFilteList & runtime_filter_list, int rf_max_wait_time_ms, const String & tracing_id, @@ -588,7 +588,7 @@ class DeltaMergeStore const Context & db_context, bool is_fast_scan, bool keep_order, - const PushDownFilterPtr & filter); + const PushDownExecutorPtr & filter); // Get a snap of local_index_infos for checking. // Note that this is just a shallow copy of `local_index_infos`, do not diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp b/dbms/src/Storages/DeltaMerge/Filter/PushDownExecutor.cpp similarity index 85% rename from dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp rename to dbms/src/Storages/DeltaMerge/Filter/PushDownExecutor.cpp index b2f6fc3ca7c..de666f3e622 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownExecutor.cpp @@ -18,24 +18,38 @@ #include #include #include -#include +#include #include #include namespace DB::DM { -PushDownFilterPtr PushDownFilter::build( +PushDownExecutorPtr PushDownExecutor::build( const RSOperatorPtr & rs_operator, + const ANNQueryInfoPtr & ann_query_info, const TiDB::ColumnInfos & table_scan_column_info, const google::protobuf::RepeatedPtrField & pushed_down_filters, const ColumnDefines & columns_to_read, const Context & context, const LoggerPtr & tracing_logger) { + // check if the ann_query_info is valid + auto valid_ann_query_info = ann_query_info; + if (ann_query_info) + { + bool is_valid_ann_query = ann_query_info->top_k() != std::numeric_limits::max(); + bool is_matching_ann_query = std::any_of( + columns_to_read.begin(), + columns_to_read.end(), + [cid = ann_query_info->column_id()](const ColumnDefine & cd) -> bool { return cd.id == cid; }); + if (!is_valid_ann_query || !is_matching_ann_query) + valid_ann_query_info = nullptr; + } + if (pushed_down_filters.empty()) { LOG_DEBUG(tracing_logger, "Push down filter is empty"); - return std::make_shared(rs_operator); + return std::make_shared(rs_operator, valid_ann_query_info); } std::unordered_map columns_to_read_map; for (const auto & column : columns_to_read) @@ -120,7 +134,7 @@ PushDownFilterPtr PushDownFilter::build( } // build filter expression actions - auto [before_where, filter_column_name, project_after_where] = analyzer->buildPushDownFilter(pushed_down_filters); + auto [before_where, filter_column_name, project_after_where] = analyzer->buildPushDownExecutor(pushed_down_filters); LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions()); // record current column defines @@ -145,8 +159,9 @@ PushDownFilterPtr PushDownFilter::build( } } - return std::make_shared( + return std::make_shared( rs_operator, + valid_ann_query_info, before_where, project_after_where, filter_columns, @@ -155,7 +170,7 @@ PushDownFilterPtr PushDownFilter::build( columns_after_cast); } -PushDownFilterPtr PushDownFilter::build( +PushDownExecutorPtr PushDownExecutor::build( const SelectQueryInfo & query_info, const ColumnDefines & columns_to_read, const ColumnDefines & table_column_defines, @@ -174,6 +189,10 @@ PushDownFilterPtr PushDownFilter::build( table_column_defines, context.getSettingsRef().dt_enable_rough_set_filter, tracing_logger); + // build ann_query_info + ANNQueryInfoPtr ann_query_info = nullptr; + if (dag_query->ann_query_info.query_type() != tipb::ANNQueryType::InvalidQueryType) + ann_query_info = std::make_shared(dag_query->ann_query_info); // build push down filter const auto & pushed_down_filters = dag_query->pushed_down_filters; if (unlikely(context.getSettingsRef().force_push_down_all_filters_to_scan) && !dag_query->filters.empty()) @@ -182,16 +201,18 @@ PushDownFilterPtr PushDownFilter::build( pushed_down_filters.begin(), pushed_down_filters.end()}; merged_filters.MergeFrom(dag_query->filters); - return PushDownFilter::build( + return PushDownExecutor::build( rs_operator, + ann_query_info, columns_to_read_info, merged_filters, columns_to_read, context, tracing_logger); } - return PushDownFilter::build( + return PushDownExecutor::build( rs_operator, + ann_query_info, columns_to_read_info, pushed_down_filters, columns_to_read, diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h b/dbms/src/Storages/DeltaMerge/Filter/PushDownExecutor.h similarity index 79% rename from dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h rename to dbms/src/Storages/DeltaMerge/Filter/PushDownExecutor.h index eb680b6c45e..827c7437e1e 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h +++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownExecutor.h @@ -26,15 +26,16 @@ struct SelectQueryInfo; namespace DB::DM { -class PushDownFilter; -using PushDownFilterPtr = std::shared_ptr; -inline static const PushDownFilterPtr EMPTY_FILTER{}; +class PushDownExecutor; +using PushDownExecutorPtr = std::shared_ptr; +inline static const PushDownExecutorPtr EMPTY_FILTER{}; -class PushDownFilter +class PushDownExecutor { public: - PushDownFilter( + PushDownExecutor( const RSOperatorPtr & rs_operator_, + const ANNQueryInfoPtr & ann_query_info_, const ExpressionActionsPtr & beofre_where_, const ExpressionActionsPtr & project_after_where_, const ColumnDefinesPtr & filter_columns_, @@ -48,15 +49,22 @@ class PushDownFilter , filter_columns(filter_columns_) , extra_cast(extra_cast_) , columns_after_cast(columns_after_cast_) + , ann_query_info(ann_query_info_) {} - explicit PushDownFilter(const RSOperatorPtr & rs_operator_) + explicit PushDownExecutor(const RSOperatorPtr & rs_operator_, const ANNQueryInfoPtr & ann_query_info_ = nullptr) : rs_operator(rs_operator_) + , ann_query_info(ann_query_info_) + {} + + explicit PushDownExecutor(const ANNQueryInfoPtr & ann_query_info_) + : ann_query_info(ann_query_info_) {} // Use by StorageDisaggregated. - static PushDownFilterPtr build( + static PushDownExecutorPtr build( const DM::RSOperatorPtr & rs_operator, + const ANNQueryInfoPtr & ann_query_info, const TiDB::ColumnInfos & table_scan_column_info, const google::protobuf::RepeatedPtrField & pushed_down_filters, const ColumnDefines & columns_to_read, @@ -64,7 +72,7 @@ class PushDownFilter const LoggerPtr & tracing_logger); // Use by StorageDeltaMerge. - static DM::PushDownFilterPtr build( + static DM::PushDownExecutorPtr build( const SelectQueryInfo & query_info, const ColumnDefines & columns_to_read, const ColumnDefines & table_column_defines, @@ -87,6 +95,8 @@ class PushDownFilter const ExpressionActionsPtr extra_cast; // If the extra_cast is not null, the types of the columns may be changed const ColumnDefinesPtr columns_after_cast; + // The ANNQueryInfo contains the information of the ANN index + const ANNQueryInfoPtr ann_query_info; }; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp index 8970f3aab52..5b839694d23 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp +++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp @@ -28,7 +28,6 @@ #include #include #include -#include #include #include @@ -86,36 +85,7 @@ RSOperatorPtr RSOperator::build( if (likely(rs_operator != DM::EMPTY_RS_OPERATOR)) LOG_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString()); - ANNQueryInfoPtr ann_query_info = nullptr; - if (dag_query->ann_query_info.query_type() != tipb::ANNQueryType::InvalidQueryType) - ann_query_info = std::make_shared(dag_query->ann_query_info); - if (!ann_query_info) - return rs_operator; - - bool is_valid_ann_query = ann_query_info->top_k() != std::numeric_limits::max(); - bool is_matching_ann_query = std::any_of( - table_column_defines.begin(), - table_column_defines.end(), - [cid = ann_query_info->column_id()](const ColumnDefine & cd) -> bool { return cd.id == cid; }); - if (!is_valid_ann_query || !is_matching_ann_query) - return rs_operator; - - return wrapWithANNQueryInfo(rs_operator, ann_query_info); -} - -RSOperatorPtr wrapWithANNQueryInfo(const RSOperatorPtr & op, const ANNQueryInfoPtr & ann_query_info) -{ - return std::make_shared(op, ann_query_info); -} - -ANNQueryInfoPtr getANNQueryInfo(const RSOperatorPtr & op) -{ - if (op == nullptr) - return nullptr; - auto with_ann = std::dynamic_pointer_cast(op); - if (with_ann == nullptr) - return nullptr; - return with_ann->ann_query_info; + return rs_operator; } } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h index a376965bbf9..3accf33e9b2 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h +++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h @@ -164,10 +164,4 @@ RSOperatorPtr createIsNull(const Attr & attr); // RSOperatorPtr createUnsupported(const String & reason); -// Wrap with a ANNQueryInfo -RSOperatorPtr wrapWithANNQueryInfo(const RSOperatorPtr & op, const ANNQueryInfoPtr & ann_query_info); - -// Get ANNQueryInfo from RSOperator -ANNQueryInfoPtr getANNQueryInfo(const RSOperatorPtr & op); - } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Filter/WithANNQueryInfo.h b/dbms/src/Storages/DeltaMerge/Filter/WithANNQueryInfo.h deleted file mode 100644 index df721a93edd..00000000000 --- a/dbms/src/Storages/DeltaMerge/Filter/WithANNQueryInfo.h +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include - -namespace DB::DM -{ - -// TODO(vector-index): find a more elegant way for passing ANNQueryInfo down for -// building `DMFileWithVectorIndexBlockInputStream` -class WithANNQueryInfo : public RSOperator -{ -public: - const RSOperatorPtr child; - const ANNQueryInfoPtr ann_query_info; - - explicit WithANNQueryInfo(const RSOperatorPtr & child_, const ANNQueryInfoPtr & ann_query_info_) - : child(child_) - , ann_query_info(ann_query_info_) - { - RUNTIME_CHECK(ann_query_info != nullptr); - } - - String name() override { return "ann"; } - - String toDebugString() override - { - if (child) - return child->toDebugString(); - else - return ""; - } - - ColIds getColumnIDs() override - { - if (child) - return child->getColumnIDs(); - else - return {}; - } - - RSResults roughCheck(size_t start_pack, size_t pack_count, const RSCheckParam & param) override - { - if (child) - return child->roughCheck(start_pack, pack_count, param); - else - return RSResults(pack_count, RSResult::Some); - } -}; - -} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.h b/dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.h index d6af96784f6..69b919bbc6f 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.h +++ b/dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.h @@ -16,7 +16,7 @@ #include #include -#include +#include #include #include diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h index b38f598de1a..72c95df70be 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h @@ -17,7 +17,7 @@ #include #include #include -#include +#include #include #include @@ -42,7 +42,7 @@ class RNWorkerPrepareStreams task->initInputStream( *columns_to_read, start_ts, - push_down_filter, + push_down_executor, read_mode, settings.max_block_size, settings.dt_enable_delta_index_error_fallback); @@ -54,7 +54,7 @@ class RNWorkerPrepareStreams public: const ColumnDefinesPtr columns_to_read; const UInt64 start_ts; - const PushDownFilterPtr push_down_filter; + const PushDownExecutorPtr push_down_executor; const ReadMode read_mode; public: @@ -66,7 +66,7 @@ class RNWorkerPrepareStreams const size_t concurrency; const ColumnDefinesPtr & columns_to_read; const UInt64 start_ts; - const PushDownFilterPtr & push_down_filter; + const PushDownExecutorPtr & push_down_executor; const ReadMode read_mode; }; @@ -83,7 +83,7 @@ class RNWorkerPrepareStreams options.concurrency) , columns_to_read(options.columns_to_read) , start_ts(options.start_ts) - , push_down_filter(options.push_down_filter) + , push_down_executor(options.push_down_executor) , read_mode(options.read_mode) {} diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp index 975dae7db25..48bb9c150a8 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp @@ -61,7 +61,7 @@ RNWorkers::RNWorkers( .concurrency = prepare_streams_concurrency, .columns_to_read = options.columns_to_read, .start_ts = options.start_ts, - .push_down_filter = options.push_down_filter, + .push_down_executor = options.push_down_executor, .read_mode = options.read_mode, }); diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h index 69a44b52d81..28ba8b95dc3 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h @@ -47,7 +47,7 @@ class RNWorkers : private boost::noncopyable const LoggerPtr log; const ColumnDefinesPtr & columns_to_read; const UInt64 start_ts; - const PushDownFilterPtr & push_down_filter; + const PushDownExecutorPtr & push_down_executor; const ReadMode read_mode; }; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 5dcc9604477..e139e1e8bf6 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -936,7 +936,7 @@ BlockInputStreamPtr Segment::getInputStream( const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, - const PushDownFilterPtr & filter, + const PushDownExecutorPtr & executor, UInt64 start_ts, size_t expected_block_size) { @@ -958,7 +958,7 @@ BlockInputStreamPtr Segment::getInputStream( dmfile, /*set_cache_if_miss*/ true, read_ranges, - filter ? filter->rs_operator : EMPTY_RS_OPERATOR, + executor ? executor->rs_operator : EMPTY_RS_OPERATOR, /*read_pack*/ {}); pack_filter_results.push_back(result); } @@ -995,7 +995,7 @@ BlockInputStreamPtr Segment::getInputStream( columns_to_read, segment_snap, read_ranges, - filter, + executor, pack_filter_results, start_ts, expected_block_size, @@ -3149,7 +3149,6 @@ std::pair, std::vector> parseDMFilePackInfo( UInt32 preceded_rows = 0; auto file_provider = dm_context.global_context.getFileProvider(); - for (size_t i = 0; i < dmfiles.size(); ++i) { const auto & dmfile = dmfiles[i]; @@ -3220,7 +3219,6 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly( }; auto [skipped_ranges, some_packs_sets] = parseDMFilePackInfo(dmfiles, pack_filter_results, start_ts, dm_context); - if (skipped_ranges.size() == 1 && skipped_ranges[0].offset == 0 && skipped_ranges[0].rows == segment_snap->stable->getDMFilesRows()) { @@ -3301,12 +3299,66 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly( } SkippableBlockInputStreamPtr Segment::getConcatSkippableBlockInputStream( + const SegmentSnapshotPtr & segment_snap, + const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const RowKeyRanges & read_ranges, + const DMFilePackFilterResults & pack_filter_results, + UInt64 start_ts, + size_t expected_block_size, + ReadTag read_tag) +{ + static constexpr bool NeedRowID = false; + // set `is_fast_scan` to true to try to enable clean read + auto enable_handle_clean_read = !hasColumn(columns_to_read, EXTRA_HANDLE_COLUMN_ID); + constexpr auto is_fast_scan = true; + auto enable_del_clean_read = !hasColumn(columns_to_read, TAG_COLUMN_ID); + + SkippableBlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream( + dm_context, + columns_to_read, + read_ranges, + start_ts, + expected_block_size, + enable_handle_clean_read, + read_tag, + pack_filter_results, + is_fast_scan, + enable_del_clean_read, + /* read_packs */ {}, + NeedRowID); + + auto columns_to_read_ptr = std::make_shared(columns_to_read); + + auto memtable = segment_snap->delta->getMemTableSetSnapshot(); + auto persisted_files = segment_snap->delta->getPersistedFileSetSnapshot(); + SkippableBlockInputStreamPtr mem_table_stream = std::make_shared( + dm_context, + memtable, + columns_to_read_ptr, + this->rowkey_range, + read_tag); + SkippableBlockInputStreamPtr persisted_files_stream = std::make_shared( + dm_context, + persisted_files, + columns_to_read_ptr, + this->rowkey_range, + read_tag); + + auto stream = std::dynamic_pointer_cast>(stable_stream); + assert(stream != nullptr); + stream->appendChild(persisted_files_stream, persisted_files->getRows()); + stream->appendChild(mem_table_stream, memtable->getRows()); + return stream; +} + +std::tuple Segment::getConcatVectorIndexBlockInputStream( BitmapFilterPtr bitmap_filter, const SegmentSnapshotPtr & segment_snap, const DMContext & dm_context, const ColumnDefines & columns_to_read, const RowKeyRanges & read_ranges, - const RSOperatorPtr & filter, + const ANNQueryInfoPtr & ann_query_info, const DMFilePackFilterResults & pack_filter_results, UInt64 start_ts, size_t expected_block_size, @@ -3318,7 +3370,6 @@ SkippableBlockInputStreamPtr Segment::getConcatSkippableBlockInputStream( constexpr auto is_fast_scan = true; auto enable_del_clean_read = !hasColumn(columns_to_read, TAG_COLUMN_ID); - auto ann_query_info = getANNQueryInfo(filter); SkippableBlockInputStreamPtr stable_stream = segment_snap->stable->tryGetInputStreamWithVectorIndex( dm_context, columns_to_read, @@ -3369,19 +3420,17 @@ BlockInputStreamPtr Segment::getLateMaterializationStream( const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & data_ranges, - const PushDownFilterPtr & filter, + const PushDownExecutorPtr & executor, const DMFilePackFilterResults & pack_filter_results, UInt64 start_ts, size_t expected_block_size) { - const auto & filter_columns = filter->filter_columns; + const auto & filter_columns = executor->filter_columns; BlockInputStreamPtr filter_column_stream = getConcatSkippableBlockInputStream( - bitmap_filter, segment_snap, dm_context, *filter_columns, data_ranges, - filter->rs_operator, pack_filter_results, start_ts, expected_block_size, @@ -3396,29 +3445,31 @@ BlockInputStreamPtr Segment::getLateMaterializationStream( filter_columns->size()); BlockInputStreamPtr stream = std::make_shared(*filter_columns, filter_column_stream, bitmap_filter); - if (filter->extra_cast) + if (executor->extra_cast) { - stream = std::make_shared(stream, filter->extra_cast, dm_context.tracing_id); + stream = std::make_shared(stream, executor->extra_cast, dm_context.tracing_id); stream->setExtraInfo("cast after tableScan"); } stream = std::make_shared( stream, - filter->before_where, - filter->filter_column_name, + executor->before_where, + executor->filter_column_name, dm_context.tracing_id); stream->setExtraInfo("push down filter"); - stream - = std::make_shared(stream, filter->project_after_where, dm_context.tracing_id); + stream = std::make_shared( + stream, + executor->project_after_where, + dm_context.tracing_id); stream->setExtraInfo("project after where"); return stream; } // construct extra cast stream if needed - if (filter->extra_cast) + if (executor->extra_cast) { filter_column_stream = std::make_shared( filter_column_stream, - filter->extra_cast, + executor->extra_cast, dm_context.tracing_id); filter_column_stream->setExtraInfo("cast after tableScan"); } @@ -3426,8 +3477,8 @@ BlockInputStreamPtr Segment::getLateMaterializationStream( // construct filter stream filter_column_stream = std::make_shared( filter_column_stream, - filter->before_where, - filter->filter_column_name, + executor->before_where, + executor->filter_column_name, dm_context.tracing_id); filter_column_stream->setExtraInfo("push down filter"); @@ -3445,12 +3496,10 @@ BlockInputStreamPtr Segment::getLateMaterializationStream( // construct stream for the rest columns auto rest_column_stream = getConcatSkippableBlockInputStream( - bitmap_filter, segment_snap, dm_context, *rest_columns_to_read, data_ranges, - filter->rs_operator, pack_filter_results, start_ts, expected_block_size, @@ -3459,7 +3508,7 @@ BlockInputStreamPtr Segment::getLateMaterializationStream( // construct late materialization stream return std::make_shared( columns_to_read, - filter->filter_column_name, + executor->filter_column_name, filter_column_stream, rest_column_stream, bitmap_filter, @@ -3488,7 +3537,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, - const PushDownFilterPtr & filter, + const PushDownExecutorPtr & executor, const DMFilePackFilterResults & pack_filter_results, UInt64 start_ts, size_t build_bitmap_filter_block_rows, @@ -3514,7 +3563,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( segment_snap->stable->clearColumnCaches(); } - if (filter && filter->before_where) + if (executor && executor->before_where) { // if has filter conditions pushed down, use late materialization return getLateMaterializationStream( @@ -3523,34 +3572,51 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( columns_to_read, segment_snap, real_ranges, - filter, + executor, pack_filter_results, start_ts, read_data_block_rows); } - auto stream = getConcatSkippableBlockInputStream( - bitmap_filter, - segment_snap, - dm_context, - columns_to_read, - real_ranges, - filter ? filter->rs_operator : EMPTY_RS_OPERATOR, - pack_filter_results, - start_ts, - read_data_block_rows, - ReadTag::Query); - if (auto * vector_index_stream = dynamic_cast(stream.get()); - vector_index_stream) + SkippableBlockInputStreamPtr stream; + if (executor && executor->ann_query_info) { - // For vector search, there are more likely to return small blocks from different - // sub-streams. Squash blocks to reduce the number of blocks thus improve the - // performance of upper layer. - return std::make_shared( - stream, - /*min_block_size_rows=*/read_data_block_rows, - /*min_block_size_bytes=*/0, - dm_context.tracing_id); + // For ANN query, try to use vector index to accelerate. + bool is_vector = false; + std::tie(stream, is_vector) = getConcatVectorIndexBlockInputStream( + bitmap_filter, + segment_snap, + dm_context, + columns_to_read, + real_ranges, + executor->ann_query_info, + pack_filter_results, + start_ts, + read_data_block_rows, + ReadTag::Query); + if (is_vector) + { + // For vector search, there are more likely to return small blocks from different + // sub-streams. Squash blocks to reduce the number of blocks thus improve the + // performance of upper layer. + return std::make_shared( + stream, + /*min_block_size_rows=*/read_data_block_rows, + /*min_block_size_bytes=*/0, + dm_context.tracing_id); + } + } + else + { + stream = getConcatSkippableBlockInputStream( + segment_snap, + dm_context, + columns_to_read, + real_ranges, + pack_filter_results, + start_ts, + read_data_block_rows, + ReadTag::Query); } return std::make_shared(columns_to_read, stream, bitmap_filter); } diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index b8ba5b3400f..bb331e8cdef 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -41,8 +41,8 @@ class StableValueSpace; using StableValueSpacePtr = std::shared_ptr; class DeltaValueSpace; using DeltaValueSpacePtr = std::shared_ptr; -class PushDownFilter; -using PushDownFilterPtr = std::shared_ptr; +class PushDownExecutor; +using PushDownExecutorPtr = std::shared_ptr; enum class ReadMode; @@ -230,7 +230,7 @@ class Segment const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, - const PushDownFilterPtr & filter, + const PushDownExecutorPtr & executor, UInt64 start_ts, size_t expected_block_size); @@ -751,13 +751,23 @@ class Segment const DMFilePackFilterResults & pack_filter_results, UInt64 start_ts, size_t expected_block_size); - SkippableBlockInputStreamPtr getConcatSkippableBlockInputStream( + // Returns + std::tuple getConcatVectorIndexBlockInputStream( BitmapFilterPtr bitmap_filter, const SegmentSnapshotPtr & segment_snap, const DMContext & dm_context, const ColumnDefines & columns_to_read, const RowKeyRanges & read_ranges, - const RSOperatorPtr & filter, + const ANNQueryInfoPtr & ann_query_info, + const DMFilePackFilterResults & pack_filter_results, + UInt64 start_ts, + size_t expected_block_size, + ReadTag read_tag); + SkippableBlockInputStreamPtr getConcatSkippableBlockInputStream( + const SegmentSnapshotPtr & segment_snap, + const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const RowKeyRanges & read_ranges, const DMFilePackFilterResults & pack_filter_results, UInt64 start_ts, size_t expected_block_size, @@ -767,7 +777,7 @@ class Segment const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, - const PushDownFilterPtr & filter, + const PushDownExecutorPtr & executor, const DMFilePackFilterResults & pack_filter_results, UInt64 start_ts, size_t build_bitmap_filter_block_rows, @@ -779,7 +789,7 @@ class Segment const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & data_ranges, - const PushDownFilterPtr & filter, + const PushDownExecutorPtr & executor, const DMFilePackFilterResults & pack_filter_results, UInt64 start_ts, size_t expected_block_size); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp index 28822cc26af..afe4a2b076e 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp @@ -260,7 +260,7 @@ void SegmentReadTask::initColumnFileDataProvider(const Remote::RNLocalPageCacheG void SegmentReadTask::initInputStream( const ColumnDefines & columns_to_read, UInt64 start_ts, - const PushDownFilterPtr & push_down_filter, + const PushDownExecutorPtr & push_down_executor, ReadMode read_mode, size_t expected_block_size, bool enable_delta_index_error_fallback) @@ -268,7 +268,7 @@ void SegmentReadTask::initInputStream( if (likely(doInitInputStreamWithErrorFallback( columns_to_read, start_ts, - push_down_filter, + push_down_executor, read_mode, expected_block_size, enable_delta_index_error_fallback))) @@ -283,20 +283,20 @@ void SegmentReadTask::initInputStream( { cache->setDeltaIndex(read_snapshot->delta->getSharedDeltaIndex()); } - doInitInputStream(columns_to_read, start_ts, push_down_filter, read_mode, expected_block_size); + doInitInputStream(columns_to_read, start_ts, push_down_executor, read_mode, expected_block_size); } bool SegmentReadTask::doInitInputStreamWithErrorFallback( const ColumnDefines & columns_to_read, UInt64 start_ts, - const PushDownFilterPtr & push_down_filter, + const PushDownExecutorPtr & push_down_executor, ReadMode read_mode, size_t expected_block_size, bool enable_delta_index_error_fallback) { try { - doInitInputStream(columns_to_read, start_ts, push_down_filter, read_mode, expected_block_size); + doInitInputStream(columns_to_read, start_ts, push_down_executor, read_mode, expected_block_size); return true; } catch (const Exception & e) @@ -316,7 +316,7 @@ bool SegmentReadTask::doInitInputStreamWithErrorFallback( void SegmentReadTask::doInitInputStream( const ColumnDefines & columns_to_read, UInt64 start_ts, - const PushDownFilterPtr & push_down_filter, + const PushDownExecutorPtr & push_down_executor, ReadMode read_mode, size_t expected_block_size) { @@ -333,7 +333,7 @@ void SegmentReadTask::doInitInputStream( columns_to_read, read_snapshot, ranges, - push_down_filter, + push_down_executor, start_ts, expected_block_size); } diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h index e7e6f1d9ab6..af4d3cc7268 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h @@ -103,7 +103,7 @@ struct SegmentReadTask void initInputStream( const ColumnDefines & columns_to_read, UInt64 start_ts, - const PushDownFilterPtr & push_down_filter, + const PushDownExecutorPtr & push_down_executor, ReadMode read_mode, size_t expected_block_size, bool enable_delta_index_error_fallback); @@ -140,7 +140,7 @@ struct SegmentReadTask bool doInitInputStreamWithErrorFallback( const ColumnDefines & columns_to_read, UInt64 start_ts, - const PushDownFilterPtr & push_down_filter, + const PushDownExecutorPtr & push_down_executor, ReadMode read_mode, size_t expected_block_size, bool enable_delta_index_error_fallback); @@ -148,7 +148,7 @@ struct SegmentReadTask void doInitInputStream( const ColumnDefines & columns_to_read, UInt64 start_ts, - const PushDownFilterPtr & push_down_filter, + const PushDownExecutorPtr & push_down_executor, ReadMode read_mode, size_t expected_block_size); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index b5506f45c42..eb00d24ec5e 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -117,7 +117,7 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t SegmentReadTaskPool::SegmentReadTaskPool( int extra_table_id_index_, const ColumnDefines & columns_to_read_, - const PushDownFilterPtr & filter_, + const PushDownExecutorPtr & filter_, uint64_t start_ts_, size_t expected_block_size_, ReadMode read_mode_, diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 8432342fc88..76dda96f18d 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -17,7 +17,7 @@ #include #include #include -#include +#include #include #include #include @@ -111,7 +111,7 @@ class SegmentReadTaskPool SegmentReadTaskPool( int extra_table_id_index_, const ColumnDefines & columns_to_read_, - const PushDownFilterPtr & filter_, + const PushDownExecutorPtr & filter_, uint64_t start_ts_, size_t expected_block_size_, ReadMode read_mode_, @@ -214,7 +214,7 @@ class SegmentReadTaskPool const int extra_table_id_index; ColumnDefines columns_to_read; - PushDownFilterPtr filter; + PushDownExecutorPtr filter; const uint64_t start_ts; const size_t expected_block_size; const ReadMode read_mode; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 8a4b363a269..6cc96bdc63d 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -65,7 +65,7 @@ extern const char proactive_flush_force_set_type[]; namespace DB::tests { -DM::PushDownFilterPtr generatePushDownFilter( +DM::PushDownExecutorPtr generatePushDownExecutor( Context & ctx, const String & table_info_json, const String & query, @@ -582,7 +582,7 @@ try {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), - std::make_shared(filter), + std::make_shared(filter), std::vector{}, 0, TRACING_NAME, @@ -4108,7 +4108,7 @@ try return block; }; - auto check = [&](PushDownFilterPtr filter, RSResult expected_res, const std::vector & expected_data) { + auto check = [&](PushDownExecutorPtr filter, RSResult expected_res, const std::vector & expected_data) { auto in = store->read( *db_context, db_context->getSettingsRef(), @@ -4153,7 +4153,7 @@ try })json"; auto create_filter = [&](Int64 value) { - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( *db_context, table_info_json, fmt::format("select * from default.t_111 where col_time >= {}", value)); @@ -4237,7 +4237,7 @@ try return block; }; - auto check = [&](PushDownFilterPtr filter, RSResult expected_res, const std::vector & expected_data) { + auto check = [&](PushDownExecutorPtr filter, RSResult expected_res, const std::vector & expected_data) { auto in = store->read( *db_context, db_context->getSettingsRef(), @@ -4283,7 +4283,7 @@ try })json"; auto create_filter = [&](Int64 value) { - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( *db_context, table_info_json, fmt::format("select * from default.t_111 where col_time >= {}", value)); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp index 69f123e035b..83fe42b5038 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp @@ -98,7 +98,7 @@ class DeltaMergeStoreVectorTest store->write(*db_context, db_context->getSettingsRef(), block); } - void read(const RowKeyRange & range, const PushDownFilterPtr & filter, const ColumnWithTypeAndName & out) + void read(const RowKeyRange & range, const PushDownExecutorPtr & filter, const ColumnWithTypeAndName & out) { auto in = store->read( *db_context, @@ -253,7 +253,7 @@ try ann_query_info->set_top_k(2); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({127.5})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{127.0}, {128.0}})); } @@ -262,7 +262,7 @@ try ann_query_info->set_top_k(2); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({72.1})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{72.0}, {73.0}})); } } @@ -303,7 +303,7 @@ try ann_query_info->set_top_k(2); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({72.1})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{72.0}, {73.0}})); } @@ -312,7 +312,7 @@ try ann_query_info->set_top_k(2); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({127.5})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{127.0}, {128.0}})); } } @@ -360,7 +360,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({72.0})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); // [0, 128) with vector index return 72.0, [128, 130) without vector index return all. read(range, filter, createVecFloat32Column({{72.0}, {128.0}, {129.0}})); } @@ -370,7 +370,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({72.1})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); // [0, 128) with vector index return 72.0, [128, 130) without vector index return all. read(range, filter, createVecFloat32Column({{72.0}, {128.0}, {129.0}})); } @@ -419,7 +419,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.0})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); // [0, 4) without vector index return all. read(range, filter, createVecFloat32Column({{0.0}, {1.0}, {2.0}, {3.0}})); } @@ -429,7 +429,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({1.1})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); // [0, 4) without vector index return all. read(range, filter, createVecFloat32Column({{0.0}, {1.0}, {2.0}, {3.0}})); } @@ -502,7 +502,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(left_segment_range, filter, createVecFloat32Column({{2.0}})); } @@ -512,7 +512,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({222.1})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(left_segment_range, filter, createVecFloat32Column({{127.0}})); } @@ -535,7 +535,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{2.0}})); } @@ -545,7 +545,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({122.1})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{122.0}})); } @@ -630,7 +630,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(left_segment_range, filter, createVecFloat32Column({{2.0}})); } @@ -640,7 +640,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({222.1})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(left_segment_range, filter, createVecFloat32Column({{127.0}})); } @@ -663,7 +663,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{2.0}})); } @@ -673,7 +673,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({122.1})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{122.0}})); } @@ -753,7 +753,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{2.0}})); } @@ -763,7 +763,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.1})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{2.0}})); } @@ -841,7 +841,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{2.0}})); } @@ -851,7 +851,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({222.1})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{222.0}})); } @@ -928,7 +928,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{2.0}})); } @@ -938,7 +938,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(encodeVectorFloat32({222.1})); - auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); read(range, filter, createVecFloat32Column({{222.0}})); } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp index 05b18dc9e2e..5fb7debed72 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp @@ -159,7 +159,7 @@ bool checkMatch( {all_range}, 1, std::numeric_limits::max(), - std::make_shared(filter), + std::make_shared(filter), std::vector{}, 0, name, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp index c1b17e94f07..0391ff1100b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp @@ -1217,7 +1217,7 @@ class VectorIndexSegmentTestBase columns_to_read, snapshot, {range}, - std::make_shared(wrapWithANNQueryInfo({}, ann_query)), + std::make_shared(ann_query), pack_filter_results, std::numeric_limits::max(), DEFAULT_BLOCK_SIZE, @@ -1798,7 +1798,7 @@ class VectorIndexSegmentOnS3Test BlockInputStreamPtr createComputeNodeStream( const SegmentPtr & write_node_segment, const ColumnDefines & columns_to_read, - const PushDownFilterPtr & filter, + const PushDownExecutorPtr & filter, const ScanContextPtr & read_scan_context = nullptr) { auto write_dm_context = dmContext(); @@ -1951,7 +1951,7 @@ class VectorIndexSegmentOnS3Test auto stream = createComputeNodeStream( wn_segment, {cdPK(), cdVec()}, - std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)), + std::make_shared(ann_query_info), read_scan_context); return stream; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h index b3516e1de44..9fb34e43031 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h @@ -149,7 +149,7 @@ class DeltaMergeStoreVectorBase : public VectorIndexTestUtils store->write(*db_context, db_context->getSettingsRef(), block); } - void read(const RowKeyRange & range, const PushDownFilterPtr & filter, const ColumnWithTypeAndName & out) + void read(const RowKeyRange & range, const PushDownExecutorPtr & filter, const ColumnWithTypeAndName & out) { auto in = store->read( *db_context, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_skippable_block_input_stream.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_skippable_block_input_stream.cpp index 01e38b58c1e..e4ebbf2f6a0 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_skippable_block_input_stream.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_skippable_block_input_stream.cpp @@ -99,12 +99,10 @@ class SkippableBlockInputStreamTest : public SegmentTestBasic const RowKeyRanges & read_ranges) { return segment->getConcatSkippableBlockInputStream( - nullptr, snapshot, *dm_context, columns_to_read, read_ranges, - EMPTY_RS_OPERATOR, {}, std::numeric_limits::max(), DEFAULT_BLOCK_SIZE, diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index 566d6bc3e43..7ecef9432b5 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -14,7 +14,7 @@ #include #include -#include +#include #include #include #include @@ -179,7 +179,7 @@ class RegionKVStoreTestFAP : public KVStoreTestBase protected: UInt64 upload_sequence = 1000; - UInt64 table_id; + UInt64 table_id{}; private: ContextPtr context; @@ -596,7 +596,14 @@ try return genFastAddPeerResFail(FastAddPeerStatus::NoSuitable); }); // Will generate and persist some information in local ps, which will not be uploaded. - FastAddPeerImplWrite(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333, std::move(mock_data), 0); + auto mock_data_cp = mock_data; + FastAddPeerImplWrite( + global_context.getTMTContext(), + proxy_helper.get(), + region_id, + 2333, + std::move(mock_data_cp), + 0); dumpCheckpoint(); FastAddPeerImplWrite(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333, std::move(mock_data), 0); exe_lock.unlock(); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index df7ceb171a5..685d57071b6 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -41,7 +41,7 @@ #include #include #include -#include +#include #include #include #include @@ -834,7 +834,8 @@ BlockInputStreams StorageDeltaMerge::read( query_info.req_id, tracing_logger); - auto filter = PushDownFilter::build(query_info, columns_to_read, store->getTableColumns(), context, tracing_logger); + auto filter + = PushDownExecutor::build(query_info, columns_to_read, store->getTableColumns(), context, tracing_logger); auto runtime_filter_list = parseRuntimeFilterList(query_info, store->getTableColumns(), context, tracing_logger); @@ -917,7 +918,8 @@ void StorageDeltaMerge::read( query_info.req_id, tracing_logger); - auto filter = PushDownFilter::build(query_info, columns_to_read, store->getTableColumns(), context, tracing_logger); + auto filter + = PushDownExecutor::build(query_info, columns_to_read, store->getTableColumns(), context, tracing_logger); auto runtime_filter_list = parseRuntimeFilterList(query_info, store->getTableColumns(), context, tracing_logger); diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index f040e76f814..de4340459e4 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -23,7 +23,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index 475074d6978..a2e7160ec5f 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -34,7 +34,7 @@ #include #include #include -#include +#include #include #include #include @@ -514,9 +514,16 @@ std::variant StorageDisagg { const auto & executor_id = table_scan.getTableScanExecutorID(); + // build the rough set operator auto rs_operator = buildRSOperator(db_context, column_defines); - auto push_down_filter = DM::PushDownFilter::build( + // build ANN query info + DM::ANNQueryInfoPtr ann_query_info = nullptr; + if (table_scan.getANNQueryInfo().query_type() != tipb::ANNQueryType::InvalidQueryType) + ann_query_info = std::make_shared(table_scan.getANNQueryInfo()); + // build push down executor + auto push_down_executor = DM::PushDownExecutor::build( rs_operator, + ann_query_info, table_scan.getColumns(), table_scan.getPushedDownFilters(), *column_defines, @@ -526,7 +533,7 @@ std::variant StorageDisagg db_context, table_scan.isFastScan(), table_scan.keepOrder(), - push_down_filter); + push_down_executor); const UInt64 start_ts = sender_target_mpp_task_id.gather_id.query_id.start_ts; const auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread; LOG_INFO( @@ -546,7 +553,7 @@ std::variant StorageDisagg return std::make_shared( extra_table_id_index, *column_defines, - push_down_filter, + push_down_executor, start_ts, db_context.getSettingsRef().max_block_size, read_mode, @@ -566,7 +573,7 @@ std::variant StorageDisagg .log = log->getChild(executor_id), .columns_to_read = column_defines, .start_ts = start_ts, - .push_down_filter = push_down_filter, + .push_down_executor = push_down_executor, .read_mode = read_mode, }, num_streams); diff --git a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp index b0808bab5f2..748542d3e66 100644 --- a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp +++ b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include #include #include @@ -29,13 +29,14 @@ #include #include +#include #include namespace DB::tests { -class ParsePushDownFilterTest : public ::testing::Test +class ParsePushDownExecutorTest : public ::testing::Test { public: static void SetUpTestCase() @@ -54,13 +55,13 @@ class ParsePushDownFilterTest : public ::testing::Test LoggerPtr log = Logger::get(); ContextPtr ctx = DB::tests::TiFlashTestEnv::getContext(); TimezoneInfo default_timezone_info = DB::tests::TiFlashTestEnv::getContext()->getTimezoneInfo(); - DM::PushDownFilterPtr generatePushDownFilter( + DM::PushDownExecutorPtr generatePushDownExecutor( const String & table_info_json, const String & query, TimezoneInfo & timezone_info); }; -DM::PushDownFilterPtr generatePushDownFilter( +DM::PushDownExecutorPtr generatePushDownExecutor( Context & ctx, const String & table_info_json, const String & query, @@ -124,21 +125,27 @@ DM::PushDownFilterPtr generatePushDownFilter( auto rs_operator = DM::FilterParser::parseDAGQuery(*dag_query, table_info.columns, std::move(create_attr_by_column_id), log); - auto push_down_filter - = DM::PushDownFilter::build(rs_operator, table_info.columns, pushed_down_filters, columns_to_read, ctx, log); - return push_down_filter; + auto push_down_executor = DM::PushDownExecutor::build( + rs_operator, + std::make_shared(dag_query->ann_query_info), + table_info.columns, + pushed_down_filters, + columns_to_read, + ctx, + log); + return push_down_executor; } -DM::PushDownFilterPtr ParsePushDownFilterTest::generatePushDownFilter( +DM::PushDownExecutorPtr ParsePushDownExecutorTest::generatePushDownExecutor( const String & table_info_json, const String & query, TimezoneInfo & timezone_info) { - return ::DB::tests::generatePushDownFilter(*ctx, table_info_json, query, timezone_info); + return ::DB::tests::generatePushDownExecutor(*ctx, table_info_json, query, timezone_info); } // Test cases for col and literal -TEST_F(ParsePushDownFilterTest, ColAndLiteral) +TEST_F(ParsePushDownExecutorTest, ColAndLiteral) try { const String table_info_json = R"json({ @@ -152,7 +159,7 @@ try { // Equal between col and literal - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_2 = 666", default_timezone_info); @@ -174,7 +181,7 @@ try { // Greater between col and literal - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_2 > 666", default_timezone_info); @@ -196,7 +203,7 @@ try { // GreaterEqual between col and literal - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_2 >= 667", default_timezone_info); @@ -218,7 +225,7 @@ try { // Less between col and literal - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_2 < 777", default_timezone_info); @@ -240,7 +247,7 @@ try { // LessEqual between col and literal - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_2 <= 776", default_timezone_info); @@ -262,7 +269,7 @@ try } CATCH -TEST_F(ParsePushDownFilterTest, LiteralAndCol) +TEST_F(ParsePushDownExecutorTest, LiteralAndCol) try { const String table_info_json = R"json({ @@ -276,7 +283,7 @@ try // Test cases for literal and col (inverse direction) { // Equal between literal and col (take care of direction) - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where 667 = col_2", default_timezone_info); @@ -298,7 +305,7 @@ try { // NotEqual between literal and col (take care of direction) - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where 667 != col_2", default_timezone_info); @@ -320,7 +327,7 @@ try { // Greater between literal and col (take care of direction) - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where 667 < col_2", default_timezone_info); @@ -342,7 +349,7 @@ try { // GreaterEqual between literal and col (take care of direction) - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where 667 <= col_2", default_timezone_info); @@ -364,7 +371,7 @@ try { // Less between literal and col (take care of direction) - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where 777 > col_2", default_timezone_info); @@ -386,7 +393,7 @@ try { // LessEqual between literal and col (take care of direction) - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where 777 >= col_2", default_timezone_info); @@ -409,7 +416,7 @@ try CATCH // Test cases for Logic operator -TEST_F(ParsePushDownFilterTest, LogicOperator) +TEST_F(ParsePushDownExecutorTest, LogicOperator) try { const String table_info_json = R"json({ @@ -424,7 +431,7 @@ try })json"; { // Not - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select col_1, col_2 from default.t_111 where NOT col_2=666", default_timezone_info); @@ -451,7 +458,7 @@ try { // And - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_1 = 'test1' and col_2 = 666", default_timezone_info); @@ -478,7 +485,7 @@ try { // OR - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_2 = 789 or col_2 = 777", default_timezone_info); @@ -508,7 +515,7 @@ try // More complicated { // And with "not supported" - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_1 = 'test1' and not col_2 = 666", default_timezone_info); @@ -535,7 +542,7 @@ try { // And with not - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_2 = 789 and not col_3 = 666", default_timezone_info); @@ -564,7 +571,7 @@ try { // And with or - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_2 = 789 and (col_3 = 666 or col_3 = 678)", default_timezone_info); @@ -595,7 +602,7 @@ try { // Or with "not supported" - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_1 = 'test1' or col_2 = 666", default_timezone_info); @@ -622,7 +629,7 @@ try { // Or with not - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_1 = 'test1' or not col_2 = 666", default_timezone_info); @@ -649,7 +656,7 @@ try { // And between col and literal (not supported since And only support when child is ColumnExpr) - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_2 and 1", default_timezone_info); @@ -674,7 +681,7 @@ try { // Or between col and literal (not supported since Or only support when child is ColumnExpr) - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, "select * from default.t_111 where col_2 or 1", default_timezone_info); @@ -702,7 +709,7 @@ try CATCH // Test cases for date,datetime,timestamp column -TEST_F(ParsePushDownFilterTest, TimestampColumn) +TEST_F(ParsePushDownExecutorTest, TimestampColumn) try { const String table_info_json = R"json({ @@ -732,7 +739,7 @@ try convertTimeZone(origin_time_stamp, converted_time, *timezone_info.timezone, time_zone_utc); // converted_time: 0 - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, String("select * from default.t_111 where col_timestamp > cast_string_datetime('") + datetime + String("')"), @@ -781,7 +788,7 @@ try convertTimeZone(origin_time_stamp, converted_time, *timezone_info.timezone, time_zone_utc); // converted_time: 1802216518491045888 - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, String("select * from default.t_111 where col_timestamp > cast_string_datetime('") + datetime + String("')"), @@ -837,7 +844,7 @@ try convertTimeZoneByOffset(origin_time_stamp, converted_time, false, timezone_info.timezone_offset); // converted_time: 0 - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, String("select * from default.t_111 where col_timestamp > cast_string_datetime('") + datetime + String("')"), @@ -887,7 +894,7 @@ try { // Greater between Datetime col and Datetime literal - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, String("select * from default.t_111 where col_datetime > cast_string_datetime('") + datetime + String("')"), default_timezone_info); @@ -935,7 +942,7 @@ try { // Greater between Date col and Datetime literal - auto filter = generatePushDownFilter( + auto filter = generatePushDownExecutor( table_info_json, String("select * from default.t_111 where col_date > cast_string_datetime('") + datetime + String("')"), default_timezone_info); diff --git a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp index 03c1473c347..43426e9ce5e 100644 --- a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp +++ b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp @@ -862,7 +862,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(dmsv.encodeVectorFloat32({1.0, 2.0, 3.5})); - auto filter = std::make_shared(DM::wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); dmsv.read(range, filter, createVecFloat32Column({{1.0, 2.0, 3.5}})); } @@ -873,7 +873,7 @@ try ann_query_info->set_top_k(1); ann_query_info->set_ref_vec_f32(dmsv.encodeVectorFloat32({1.0, 2.0, 3.8})); - auto filter = std::make_shared(DM::wrapWithANNQueryInfo(nullptr, ann_query_info)); + auto filter = std::make_shared(ann_query_info); dmsv.read(range, filter, createVecFloat32Column({{1.0, 2.0, 3.5}})); }