Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Filter row group using runtime filter before prepare next row group in FileReader #54868

Merged
merged 6 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions be/src/exec/hdfs_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

#include "block_cache/block_cache_hit_rate_counter.hpp"
#include "column/column_helper.h"
#include "connector/deletion_vector//deletion_vector.h"
#include "connector/deletion_vector/deletion_vector.h"
#include "exec/exec_node.h"
#include "fs/hdfs/fs_hdfs.h"
#include "io/cache_select_input_stream.hpp"
#include "io/compressed_input_stream.h"
#include "io/shared_buffered_input_stream.h"
#include "pipeline/fragment_context.h"
#include "storage/olap_runtime_range_pruner.hpp"
#include "storage/predicate_parser.h"
#include "util/compression/compression_utils.h"
#include "util/compression/stream_compression.h"
Expand Down Expand Up @@ -184,9 +185,11 @@ Status HdfsScanner::_build_scanner_context() {
opts.pred_tree_params = _runtime_state->fragment_ctx()->pred_tree_params();
ctx.conjuncts_manager = std::make_unique<ScanConjunctsManager>(std::move(opts));
RETURN_IF_ERROR(ctx.conjuncts_manager->parse_conjuncts());
ConnectorPredicateParser predicate_parser{&ctx.slot_descs};
auto* predicate_parser = opts.obj_pool->add(new ConnectorPredicateParser(&ctx.slot_descs));
ASSIGN_OR_RETURN(ctx.predicate_tree,
ctx.conjuncts_manager->get_predicate_tree(&predicate_parser, ctx.predicate_free_pool));
ctx.conjuncts_manager->get_predicate_tree(predicate_parser, ctx.predicate_free_pool));
ctx.rf_scan_range_pruner = opts.obj_pool->add(
new OlapRuntimeScanRangePruner(predicate_parser, ctx.conjuncts_manager->unarrived_runtime_filters()));
}
return Status::OK();
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/hdfs_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ struct HdfsScannerContext {

int64_t connector_max_split_size = 0;

OlapRuntimeScanRangePruner* rf_scan_range_pruner = nullptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we definitely have to remove "Olap"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, i will remove it next pr.


// update none_existed_slot
// update conjunct
void update_with_none_existed_slot(SlotDescriptor* slot);
Expand Down
66 changes: 51 additions & 15 deletions be/src/formats/parquet/file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,18 @@

#include <glog/logging.h>

#include <algorithm>
#include <atomic>
#include <cstring>
#include <iterator>
#include <map>
#include <sstream>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "block_cache/kv_cache.h"
#include "column/chunk.h"
#include "column/column.h"
#include "column/column_helper.h"
#include "column/const_column.h"
#include "column/datum.h"
#include "column/vectorized_fwd.h"
#include "common/compiler_util.h"
#include "common/config.h"
Expand All @@ -43,23 +38,19 @@
#include "exprs/expr_context.h"
#include "exprs/runtime_filter.h"
#include "exprs/runtime_filter_bank.h"
#include "formats/parquet/column_converter.h"
#include "formats/parquet/metadata.h"
#include "formats/parquet/scalar_column_reader.h"
#include "formats/parquet/schema.h"
#include "formats/parquet/statistics_helper.h"
#include "formats/parquet/utils.h"
#include "formats/parquet/zone_map_filter_evaluator.h"
#include "fs/fs.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/parquet_types.h"
#include "gutil/casts.h"
#include "gutil/strings/substitute.h"
#include "io/shared_buffered_input_stream.h"
#include "runtime/descriptors.h"
#include "runtime/types.h"
#include "storage/chunk_helper.h"
#include "util/thrift_util.h"

namespace starrocks::parquet {

Expand Down Expand Up @@ -358,6 +349,9 @@ bool FileReader::_filter_group_with_more_filter(const GroupReaderPtr& group_read
// status and lead to the query failed.
bool FileReader::_filter_group(const GroupReaderPtr& group_reader) {
if (config::parquet_advance_zonemap_filter) {
if (_scanner_ctx->rf_scan_range_pruner != nullptr) {
_rf_scan_range_pruner = std::make_shared<OlapRuntimeScanRangePruner>(*_scanner_ctx->rf_scan_range_pruner);
}
auto res = _scanner_ctx->predicate_tree.visit(
ZoneMapEvaluator<FilterLevel::ROW_GROUP>{_scanner_ctx->predicate_tree, group_reader.get()});
if (!res.ok()) {
Expand Down Expand Up @@ -386,6 +380,29 @@ bool FileReader::_filter_group(const GroupReaderPtr& group_reader) {
}
}

StatusOr<bool> FileReader::_update_rf_and_filter_group(const GroupReaderPtr& group_reader) {
bool filter = false;
if (config::parquet_advance_zonemap_filter && _rf_scan_range_pruner != nullptr) {
RETURN_IF_ERROR(_rf_scan_range_pruner->update_range_if_arrived(
&EMPTY_GLOBAL_DICTMAPS,
[&filter, &group_reader](auto cid, const PredicateList& predicates) {
PredicateCompoundNode<CompoundNodeType::AND> pred_tree;
for (const auto& pred : predicates) {
pred_tree.add_child(PredicateColumnNode{pred});
}
auto real_tree = PredicateTree::create(std::move(pred_tree));

auto res = real_tree.visit(ZoneMapEvaluator<FilterLevel::ROW_GROUP>{real_tree, group_reader.get()});
if (res.ok() && res->has_value() && res->value().span_size() == 0) {
filter = true;
}
return Status::OK();
},
true, 0));
}
return filter;
}

Status FileReader::_read_has_nulls(const GroupReaderPtr& group_reader, const std::vector<SlotDescriptor*>& slots,
std::vector<bool>* has_nulls) {
const HdfsScannerContext& ctx = *_scanner_ctx;
Expand Down Expand Up @@ -609,12 +626,31 @@ Status FileReader::get_next(ChunkPtr* chunk) {
}
if (status.is_end_of_file()) {
// release previous RowGroupReader
_row_group_readers[_cur_row_group_idx] = nullptr;
_cur_row_group_idx++;
if (_cur_row_group_idx < _row_group_size) {
// prepare new group
RETURN_IF_ERROR(_row_group_readers[_cur_row_group_idx]->prepare());
}
do {
_row_group_readers[_cur_row_group_idx] = nullptr;
_cur_row_group_idx++;
if (_cur_row_group_idx < _row_group_size) {
const auto& cur_row_group = _row_group_readers[_cur_row_group_idx];
auto ret = _update_rf_and_filter_group(cur_row_group);
if (ret.ok() && ret.value()) {
// row group is filtered by runtime filter
_group_reader_param.stats->parquet_filtered_row_groups += 1;
continue;
} else if (ret.status().is_end_of_file()) {
// If rf is always false, will return eof
_group_reader_param.stats->parquet_filtered_row_groups +=
(_row_group_size - _cur_row_group_idx);
_row_group_readers.assign(_row_group_readers.size(), nullptr);
_cur_row_group_idx = _row_group_size;
break;
} else {
// do nothing, ignore the error code
}

RETURN_IF_ERROR(cur_row_group->prepare());
}
break;
} while (true);

return Status::OK();
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
An infinite loop may occur when _row_group_size is 0 or _cur_row_group_idx exceeds _row_group_size.

You can modify the code like this:

Status FileReader::get_next(ChunkPtr* chunk) {
    while (_cur_row_group_idx < _row_group_size) { // Add condition check here
        const auto& cur_row_group = _row_group_readers[_cur_row_group_idx];
        
        auto ret = _update_rf_and_filter_group(cur_row_group);
        if (ret.ok() && ret.value()) {
            _group_reader_param.stats->parquet_filtered_row_groups += 1;
            _row_group_readers[_cur_row_group_idx] = nullptr; // Ensure the current reader is nullified after filtering
            _cur_row_group_idx++;
            continue;
        } else if (ret.status().is_end_of_file()) {
            _group_reader_param.stats->parquet_filtered_row_groups += 
                (_row_group_size - _cur_row_group_idx);
            _row_group_readers.assign(_row_group_readers.size(), nullptr);
            _cur_row_group_idx = _row_group_size;
            break;
        } else {
            // do nothing, ignore the error code
        }

        RETURN_IF_ERROR(cur_row_group->prepare());
        break;
    }
    return Status::OK();
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not one good suggestion

Expand Down
3 changes: 3 additions & 0 deletions be/src/formats/parquet/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "gen_cpp/parquet_types.h"
#include "io/shared_buffered_input_stream.h"
#include "runtime/runtime_state.h"
#include "storage/olap_runtime_range_pruner.hpp"

namespace tparquet {
class ColumnMetaData;
Expand Down Expand Up @@ -97,6 +98,7 @@ class FileReader {

// filter row group by conjuncts
bool _filter_group(const GroupReaderPtr& group_reader);
StatusOr<bool> _update_rf_and_filter_group(const GroupReaderPtr& group_reader);

bool _filter_group_with_min_max_conjuncts(const GroupReaderPtr& group_reader);

Expand Down Expand Up @@ -146,6 +148,7 @@ class FileReader {
GroupReaderParam _group_reader_param;
std::shared_ptr<MetaHelper> _meta_helper = nullptr;
SkipRowsContextPtr _skip_rows_ctx = nullptr;
std::shared_ptr<OlapRuntimeScanRangePruner> _rf_scan_range_pruner;
};

} // namespace starrocks::parquet
6 changes: 3 additions & 3 deletions be/src/storage/olap_runtime_range_pruner.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ class OlapRuntimeScanRangePruner {
}

Status update_range_if_arrived(const ColumnIdToGlobalDictMap* global_dictmaps,
RuntimeFilterArrivedCallBack&& updater, size_t raw_read_rows) {
RuntimeFilterArrivedCallBack&& updater, bool force, size_t raw_read_rows) {
if (_arrived_runtime_filters_masks.empty()) return Status::OK();
return _update(global_dictmaps, std::move(updater), raw_read_rows);
return _update(global_dictmaps, std::move(updater), force, raw_read_rows);
}

private:
Expand All @@ -73,7 +73,7 @@ class OlapRuntimeScanRangePruner {
StatusOr<PredicatesRawPtrs> _get_predicates(const ColumnIdToGlobalDictMap* global_dictmaps, size_t idx,
ObjectPool* pool);

Status _update(const ColumnIdToGlobalDictMap* global_dictmaps, RuntimeFilterArrivedCallBack&& updater,
Status _update(const ColumnIdToGlobalDictMap* global_dictmaps, RuntimeFilterArrivedCallBack&& updater, bool force,
size_t raw_read_rows);

void _init(const UnarrivedRuntimeFilterList& params);
Expand Down
5 changes: 3 additions & 2 deletions be/src/storage/olap_runtime_range_pruner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ struct RuntimeColumnPredicateBuilder {
} // namespace detail

inline Status OlapRuntimeScanRangePruner::_update(const ColumnIdToGlobalDictMap* global_dictmaps,
RuntimeFilterArrivedCallBack&& updater, size_t raw_read_rows) {
RuntimeFilterArrivedCallBack&& updater, bool force,
size_t raw_read_rows) {
if (_arrived_runtime_filters_masks.empty()) {
return Status::OK();
}
Expand All @@ -234,7 +235,7 @@ inline Status OlapRuntimeScanRangePruner::_update(const ColumnIdToGlobalDictMap*
if (auto rf = _unarrived_runtime_filters[i]->runtime_filter(_driver_sequence)) {
size_t rf_version = rf->rf_version();
if (!_arrived_runtime_filters_masks[i] ||
(rf_version > _rf_versions[i] && raw_read_rows - _raw_read_rows > rf_update_threshold)) {
(rf_version > _rf_versions[i] && (force || raw_read_rows - _raw_read_rows > rf_update_threshold))) {
ObjectPool pool;

ASSIGN_OR_RETURN(auto predicates, _get_predicates(global_dictmaps, i, &pool));
Expand Down
6 changes: 3 additions & 3 deletions be/src/storage/predicate_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,15 @@ uint32_t OlapPredicateParser::column_id(const SlotDescriptor& slot_desc) const {
}

bool ConnectorPredicateParser::can_pushdown(const ColumnPredicate* predicate) const {
return false;
return true;
}

bool ConnectorPredicateParser::can_pushdown(const SlotDescriptor* slot_desc) const {
return false;
return true;
}

bool ConnectorPredicateParser::can_pushdown(const ConstPredicateNodePtr& pred_tree) const {
return false;
return true;
}

ColumnPredicate* ConnectorPredicateParser::parse_thrift_cond(const TCondition& condition) const {
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/rowset/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ Status SegmentIterator::_try_to_update_ranges_by_runtime_filter() {
_opts.stats->runtime_stats_filtered += (prev_size - _scan_range.span_size());
return Status::OK();
},
_opts.stats->raw_rows_read);
false, _opts.stats->raw_rows_read);
}

StatusOr<std::shared_ptr<Segment>> SegmentIterator::_get_dcg_segment(uint32_t ucid) {
Expand Down
20 changes: 6 additions & 14 deletions be/src/testutil/exprs_test_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,12 @@ class ExprsTestHelper {
string_literal.value = value;
node.__set_string_literal(string_literal);
node.node_type = TExprNodeType::STRING_LITERAL;
} else if constexpr (LType == TYPE_DECIMAL128) {
node.type = Decimal128TTypeDesc;
TDecimalLiteral decimal_literal;
decimal_literal.value = value;
node.__set_decimal_literal(decimal_literal);
node.node_type = TExprNodeType::DECIMAL_LITERAL;
} else {
// not implement
CHECK(false);
Expand All @@ -362,20 +368,6 @@ class ExprsTestHelper {
return node;
}

static TExprNode create_decimal_literal(const std::string& value, TTypeDesc t_type, bool is_nullable) {
TDecimalLiteral decimal_literal;
decimal_literal.value = value;

TExprNode node;
node.node_type = TExprNodeType::DECIMAL_LITERAL;
node.type = t_type;
node.num_children = 0;
node.__set_decimal_literal(decimal_literal);
node.is_nullable = is_nullable;

return node;
}

static TExpr create_slot_expr(TExprNode slot_ref) {
TExpr expr;
expr.nodes.push_back(slot_ref);
Expand Down
Loading
Loading