Skip to content

Commit

Permalink
draft impl
Browse files Browse the repository at this point in the history
  • Loading branch information
arthurpassos committed Jan 22, 2025
1 parent 6cd07cd commit 920adbf
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/Interpreters/InterpreterAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,11 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
}
break;
}
case ASTAlterCommand::EXPORT_PART:
{
/// EXPORT PART requires the ALTER MOVE PARTITION privilege on the table being altered.
/// NOTE(review): presumably reused as a placeholder in this draft - confirm whether a dedicated access type is wanted.
required_access.emplace_back(AccessType::ALTER_MOVE_PARTITION, database, table);
break;
}
case ASTAlterCommand::REPLACE_PARTITION:
{
required_access.emplace_back(AccessType::SELECT, command.from_database, command.from_table);
Expand Down
6 changes: 6 additions & 0 deletions src/Parsers/ASTAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,12 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett
ostr << quoteString(move_destination_name);
}
}
else if (type == ASTAlterCommand::EXPORT_PART)
{
/// Prints "EXPORT PART " or "EXPORT PARTITION " (chosen by the `part` flag), wrapped in keyword
/// highlighting when enabled, followed by the formatted partition expression.
/// No destination clause is printed for this command.
ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT " << (part ? "PART " : "PARTITION ")
<< (settings.hilite ? hilite_none : "");
partition->formatImpl(ostr, settings, state, frame);
}
else if (type == ASTAlterCommand::REPLACE_PARTITION)
{
ostr << (settings.hilite ? hilite_keyword : "") << (replace ? "REPLACE" : "ATTACH") << " PARTITION "
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTAlterQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class ASTAlterCommand : public IAST
FREEZE_ALL,
UNFREEZE_PARTITION,
UNFREEZE_ALL,
EXPORT_PART,

DELETE,
UPDATE,
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/CommonParsers.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ namespace DB
MR_MACROS(MONTHS, "MONTHS") \
MR_MACROS(MOVE_PART, "MOVE PART") \
MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \
MR_MACROS(EXPORT_PART, "EXPORT PART") \
MR_MACROS(MOVE, "MOVE") \
MR_MACROS(MS, "MS") \
MR_MACROS(MUTATION, "MUTATION") \
Expand Down
26 changes: 26 additions & 0 deletions src/Parsers/ParserAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_forget_partition(Keyword::FORGET_PARTITION);
ParserKeyword s_move_partition(Keyword::MOVE_PARTITION);
ParserKeyword s_move_part(Keyword::MOVE_PART);
ParserKeyword s_export_part(Keyword::EXPORT_PART);
ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION);
ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART);
ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION);
Expand Down Expand Up @@ -554,6 +555,31 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->move_destination_name = ast_space_name->as<ASTLiteral &>().value.safeGet<const String &>();
}
}
else if (s_export_part.ignore(pos, expected))
{
/// ALTER TABLE ... EXPORT PART <part>
/// The token(s) after the keyword are parsed into `command_partition`; parsing fails if they are missing.
if (!parser_string_and_substituion.parse(pos, command_partition, expected))
return false;

command->type = ASTAlterCommand::EXPORT_PART;
/// Only the PART form has a keyword, so the command is always marked as operating on a part.
command->part = true;

/// TODO: no destination clause is parsed yet. The disabled sketch below would accept
/// TO DISK / TO VOLUME / TO SHARD followed by a string literal and store the result in
/// `move_destination_type` / `move_destination_name`.
/// NOTE(review): presumably adapted from the MOVE PART branch - confirm before enabling.
// if (s_to_disk.ignore(pos, expected))
// command->move_destination_type = DataDestinationType::DISK;
// else if (s_to_volume.ignore(pos, expected))
// command->move_destination_type = DataDestinationType::VOLUME;
// else if (s_to_shard.ignore(pos, expected))
// {
// command->move_destination_type = DataDestinationType::SHARD;
// }
// else
// return false;
//
// ASTPtr ast_space_name;
// if (!parser_string_literal.parse(pos, ast_space_name, expected))
// return false;
//
// command->move_destination_name = ast_space_name->as<ASTLiteral &>().value.safeGet<const String &>();
}
else if (s_add_constraint.ignore(pos, expected))
{
if (s_if_not_exists.ignore(pos, expected))
Expand Down
16 changes: 16 additions & 0 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h>
#include <Storages/MergeTree/exportMTPartToParquet.h>

#include <boost/range/algorithm_ext/erase.hpp>
#include <boost/algorithm/string/join.hpp>
Expand Down Expand Up @@ -5594,6 +5595,21 @@ Pipe MergeTreeData::alterPartition(
}
break;

case PartitionCommand::EXPORT_PART:
{
/// Only the PART form is handled here: when `command.part` is false this case silently does nothing.
if (command.part)
{
/// The part to export is identified by the string literal stored in the command.
auto part_name = command.partition->as<ASTLiteral &>().value.safeGet<String>();
/// Only parts in the Active state are considered.
auto data_part = getPartIfExists(part_name, {DataPartStates::value_type::Active});

/// NOTE(review): the message says "committed" while the lookup above uses the Active state - confirm wording.
if (!data_part)
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in committed state", part_name);

/// Runs the export inline, as part of executing this command.
exportMTPartToParquet(*this, data_part, query_context);
}
break;
}

case PartitionCommand::DROP_DETACHED_PARTITION:
dropDetached(command.partition, command.part, query_context);
break;
Expand Down
84 changes: 84 additions & 0 deletions src/Storages/MergeTree/exportMTPartToParquet.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#include <Storages/MergeTree/exportMTPartToParquet.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Processors/Formats/Impl/ParquetBlockOutputFormat.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Formats/FormatFactory.h>


namespace DB
{

/// Exports one MergeTree data part to a Parquet file.
///
/// All physical columns of the table are read from `data_part` through a sequential
/// part-read step (deleted-rows mask applied, alter conversions computed for the part),
/// and every block produced by the pipeline is written with the "Parquet" output format.
///
/// @param data       Table that owns the part; used for metadata, storage and mutation snapshots.
/// @param data_part  The part to export. The output file is named `<part name>.parquet`.
/// @param context    Query context used to build the pipeline and to create the output format.
///
/// NOTE: the output path is relative, so the file is created in the current working
/// directory of the process. TODO: make the destination configurable.
/// NOTE: if an exception is thrown while exporting, a partially written file may be left behind.
void exportMTPartToParquet(const MergeTreeData & data, const MergeTreeData::DataPartPtr & data_part, ContextPtr context)
{
    auto metadata_snapshot = data.getInMemoryMetadataPtr();
    Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
    StorageSnapshotPtr storage_snapshot = data.getStorageSnapshot(metadata_snapshot, context);

    /// The mutations snapshot is requested for the range between the part's metadata version
    /// and the current table metadata version.
    MergeTreeData::IMutationsSnapshot::Params params
    {
        .metadata_version = metadata_snapshot->getMetadataVersion(),
        .min_part_metadata_version = data_part->getMetadataVersion(),
    };

    auto mutations_snapshot = data.getMutationsSnapshot(params);

    auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
        data_part,
        mutations_snapshot,
        metadata_snapshot,
        context);

    QueryPlan plan;

    /// TODO: the `Merge` source type is used for now; revisit whether it is the right one for export.
    MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Merge;

    bool apply_deleted_mask = true;
    bool read_with_direct_io = false;
    bool prefetch = false;

    createReadFromPartStep(
        read_type,
        plan,
        data,
        storage_snapshot,
        data_part,
        alter_conversions,
        columns_to_read,
        nullptr,
        apply_deleted_mask,
        std::nullopt,
        read_with_direct_io,
        prefetch,
        context,
        getLogger("exportMTPartToParquet"));

    auto pipeline_settings = BuildQueryPipelineSettings::fromContext(context);
    auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
    auto builder = plan.buildQueryPipeline(optimization_settings, pipeline_settings);

    QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
    auto header_block = pipeline.getHeader();

    const auto out_file_name = data_part->name + ".parquet";

    /// The buffer never leaves this function, so it lives on the stack. It is declared before
    /// the output format that references it and is therefore destroyed after it.
    WriteBufferFromFile out_file(out_file_name);
    auto parquet_output = FormatFactory::instance().getOutputFormat("Parquet", out_file, header_block, context);
    PullingPipelineExecutor executor(pipeline);

    Block block;
    while (executor.pull(block))
        parquet_output->write(block);

    /// Finalize the format before the underlying file buffer.
    parquet_output->finalize();
    out_file.finalize();
}

}
10 changes: 10 additions & 0 deletions src/Storages/MergeTree/exportMTPartToParquet.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#pragma once

#include <Storages/MergeTree/MergeTreeData.h>

namespace DB
{

/// Reads all physical columns of `data_part` (a part of the MergeTree table `data`) and writes
/// them in Parquet format to a file named `<part name>.parquet`.
/// `context` is used to build the read pipeline and to create the output format.
void exportMTPartToParquet(const MergeTreeData & data, const MergeTreeData::DataPartPtr & data_part, ContextPtr context);

}
8 changes: 8 additions & 0 deletions src/Storages/PartitionCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
res.part = command_ast->part;
return res;
}
/// ALTER ... EXPORT PART: carry over the partition expression (cloned, so the command owns its own copy
/// of the AST node) and the `part` flag from the parsed AST command.
if (command_ast->type == ASTAlterCommand::EXPORT_PART)
{
PartitionCommand res;
res.type = EXPORT_PART;
res.partition = command_ast->partition->clone();
res.part = command_ast->part;
return res;
}
if (command_ast->type == ASTAlterCommand::MOVE_PARTITION)
{
PartitionCommand res;
Expand Down
1 change: 1 addition & 0 deletions src/Storages/PartitionCommands.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct PartitionCommand
UNFREEZE_ALL_PARTITIONS,
UNFREEZE_PARTITION,
REPLACE_PARTITION,
EXPORT_PART,
};

Type type = UNKNOWN;
Expand Down

0 comments on commit 920adbf

Please sign in to comment.