From fbe0882e117abd9868c72b3a0ed37dd61b4f753d Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 27 Nov 2024 10:44:20 +0100 Subject: [PATCH 1/8] Test for remote+s3Cluster --- tests/integration/test_s3_cluster/test.py | 56 ++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index e8bf031021e..67c3120a882 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -360,7 +360,7 @@ def test_parallel_distributed_insert_select_with_schema_inference(started_cluste node.query( """ CREATE TABLE parallel_insert_select ON CLUSTER 'first_shard' (a String, b UInt64) - ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select_with_replicated', '{replica}') + ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/parallel_insert_select', '{replica}') ORDER BY (a, b); """ ) @@ -508,3 +508,57 @@ def test_cluster_default_expression(started_cluster): ) assert result == expected_result + + +def test_remote_hedged(started_cluster): + node = started_cluster.instances["s0_0_0"] + pure_s3 = node.query( + """ + SELECT * from s3( + 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + ORDER BY (name, value, polygon) + LIMIT 1 + """ + ) + s3_distributed = node.query( + """ + SELECT * from remote('s0_0_1', s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')) + ORDER BY (name, value, polygon) + LIMIT 1 + SETTINGS use_hedged_requests=True + """ + ) + + assert TSV(pure_s3) == TSV(s3_distributed) + + +def test_remote_no_hedged(started_cluster): + node = started_cluster.instances["s0_0_0"] + pure_s3 = node.query( + """ + SELECT * from s3( + 
'http://minio1:9001/root/data/{clickhouse,database}/*', + 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + ORDER BY (name, value, polygon) + LIMIT 1 + """ + ) + s3_distributed = node.query( + """ + SELECT * from remote('s0_0_1', s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')) + ORDER BY (name, value, polygon) + LIMIT 1 + SETTINGS use_hedged_requests=False + """ + ) + + assert TSV(pure_s3) == TSV(s3_distributed) From 32cb53d51709e7101e981a686bf1478226c7d7ab Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 10 Dec 2024 14:19:46 +0100 Subject: [PATCH 2/8] Use INITIAL_QUERY for remote() call --- src/Interpreters/ClusterProxy/executeQuery.cpp | 1 + src/Processors/QueryPlan/ReadFromRemote.cpp | 6 +++++- src/Processors/QueryPlan/ReadFromRemote.h | 2 ++ src/QueryPipeline/RemoteQueryExecutor.cpp | 6 +++++- src/QueryPipeline/RemoteQueryExecutor.h | 4 ++++ 5 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 3ce2ff04a74..557bea74b96 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -442,6 +442,7 @@ void executeQuery( not_optimized_cluster->getName()); read_from_remote->setStepDescription("Read from remote replica"); + read_from_remote->setRemoteFunction(is_remote_function); plan->addStep(std::move(read_from_remote)); plan->addInterpreterContext(new_context); plans.emplace_back(std::move(plan)); diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 13cd9a73b13..42f4dc6c834 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -167,7 +167,8 @@ void ReadFromRemote::addLazyPipe(Pipes & 
pipes, const ClusterProxy::SelectStream my_main_table = main_table, my_table_func_ptr = table_func_ptr, my_scalars = scalars, my_external_tables = external_tables, my_stage = stage, local_delay = shard.local_delay, - add_agg_info, add_totals, add_extremes, async_read, async_query_sending]() mutable + add_agg_info, add_totals, add_extremes, async_read, async_query_sending, + my_is_remote_function = is_remote_function]() mutable -> QueryPipelineBuilder { auto current_settings = my_context->getSettingsRef(); @@ -221,6 +222,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream {DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared(), "_shard_num"}}; auto remote_query_executor = std::make_shared( std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, my_stage); + remote_query_executor->setRemoteFunction(my_is_remote_function); auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending); QueryPipelineBuilder builder; @@ -304,6 +306,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact priority_func); remote_query_executor->setLogger(log); remote_query_executor->setPoolMode(PoolMode::GET_ONE); + remote_query_executor->setRemoteFunction(is_remote_function); if (!table_func_ptr) remote_query_executor->setMainTable(shard.main_table ? 
shard.main_table : main_table); @@ -320,6 +323,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact auto remote_query_executor = std::make_shared( shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage); remote_query_executor->setLogger(log); + remote_query_executor->setRemoteFunction(is_remote_function); if (context->canUseTaskBasedParallelReplicas()) { diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h index 8387b6641fb..5d01006517b 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.h +++ b/src/Processors/QueryPlan/ReadFromRemote.h @@ -42,6 +42,7 @@ class ReadFromRemote final : public ISourceStep void enableMemoryBoundMerging(); void enforceAggregationInOrder(); + void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; } private: ClusterProxy::SelectStreamFactory::Shards shards; @@ -57,6 +58,7 @@ class ReadFromRemote final : public ISourceStep UInt32 shard_count; const String cluster_name; std::optional priority_func_factory; + bool is_remote_function = false; void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard); void addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard); diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 851b3116571..7eec8a478ed 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -398,7 +398,11 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); ClientInfo modified_client_info = context->getClientInfo(); - modified_client_info.query_kind = query_kind; + + if (is_remote_function) + modified_client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY; + else + modified_client_info.query_kind = query_kind; 
if (!duplicated_part_uuids.empty()) connections->sendIgnoredPartUUIDs(duplicated_part_uuids); diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 6caabead544..f7110fc691a 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -213,6 +213,8 @@ class RemoteQueryExecutor void setLogger(LoggerPtr logger) { log = logger; } + void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; } + const Block & getHeader() const { return header; } IConnections & getConnections() { return *connections; } @@ -302,6 +304,8 @@ class RemoteQueryExecutor bool has_postponed_packet = false; + bool is_remote_function = false; + /// Parts uuids, collected from remote replicas std::vector duplicated_part_uuids; From 5e209e1b470f03f9740d4b63b85cef7f8e831915 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 10 Dec 2024 19:53:20 +0100 Subject: [PATCH 3/8] Avoid CLIENT_INFO_DOES_NOT_MATCH error --- src/QueryPipeline/RemoteQueryExecutor.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 7eec8a478ed..7f5446ad586 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -400,7 +400,10 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As ClientInfo modified_client_info = context->getClientInfo(); if (is_remote_function) - modified_client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY; + { + modified_client_info.setInitialQuery(); + modified_client_info.client_name = "ClickHouse server"; + } else modified_client_info.query_kind = query_kind; From 26918118475e3fafb7f98cf8e41a1e164356ee80 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 10 Dec 2024 22:19:01 +0100 Subject: [PATCH 4/8] Workaround for remote with multiple shards --- 
src/Processors/QueryPlan/ReadFromRemote.cpp | 3 +++ src/QueryPipeline/RemoteQueryExecutor.cpp | 3 ++- src/QueryPipeline/RemoteQueryExecutor.h | 3 +++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 42f4dc6c834..69536b8b293 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -223,6 +223,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream auto remote_query_executor = std::make_shared( std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, my_stage); remote_query_executor->setRemoteFunction(my_is_remote_function); + remote_query_executor->setShardCount(my_shard_count); auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending); QueryPipelineBuilder builder; @@ -307,6 +308,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact remote_query_executor->setLogger(log); remote_query_executor->setPoolMode(PoolMode::GET_ONE); remote_query_executor->setRemoteFunction(is_remote_function); + remote_query_executor->setShardCount(shard_count); if (!table_func_ptr) remote_query_executor->setMainTable(shard.main_table ? 
shard.main_table : main_table); @@ -324,6 +326,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage); remote_query_executor->setLogger(log); remote_query_executor->setRemoteFunction(is_remote_function); + remote_query_executor->setShardCount(shard_count); if (context->canUseTaskBasedParallelReplicas()) { diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 7f5446ad586..c6ebfd98623 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -399,7 +399,8 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); ClientInfo modified_client_info = context->getClientInfo(); - if (is_remote_function) + /// "remote('1.1.1.{1,2}')" (more than one shard) is not supported for now + if (is_remote_function && (shard_count == 1)) { modified_client_info.setInitialQuery(); modified_client_info.client_name = "ClickHouse server"; diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index f7110fc691a..16f37ca7579 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -215,6 +215,8 @@ class RemoteQueryExecutor void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; } + void setShardCount(UInt32 shard_count_) { shard_count = shard_count_; } + const Block & getHeader() const { return header; } IConnections & getConnections() { return *connections; } @@ -305,6 +307,7 @@ class RemoteQueryExecutor bool has_postponed_packet = false; bool is_remote_function = false; + UInt32 shard_count = 0; /// Parts uuids, collected from remote replicas std::vector duplicated_part_uuids; From e2f670e04854b25db078b5dcff3182f2c837a4b5 Mon Sep 17 00:00:00 2001 From: Anton 
Ivashkin Date: Tue, 10 Dec 2024 22:37:23 +0100 Subject: [PATCH 5/8] Workaround to CLIENT_INFO_DOES_NOT_MATCH with 'TCP not equal to HTTP' --- src/QueryPipeline/RemoteQueryExecutor.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index c6ebfd98623..99055f03bc7 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -404,6 +404,7 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As { modified_client_info.setInitialQuery(); modified_client_info.client_name = "ClickHouse server"; + modified_client_info.interface = ClientInfo::Interface::TCP; } else modified_client_info.query_kind = query_kind; From 2ec5804c7a16a3f6b11a0a3aa96113594b98b224 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 11 Dec 2024 00:16:42 +0100 Subject: [PATCH 6/8] Keep initial_query_id for remote with INITIAL_QUERY --- src/Interpreters/Context.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 6ec108eb1ce..15ef854f7b7 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2702,8 +2702,11 @@ void Context::setCurrentQueryId(const String & query_id) client_info.current_query_id = query_id_to_set; - if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY) + if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY + && client_info.initial_query_id.empty()) + { client_info.initial_query_id = client_info.current_query_id; + } } void Context::setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType background_operation) From 44ed61faac7ba40dc559b0dcfde4610c2c6cdd9b Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 3 Jan 2025 18:38:12 +0100 Subject: [PATCH 7/8] Left QueryID for server only --- src/Interpreters/Context.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 15ef854f7b7..66576a6e34a 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2703,7 +2703,7 @@ void Context::setCurrentQueryId(const String & query_id) client_info.current_query_id = query_id_to_set; if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY - && client_info.initial_query_id.empty()) + && (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty())) { client_info.initial_query_id = client_info.current_query_id; } From 8b6064fadc727d616348709ff095e4fd6df3e2ec Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 14 Jan 2025 12:48:16 +0100 Subject: [PATCH 8/8] Fixes after few comments --- src/Interpreters/ClusterProxy/executeQuery.cpp | 2 +- src/Processors/QueryPlan/ReadFromRemote.h | 2 +- tests/integration/test_storage_iceberg/test.py | 8 ++++++++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 557bea74b96..8eb490cbaab 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -442,7 +442,7 @@ void executeQuery( not_optimized_cluster->getName()); read_from_remote->setStepDescription("Read from remote replica"); - read_from_remote->setRemoteFunction(is_remote_function); + read_from_remote->setIsRemoteFunction(is_remote_function); plan->addStep(std::move(read_from_remote)); plan->addInterpreterContext(new_context); plans.emplace_back(std::move(plan)); diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h index 5d01006517b..99fa95c0c72 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.h +++ b/src/Processors/QueryPlan/ReadFromRemote.h @@ -42,7 +42,7 @@ class ReadFromRemote final : public ISourceStep void enableMemoryBoundMerging(); void enforceAggregationInOrder(); - void 
setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; } + void setIsRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; } private: ClusterProxy::SelectStreamFactory::Shards shards; diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index cd79aacd534..ec740d600f9 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -676,6 +676,14 @@ def add_df(mode): ) assert len(cluster_secondary_queries) == 1 + select_remote_cluster = ( + instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})") + .strip() + .split() + ) + assert len(select_remote_cluster) == 600 + assert select_remote_cluster == select_regular + @pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])