Skip to content

Commit

Permalink
Scheme correction for indexes usage (ydb-platform#1180)
Browse files Browse the repository at this point in the history
* correct schemeshard usage for index operations

* fix build

* fix build

* correction

* fix build

* fix build

* fix ut build

* fix tests
  • Loading branch information
ivanmorozov333 authored Jan 22, 2024
1 parent 19d7298 commit a7dc805
Show file tree
Hide file tree
Showing 48 changed files with 1,760 additions and 746 deletions.
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/hash/calcer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace NKikimr::NArrow::NHash {

void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) const {
void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) {
NArrow::SwitchType(array->type_id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
using T = typename TWrap::T;
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/formats/arrow/hash/calcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,16 @@ class TXX64 {
ui64 Seed = 0;
const std::vector<TString> ColumnNames;
const ENoColumnPolicy NoColumnPolicy;
void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer) const;

std::vector<std::shared_ptr<arrow::Array>> GetColumns(const std::shared_ptr<arrow::RecordBatch>& batch) const;

public:
TXX64(const std::vector<TString>& columnNames, const ENoColumnPolicy noColumnPolicy, const ui64 seed = 0);
TXX64(const std::vector<std::string>& columnNames, const ENoColumnPolicy noColumnPolicy, const ui64 seed = 0);

static void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer);
std::optional<std::vector<ui64>> Execute(const std::shared_ptr<arrow::RecordBatch>& batch) const;
std::shared_ptr<arrow::Array> ExecuteToArray(const std::shared_ptr<arrow::RecordBatch>& batch, const std::string& hashFieldName) const;


};

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include "upsert_index.h"
#include <util/string/type.h>
#include <library/cpp/json/json_reader.h>

namespace NKikimr::NKqp {

TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) {
{
auto fValue = features.Extract("NAME");
if (!fValue) {
return TConclusionStatus::Fail("can't find alter parameter NAME");
}
IndexName = *fValue;
}
TString indexType;
{
auto fValue = features.Extract("TYPE");
if (!fValue) {
return TConclusionStatus::Fail("can't find alter parameter TYPE");
}
indexType = *fValue;
}
{
auto fValue = features.Extract("FEATURES");
if (!fValue) {
return TConclusionStatus::Fail("can't find alter parameter FEATURES");
}
if (!IndexMetaConstructor.Initialize(indexType)) {
return TConclusionStatus::Fail("can't initialize index meta object for type \"" + indexType + "\"");
}
NJson::TJsonValue jsonData;
if (!NJson::ReadJsonFastTree(*fValue, &jsonData)) {
return TConclusionStatus::Fail("incorrect json in request FEATURES parameter");
}
auto result = IndexMetaConstructor->DeserializeFromJson(jsonData);
if (result.IsFail()) {
return result;
}
}
return TConclusionStatus::Success();
}

void TUpsertIndexOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
auto* indexProto = schemaData.AddUpsertIndexes();
indexProto->SetName(IndexName);
IndexMetaConstructor.SerializeToProto(*indexProto);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include "abstract.h"
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h>

namespace NKikimr::NKqp {

class TUpsertIndexOperation : public ITableStoreOperation {
private:
static TString GetTypeName() {
return "UPSERT_INDEX";
}

static inline auto Registrator = TFactory::TRegistrator<TUpsertIndexOperation>(GetTypeName());
private:
TString IndexName;
NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMetaConstructor> IndexMetaConstructor;
public:
TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;

void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const override;
};

}

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ SRCS(
GLOBAL add_column.cpp
GLOBAL alter_column.cpp
GLOBAL drop_column.cpp
GLOBAL upsert_index.cpp
)

PEERDIR(
Expand Down
37 changes: 37 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,40 @@ message TOlapColumnDescription {
optional TDictionaryEncodingSettings DictionaryEncoding = 9;
}

message TRequestedBloomFilter {
optional double FalsePositiveProbability = 1 [default = 0.1];
repeated string ColumnNames = 3;
}

message TOlapIndexRequested {
optional string Name = 1;
optional TCompressionOptions Compression = 3;

optional string ClassName = 2;
oneof Implementation {
TRequestedBloomFilter BloomFilter = 40;
}
}

message TBloomFilter {
optional double FalsePositiveProbability = 1 [default = 0.1];
optional uint64 MaxBytesCount = 2 [default = 8196];
repeated uint32 ColumnIds = 3;
}

message TOlapIndexDescription {
// This id is auto-generated by schemeshard
optional uint32 Id = 1;

optional string Name = 2;
optional TCompressionOptions Compression = 3;

optional string ClassName = 4;
oneof Implementation {
TBloomFilter BloomFilter = 40;
}
}

enum EColumnTableEngine {
COLUMN_ENGINE_NONE = 0;
COLUMN_ENGINE_REPLACING_TIMESERIES = 1;
Expand Down Expand Up @@ -456,13 +490,16 @@ message TColumnTableSchema {
optional TCompressionOptions DefaultCompression = 8;

optional bool CompositeMarks = 9 [ default = false ];
repeated TOlapIndexDescription Indexes = 10;
}

message TAlterColumnTableSchema {
repeated TOlapColumnDescription AddColumns = 1;
//optional TCompressionOptions DefaultCompression = 5;
repeated TOlapColumnDescription DropColumns = 6;
repeated TOlapColumnDiff AlterColumns = 7;
repeated TOlapIndexRequested UpsertIndexes = 8;
repeated string DropIndexes = 9;
}

// Schema presets are used to manage multiple tables with the same schema
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#include "abstract.h"
#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/formats/arrow/hash/xx_hash.h>
#include <ydb/core/formats/arrow/hash/calcer.h>

namespace NKikimr::NOlap::NIndexes {

} // namespace NKikimr::NOlap::NIndexes
147 changes: 147 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#pragma once

#include <ydb/core/tx/columnshard/splitter/chunks.h>
#include <ydb/core/tx/program/program.h>

#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <library/cpp/object_factory/object_factory.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
#include <ydb/services/bg_tasks/abstract/interface.h>
#include <util/generic/string.h>

#include <memory>
#include <vector>

namespace NKikimr::NOlap {
struct TIndexInfo;
}

namespace NKikimr::NSchemeShard {
class TOlapSchema;
class IErrorCollector;
}

namespace NKikimr::NOlap::NIndexes {

class IIndexChecker {
protected:
virtual bool DoCheck(std::vector<TString>&& blobs) const = 0;
public:
virtual ~IIndexChecker() = default;
bool Check(std::vector<TString>&& blobs) const {
return DoCheck(std::move(blobs));
}
};

class TIndexCheckerContainer {
private:
YDB_READONLY(ui32, IndexId, 0);
YDB_READONLY_DEF(std::shared_ptr<IIndexChecker>, Object);
public:
TIndexCheckerContainer(const ui32 indexId, const std::shared_ptr<IIndexChecker>& object)
: IndexId(indexId)
, Object(object) {
AFL_VERIFY(IndexId);
AFL_VERIFY(Object);
}

const IIndexChecker* operator->() const {
return Object.get();
}
};

class IIndexMeta {
protected:
virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const = 0;
virtual std::shared_ptr<IIndexChecker> DoBuildIndexChecker(const TProgramContainer& program) const = 0;
virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) = 0;
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const = 0;

public:
using TFactory = NObjectFactory::TObjectFactory<IIndexMeta, TString>;
using TProto = NKikimrSchemeOp::TOlapIndexDescription;

virtual ~IIndexMeta() = default;

std::shared_ptr<IPortionDataChunk> BuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const {
return DoBuildIndex(indexId, data, indexInfo);
}

std::shared_ptr<IIndexChecker> BuildIndexChecker(const TProgramContainer& program) const {
return DoBuildIndexChecker(program);
}

bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) {
return DoDeserializeFromProto(proto);
}

void SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const {
return DoSerializeToProto(proto);
}

virtual TString GetClassName() const = 0;
};

class IIndexMetaConstructor {
protected:
virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0;
virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const = 0;
virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) = 0;
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const = 0;
public:
using TFactory = NObjectFactory::TObjectFactory<IIndexMetaConstructor, TString>;
using TProto = NKikimrSchemeOp::TOlapIndexRequested;

virtual ~IIndexMetaConstructor() = default;

TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
return DoDeserializeFromJson(jsonInfo);
}

std::shared_ptr<IIndexMeta> CreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const {
return DoCreateIndexMeta(currentSchema, errors);
}

TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) {
return DoDeserializeFromProto(proto);
}

void SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const {
return DoSerializeToProto(proto);
}

virtual TString GetClassName() const = 0;
};

class TIndexMetaContainer: public NBackgroundTasks::TInterfaceProtoContainer<IIndexMeta> {
private:
using TBase = NBackgroundTasks::TInterfaceProtoContainer<IIndexMeta>;
YDB_READONLY(ui32, IndexId, 0);
public:
TIndexMetaContainer() = default;
TIndexMetaContainer(const ui32 indexId, const std::shared_ptr<IIndexMeta>& object)
: TBase(object)
, IndexId(indexId)
{
AFL_VERIFY(IndexId);
AFL_VERIFY(Object);
}

bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) {
if (!TBase::DeserializeFromProto(proto)) {
return false;
}
IndexId = proto.GetId();
return true;
}

std::optional<TIndexCheckerContainer> BuildIndexChecker(const TProgramContainer& program) const {
auto checker = GetObjectPtr()->BuildIndexChecker(program);
if (!checker) {
return {};
}
return TIndexCheckerContainer(IndexId, checker);
}
};

} // namespace NKikimr::NOlap::NIndexes
Loading

0 comments on commit a7dc805

Please sign in to comment.