diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp
index 3ce2ff04a74..8eb490cbaab 100644
--- a/src/Interpreters/ClusterProxy/executeQuery.cpp
+++ b/src/Interpreters/ClusterProxy/executeQuery.cpp
@@ -442,6 +442,7 @@ void executeQuery(
             not_optimized_cluster->getName());
 
         read_from_remote->setStepDescription("Read from remote replica");
+        read_from_remote->setIsRemoteFunction(is_remote_function);
         plan->addStep(std::move(read_from_remote));
         plan->addInterpreterContext(new_context);
         plans.emplace_back(std::move(plan));
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index 6ec108eb1ce..66576a6e34a 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -2702,8 +2702,11 @@ void Context::setCurrentQueryId(const String & query_id)
 
     client_info.current_query_id = query_id_to_set;
 
-    if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
+    if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
+        && (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty()))
+    {
         client_info.initial_query_id = client_info.current_query_id;
+    }
 }
 
 void Context::setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType background_operation)
diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp
index 13cd9a73b13..69536b8b293 100644
--- a/src/Processors/QueryPlan/ReadFromRemote.cpp
+++ b/src/Processors/QueryPlan/ReadFromRemote.cpp
@@ -167,7 +167,8 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
             my_main_table = main_table, my_table_func_ptr = table_func_ptr,
             my_scalars = scalars, my_external_tables = external_tables,
             my_stage = stage, local_delay = shard.local_delay,
-            add_agg_info, add_totals, add_extremes, async_read, async_query_sending]() mutable
+            add_agg_info, add_totals, add_extremes, async_read, async_query_sending,
+            my_is_remote_function = is_remote_function]() mutable
         -> QueryPipelineBuilder
     {
         auto current_settings = my_context->getSettingsRef();
@@ -221,6 +222,8 @@
             {DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
         auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
             std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, my_stage);
+        remote_query_executor->setRemoteFunction(my_is_remote_function);
+        remote_query_executor->setShardCount(my_shard_count);
 
         auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending);
         QueryPipelineBuilder builder;
@@ -304,6 +307,8 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
             priority_func);
         remote_query_executor->setLogger(log);
         remote_query_executor->setPoolMode(PoolMode::GET_ONE);
+        remote_query_executor->setRemoteFunction(is_remote_function);
+        remote_query_executor->setShardCount(shard_count);
 
         if (!table_func_ptr)
             remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
@@ -320,6 +325,8 @@
         auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
             shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage);
         remote_query_executor->setLogger(log);
+        remote_query_executor->setRemoteFunction(is_remote_function);
+        remote_query_executor->setShardCount(shard_count);
 
         if (context->canUseTaskBasedParallelReplicas())
         {
diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h
index 8387b6641fb..99fa95c0c72 100644
--- a/src/Processors/QueryPlan/ReadFromRemote.h
+++ b/src/Processors/QueryPlan/ReadFromRemote.h
@@ -42,6 +42,7 @@ class ReadFromRemote final : public ISourceStep
 
     void enableMemoryBoundMerging();
     void enforceAggregationInOrder();
+    void setIsRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
 
 private:
     ClusterProxy::SelectStreamFactory::Shards shards;
@@ -57,6 +58,7 @@ class ReadFromRemote final : public ISourceStep
     UInt32 shard_count;
     const String cluster_name;
     std::optional priority_func_factory;
+    bool is_remote_function = false;
 
     void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
     void addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp
index 851b3116571..99055f03bc7 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutor.cpp
@@ -398,7 +398,16 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
 
     auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
     ClientInfo modified_client_info = context->getClientInfo();
-    modified_client_info.query_kind = query_kind;
+
+    /// remote() with several addresses, e.g. "remote('1.1.1.{1,2}')", is not supported for now, hence the shard_count == 1 check.
+    if (is_remote_function && (shard_count == 1))
+    {
+        modified_client_info.setInitialQuery();
+        modified_client_info.client_name = "ClickHouse server";
+        modified_client_info.interface = ClientInfo::Interface::TCP;
+    }
+    else
+        modified_client_info.query_kind = query_kind;
 
     if (!duplicated_part_uuids.empty())
         connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h
index 6caabead544..16f37ca7579 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.h
+++ b/src/QueryPipeline/RemoteQueryExecutor.h
@@ -213,6 +213,10 @@ class RemoteQueryExecutor
 
     void setLogger(LoggerPtr logger) { log = logger; }
 
+    void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
+
+    void setShardCount(UInt32 shard_count_) { shard_count = shard_count_; }
+
     const Block & getHeader() const { return header; }
 
     IConnections & getConnections() { return *connections; }
@@ -302,6 +306,9 @@ class RemoteQueryExecutor
 
     bool has_postponed_packet = false;
 
+    bool is_remote_function = false;
+    UInt32 shard_count = 0;
+
     /// Parts uuids, collected from remote replicas
     std::vector<UUID> duplicated_part_uuids;
 
diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py
index e8bf031021e..67c3120a882 100644
--- a/tests/integration/test_s3_cluster/test.py
+++ b/tests/integration/test_s3_cluster/test.py
@@ -360,7 +360,7 @@ def test_parallel_distributed_insert_select_with_schema_inference(started_cluste
     node.query(
         """
         CREATE TABLE parallel_insert_select ON CLUSTER 'first_shard' (a String, b UInt64)
-        ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select_with_replicated', '{replica}')
+        ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/parallel_insert_select', '{replica}')
         ORDER BY (a, b);
         """
     )
@@ -508,3 +508,57 @@ def test_cluster_default_expression(started_cluster):
     )
 
     assert result == expected_result
+
+
+def test_remote_hedged(started_cluster):
+    node = started_cluster.instances["s0_0_0"]
+    pure_s3 = node.query(
+        """
+    SELECT * from s3(
+        'http://minio1:9001/root/data/{clickhouse,database}/*',
+        'minio', 'minio123', 'CSV',
+        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+    ORDER BY (name, value, polygon)
+    LIMIT 1
+    """
+    )
+    s3_distributed = node.query(
+        """
+    SELECT * from remote('s0_0_1', s3Cluster(
+        'cluster_simple',
+        'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))'))
+    ORDER BY (name, value, polygon)
+    LIMIT 1
+    SETTINGS use_hedged_requests=True
+    """
+    )
+
+    assert TSV(pure_s3) == TSV(s3_distributed)
+
+
+def test_remote_no_hedged(started_cluster):
+    node = started_cluster.instances["s0_0_0"]
+    pure_s3 = node.query(
+        """
+    SELECT * from s3(
+        'http://minio1:9001/root/data/{clickhouse,database}/*',
+        'minio', 'minio123', 'CSV',
+        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+    ORDER BY (name, value, polygon)
+    LIMIT 1
+    """
+    )
+    s3_distributed = node.query(
+        """
+    SELECT * from remote('s0_0_1', s3Cluster(
+        'cluster_simple',
+        'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))'))
+    ORDER BY (name, value, polygon)
+    LIMIT 1
+    SETTINGS use_hedged_requests=False
+    """
+    )
+
+    assert TSV(pure_s3) == TSV(s3_distributed)
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index cd79aacd534..ec740d600f9 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -676,6 +676,14 @@ def add_df(mode):
     )
     assert len(cluster_secondary_queries) == 1
 
+    select_remote_cluster = (
+        instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})")
+        .strip()
+        .split()
+    )
+    assert len(select_remote_cluster) == 600
+    assert select_remote_cluster == select_regular
+
 
 @pytest.mark.parametrize("format_version", ["1", "2"])
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])