From ae6ebbe22e3a493e65332692ab8c90c96523e05e Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Thu, 9 Jan 2025 16:01:55 +0000 Subject: [PATCH] ESQL: `connect_transport_exception` should be thrown instead of `verification_exception` when ENRICH-ing if remote is disconnected (#119750) * fix: `verification_exception` is thrown instead of `connect_transport_exception` in ENRICH In the context of ENRICH, if a remote is disconnected and skip unavailable is set to `true`, then `verification_exception` is thrown instead of `connect_transport_exception`. This PR fixes this and adds the IT tests for ENRICH for RCS 1 and RCS 2. * Update docs/changelog/119750.yaml * Update 119750.yaml --------- Co-authored-by: Michael Peterson --- docs/changelog/119750.yaml | 6 + .../esql/session/EsqlSessionCCSUtils.java | 4 + ...terEsqlRCS1EnrichUnavailableRemotesIT.java | 353 ++++++++++++++++ ...terEsqlRCS2EnrichUnavailableRemotesIT.java | 380 ++++++++++++++++++ 4 files changed, 743 insertions(+) create mode 100644 docs/changelog/119750.yaml create mode 100644 x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1EnrichUnavailableRemotesIT.java create mode 100644 x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS2EnrichUnavailableRemotesIT.java diff --git a/docs/changelog/119750.yaml b/docs/changelog/119750.yaml new file mode 100644 index 0000000000000..2ec5c298d0eb1 --- /dev/null +++ b/docs/changelog/119750.yaml @@ -0,0 +1,6 @@ +pr: 119750 +summary: "ESQL: `connect_transport_exception` should be thrown instead of `verification_exception`\ + \ when ENRICH-ing if remote is disconnected" +area: Search +type: bug +issues: [] diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 95f7a37ce4d62..f8670a8e6d053 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -211,6 +211,10 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn * Mark it as SKIPPED with 0 shards searched and took=0. */ for (String c : clustersWithNoMatchingIndices) { + if (executionInfo.getCluster(c).getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + // if cluster was already marked SKIPPED during enrich policy resolution, do not overwrite + continue; + } final String indexExpression = executionInfo.getCluster(c).getIndexExpression(); if (missingIndicesIsFatal(c, executionInfo)) { String error = Strings.format( diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1EnrichUnavailableRemotesIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1EnrichUnavailableRemotesIT.java new file mode 100644 index 0000000000000..47c7ac8241fcd --- /dev/null +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1EnrichUnavailableRemotesIT.java @@ -0,0 +1,353 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.remotecluster; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +public class CrossClusterEsqlRCS1EnrichUnavailableRemotesIT extends AbstractRemoteClusterSecurityTestCase { + private static final AtomicBoolean SSL_ENABLED_REF = new AtomicBoolean(); + + static { + fulfillingCluster = ElasticsearchCluster.local() + .name("fulfilling-cluster") + .nodes(1) + .module("x-pack-autoscaling") + .module("x-pack-esql") + .module("x-pack-enrich") + .module("x-pack-ml") + .module("ingest-common") + .apply(commonClusterConfig) + .setting("remote_cluster.port", "0") + .setting("xpack.ml.enabled", "false") + .setting("xpack.security.remote_cluster_server.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get())) + .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key") + .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt") + .setting("xpack.security.authc.token.enabled", "true") + .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password") + .node(0, spec -> spec.setting("remote_cluster_server.enabled", "true")) + .build(); + + queryCluster = ElasticsearchCluster.local() + .name("query-cluster") + .module("x-pack-autoscaling") + .module("x-pack-esql") + .module("x-pack-enrich") + .module("x-pack-ml") + .module("ingest-common") + .apply(commonClusterConfig) + .setting("xpack.ml.enabled", "false") + .setting("xpack.security.remote_cluster_client.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get())) + .build(); + } + + @ClassRule + public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster); + + private final String[] modes = { "_coordinator", "_remote" }; + + @Before + public void setupPreRequisites() throws IOException { + setupRolesAndPrivileges(); + setSourceData(); + + var policy = createPolicy("employees-policy", "employees", "email", new String[] { "id", "designation" }); + // Create the enrich policy on both clusters. + assertOK(client().performRequest(policy)); + assertOK(performRequestAgainstFulfillingCluster(policy)); + + // Execute the enrich policy on both clusters. + var exec = executePolicy("employees-policy"); + assertOK(client().performRequest(exec)); + assertOK(performRequestAgainstFulfillingCluster(exec)); + } + + public void testEsqlEnrichWithSkipUnavailable() throws Exception { + esqlEnrichWithRandomSkipUnavailable(); + esqlEnrichWithSkipUnavailableTrue(); + esqlEnrichWithSkipUnavailableFalse(); + } + + private void esqlEnrichWithRandomSkipUnavailable() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), randomBoolean()); + + String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10"; + Response response = client().performRequest(esqlRequest(query)); + + Map map = responseAsMap(response); + ArrayList values = (ArrayList) map.get("values"); + Map clusters = (Map) map.get("_clusters"); + Map clusterDetails = (Map) clusters.get("details"); + Map localClusterDetails = (Map) clusterDetails.get("(local)"); + Map remoteClusterDetails = (Map) clusterDetails.get("my_remote_cluster"); + + assertOK(response); + assertThat((int) map.get("took"), greaterThan(0)); + assertThat(values.size(), is(6)); + for (int i = 0; i < 6; i++) { + ArrayList value = (ArrayList) values.get(i); + // Size is 3: ID, Email, Designation. + assertThat(value.size(), is(3)); + // Email + assertThat((String) value.get(0), endsWith("@corp.co")); + // ID + assertThat(value.get(1), is(i + 1)); + } + + assertThat((int) clusters.get("total"), is(2)); + assertThat((int) clusters.get("successful"), is(2)); + assertThat((int) clusters.get("running"), is(0)); + assertThat((int) clusters.get("skipped"), is(0)); + assertThat((int) clusters.get("partial"), is(0)); + assertThat((int) clusters.get("failed"), is(0)); + + assertThat(clusterDetails.size(), is(2)); + assertThat((int) localClusterDetails.get("took"), greaterThan(0)); + assertThat(localClusterDetails.get("status"), is("successful")); + + assertThat((int) remoteClusterDetails.get("took"), greaterThan(0)); + assertThat(remoteClusterDetails.get("status"), is("successful")); + } + + @SuppressWarnings("unchecked") + private void esqlEnrichWithSkipUnavailableTrue() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), true); + + try { + fulfillingCluster.stop(true); + + String query = "FROM to-be-enriched,my_remote_cluster:to-be-enriched | ENRICH employees-policy | LIMIT 10"; + Response response = client().performRequest(esqlRequest(query)); + + Map map = responseAsMap(response); + ArrayList values = (ArrayList) map.get("values"); + Map clusters = (Map) map.get("_clusters"); + Map clusterDetails = (Map) clusters.get("details"); + Map localClusterDetails = (Map) clusterDetails.get("(local)"); + Map remoteClusterDetails = (Map) clusterDetails.get("my_remote_cluster"); + + assertOK(response); + assertThat((int) map.get("took"), greaterThan(0)); + assertThat(values.size(), is(3)); + + // We only have 3 values since the remote cluster is turned off. + for (int i = 0; i < 3; i++) { + ArrayList value = (ArrayList) values.get(i); + // Size is 3: ID, Email, Designation. + assertThat(value.size(), is(3)); + // Email + assertThat((String) value.get(0), endsWith("@corp.co")); + // ID + assertThat(value.get(1), is(i + 1)); + } + + assertThat((int) clusters.get("total"), is(2)); + assertThat((int) clusters.get("successful"), is(1)); + assertThat((int) clusters.get("running"), is(0)); + assertThat((int) clusters.get("skipped"), is(1)); + assertThat((int) clusters.get("partial"), is(0)); + assertThat((int) clusters.get("failed"), is(0)); + + assertThat(clusterDetails.size(), is(2)); + assertThat((int) localClusterDetails.get("took"), greaterThan(0)); + assertThat(localClusterDetails.get("status"), is("successful")); + + assertThat((int) remoteClusterDetails.get("took"), greaterThan(0)); + assertThat(remoteClusterDetails.get("status"), is("skipped")); + + ArrayList remoteClusterFailures = (ArrayList) remoteClusterDetails.get("failures"); + assertThat(remoteClusterFailures.size(), equalTo(1)); + Map failuresMap = (Map) remoteClusterFailures.get(0); + + Map reason = (Map) failuresMap.get("reason"); + assertThat(reason.get("type").toString(), equalTo("connect_transport_exception")); + assertThat(reason.get("reason").toString(), containsString("Unable to connect to [my_remote_cluster]")); + } finally { + fulfillingCluster.start(); + closeFulfillingClusterClient(); + initFulfillingClusterClient(); + } + } + + private void esqlEnrichWithSkipUnavailableFalse() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), false); + + try { + fulfillingCluster.stop(true); + + String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10"; + ResponseException ex = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(query))); + assertThat(ex.getMessage(), containsString("connect_transport_exception")); + } finally { + fulfillingCluster.start(); + closeFulfillingClusterClient(); + initFulfillingClusterClient(); + } + } + + private void setupRolesAndPrivileges() throws IOException { + var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER); + putUserRequest.setJsonEntity(""" + { + "password": "x-pack-test-password", + "roles" : ["remote_search"] + }"""); + assertOK(adminClient().performRequest(putUserRequest)); + + var putRoleOnRemoteClusterRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE); + putRoleOnRemoteClusterRequest.setJsonEntity(""" + { + "indices": [ + { + "names": ["*"], + "privileges": ["read"] + } + ], + "cluster": [ "monitor_enrich", "manage_own_api_key" ], + "remote_indices": [ + { + "names": ["*"], + "privileges": ["read"], + "clusters": ["my_remote_cluster"] + } + ], + "remote_cluster": [ + { + "privileges": ["monitor_enrich"], + "clusters": ["my_remote_cluster"] + } + ] + }"""); + assertOK(adminClient().performRequest(putRoleOnRemoteClusterRequest)); + } + + private void setSourceData() throws IOException { + Request createIndex = new Request("PUT", "employees"); + createIndex.setJsonEntity(""" + { + "mappings": { + "properties": { + "id": { "type": "integer" }, + "email": { "type": "text" }, + "designation": { "type": "text" } + } + } + } + """); + assertOK(client().performRequest(createIndex)); + assertOK(performRequestAgainstFulfillingCluster(createIndex)); + + Request bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "employees" } } + { "id": 1, "email": "a@corp.co", "designation": "SDE intern"} + { "index": { "_index": "employees" } } + { "id": 2, "email": "b@corp.co", "designation": "SDE 1"} + { "index": { "_index": "employees" } } + { "id": 3, "email": "c@corp.co", "designation": "SDE 2"} + { "index": { "_index": "employees" } } + { "id": 4, "email": "d@corp.co", "designation": "SSE"} + { "index": { "_index": "employees" } } + { "id": 5, "email": "e@corp.co", "designation": "PSE 1"} + { "index": { "_index": "employees" } } + { "id": 6, "email": "f@corp.co", "designation": "PSE 2"} + """); + assertOK(client().performRequest(bulkRequest)); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + + createIndex = new Request("PUT", "to-be-enriched"); + createIndex.setJsonEntity(""" + { + "mappings": { + "properties": { + "email": { "type": "text" } + } + } + } + """); + assertOK(client().performRequest(createIndex)); + assertOK(performRequestAgainstFulfillingCluster(createIndex)); + + bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "to-be-enriched" } } + { "email": "a@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "b@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "c@corp.co"} + """); + assertOK(client().performRequest(bulkRequest)); + + bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "to-be-enriched" } } + { "email": "d@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "e@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "f@corp.co"} + """); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + } + + private Request createPolicy(String policyName, String matchIndex, String matchField, String[] enrichFields) throws IOException { + XContentBuilder body = JsonXContent.contentBuilder(); + body.startObject(); + body.startObject("match"); + body.field("indices", matchIndex); + body.field("match_field", matchField); + body.field("enrich_fields", enrichFields); + + body.endObject(); + body.endObject(); + + return makeRequest("PUT", "_enrich/policy/" + policyName, body); + } + + private Request executePolicy(String policyName) { + return new Request("PUT", "_enrich/policy/employees-policy/_execute"); + } + + private Request esqlRequest(String query) throws IOException { + XContentBuilder body = JsonXContent.contentBuilder(); + + body.startObject(); + body.field("query", query); + body.field("include_ccs_metadata", true); + body.endObject(); + + return makeRequest("POST", "_query", body); + } + + private Request makeRequest(String method, String endpoint, XContentBuilder requestBody) { + Request request = new Request(method, endpoint); + request.setJsonEntity(Strings.toString(requestBody)); + return request; + } +} diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS2EnrichUnavailableRemotesIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS2EnrichUnavailableRemotesIT.java new file mode 100644 index 0000000000000..da59e3c772736 --- /dev/null +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS2EnrichUnavailableRemotesIT.java @@ -0,0 +1,380 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.remotecluster; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.util.resource.Resource; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +public class CrossClusterEsqlRCS2EnrichUnavailableRemotesIT extends AbstractRemoteClusterSecurityTestCase { + private static final AtomicReference> API_KEY_MAP_REF = new AtomicReference<>(); + + static { + fulfillingCluster = ElasticsearchCluster.local() + .name("fulfilling-cluster") + .nodes(1) + .module("x-pack-autoscaling") + .module("x-pack-esql") + .module("x-pack-enrich") + .module("x-pack-ml") + .module("ingest-common") + .apply(commonClusterConfig) + .setting("remote_cluster.port", "0") + .setting("xpack.ml.enabled", "false") + .setting("xpack.security.remote_cluster_server.ssl.enabled", "true") + .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key") + .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt") + .setting("xpack.security.authc.token.enabled", "true") + .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password") + .node(0, spec -> spec.setting("remote_cluster_server.enabled", "true")) + .build(); + + queryCluster = ElasticsearchCluster.local() + .name("query-cluster") + .module("x-pack-autoscaling") + .module("x-pack-esql") + .module("x-pack-enrich") + .module("x-pack-ml") + .module("ingest-common") + .apply(commonClusterConfig) + .setting("xpack.ml.enabled", "false") + .setting("xpack.security.remote_cluster_client.ssl.enabled", "true") + .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt") + .setting("xpack.security.authc.token.enabled", "true") + .keystore("cluster.remote.my_remote_cluster.credentials", () -> { + if (API_KEY_MAP_REF.get() == null) { + final Map apiKeyMap = createCrossClusterAccessApiKey(""" + { + "search": [ + { + "names": ["*"] + } + ] + }"""); + API_KEY_MAP_REF.set(apiKeyMap); + } + return (String) API_KEY_MAP_REF.get().get("encoded"); + }) + .rolesFile(Resource.fromClasspath("roles.yml")) + .user(REMOTE_METRIC_USER, PASS.toString(), "read_remote_shared_metrics", false) + .build(); + } + + @ClassRule + public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster); + + private String[] modes = { "_coordinator", "_remote" }; + + @Before + public void setupPreRequisites() throws IOException { + setupRolesAndPrivileges(); + setSourceData(); + + var policy = createPolicy("employees-policy", "employees", "email", new String[] { "id", "designation" }); + // Create the enrich policy on both clusters. + assertOK(client().performRequest(policy)); + assertOK(performRequestAgainstFulfillingCluster(policy)); + + // Execute the enrich policy on both clusters. + var exec = executePolicy("employees-policy"); + assertOK(client().performRequest(exec)); + assertOK(performRequestAgainstFulfillingCluster(exec)); + } + + public void testEsqlEnrichWithSkipUnavailable() throws Exception { + esqlEnrichWithRandomSkipUnavailable(); + esqlEnrichWithSkipUnavailableTrue(); + esqlEnrichWithSkipUnavailableFalse(); + } + + private void esqlEnrichWithRandomSkipUnavailable() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, false, randomBoolean(), randomBoolean()); + + String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10"; + Response response = performRequestWithRemoteSearchUser(esqlRequest(query)); + + Map map = responseAsMap(response); + ArrayList values = (ArrayList) map.get("values"); + Map clusters = (Map) map.get("_clusters"); + Map clusterDetails = (Map) clusters.get("details"); + Map localClusterDetails = (Map) clusterDetails.get("(local)"); + Map remoteClusterDetails = (Map) clusterDetails.get("my_remote_cluster"); + + assertOK(response); + assertThat((int) map.get("took"), greaterThan(0)); + assertThat(values.size(), is(6)); + for (int i = 0; i < 6; i++) { + ArrayList value = (ArrayList) values.get(i); + // Size is 3: ID, Email, Designation. + assertThat(value.size(), is(3)); + // Email + assertThat((String) value.get(0), endsWith("@corp.co")); + // ID + assertThat(value.get(1), is(i + 1)); + } + + assertThat((int) clusters.get("total"), is(2)); + assertThat((int) clusters.get("successful"), is(2)); + assertThat((int) clusters.get("running"), is(0)); + assertThat((int) clusters.get("skipped"), is(0)); + assertThat((int) clusters.get("partial"), is(0)); + assertThat((int) clusters.get("failed"), is(0)); + + assertThat(clusterDetails.size(), is(2)); + assertThat((int) localClusterDetails.get("took"), greaterThan(0)); + assertThat(localClusterDetails.get("status"), is("successful")); + + assertThat((int) remoteClusterDetails.get("took"), greaterThan(0)); + assertThat(remoteClusterDetails.get("status"), is("successful")); + } + + @SuppressWarnings("unchecked") + private void esqlEnrichWithSkipUnavailableTrue() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, false, randomBoolean(), true); + + try { + fulfillingCluster.stop(true); + + String query = "FROM to-be-enriched,my_remote_cluster:to-be-enriched | ENRICH employees-policy | LIMIT 10"; + Response response = performRequestWithRemoteSearchUser(esqlRequest(query)); + + Map map = responseAsMap(response); + ArrayList values = (ArrayList) map.get("values"); + Map clusters = (Map) map.get("_clusters"); + Map clusterDetails = (Map) clusters.get("details"); + Map localClusterDetails = (Map) clusterDetails.get("(local)"); + Map remoteClusterDetails = (Map) clusterDetails.get("my_remote_cluster"); + + assertOK(response); + assertThat((int) map.get("took"), greaterThan(0)); + assertThat(values.size(), is(3)); + + // We only have 3 values since the remote cluster is turned off. + for (int i = 0; i < 3; i++) { + ArrayList value = (ArrayList) values.get(i); + // Size is 3: ID, Email, Designation. + assertThat(value.size(), is(3)); + // Email + assertThat((String) value.get(0), endsWith("@corp.co")); + // ID + assertThat(value.get(1), is(i + 1)); + } + + assertThat((int) clusters.get("total"), is(2)); + assertThat((int) clusters.get("successful"), is(1)); + assertThat((int) clusters.get("running"), is(0)); + assertThat((int) clusters.get("skipped"), is(1)); + assertThat((int) clusters.get("partial"), is(0)); + assertThat((int) clusters.get("failed"), is(0)); + + assertThat(clusterDetails.size(), is(2)); + assertThat((int) localClusterDetails.get("took"), greaterThan(0)); + assertThat(localClusterDetails.get("status"), is("successful")); + + assertThat((int) remoteClusterDetails.get("took"), greaterThan(0)); + assertThat(remoteClusterDetails.get("status"), is("skipped")); + + ArrayList remoteClusterFailures = (ArrayList) remoteClusterDetails.get("failures"); + assertThat(remoteClusterFailures.size(), equalTo(1)); + Map failuresMap = (Map) remoteClusterFailures.get(0); + + Map reason = (Map) failuresMap.get("reason"); + assertThat(reason.get("type").toString(), equalTo("connect_transport_exception")); + assertThat(reason.get("reason").toString(), containsString("Unable to connect to [my_remote_cluster]")); + } finally { + fulfillingCluster.start(); + closeFulfillingClusterClient(); + initFulfillingClusterClient(); + } + } + + private void esqlEnrichWithSkipUnavailableFalse() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, false, randomBoolean(), false); + + try { + fulfillingCluster.stop(true); + + String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10"; + ResponseException ex = expectThrows(ResponseException.class, () -> performRequestWithRemoteSearchUser(esqlRequest(query))); + assertThat(ex.getMessage(), containsString("connect_transport_exception")); + } finally { + fulfillingCluster.start(); + closeFulfillingClusterClient(); + initFulfillingClusterClient(); + } + } + + private void setupRolesAndPrivileges() throws IOException { + var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER); + putUserRequest.setJsonEntity(""" + { + "password": "x-pack-test-password", + "roles" : ["remote_search"] + }"""); + assertOK(adminClient().performRequest(putUserRequest)); + + var putRoleOnRemoteClusterRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE); + putRoleOnRemoteClusterRequest.setJsonEntity(""" + { + "indices": [ + { + "names": ["*"], + "privileges": ["read"] + } + ], + "cluster": [ "monitor_enrich", "manage_own_api_key" ], + "remote_indices": [ + { + "names": ["*"], + "privileges": ["read"], + "clusters": ["my_remote_cluster"] + } + ], + "remote_cluster": [ + { + "privileges": ["monitor_enrich"], + "clusters": ["my_remote_cluster"] + } + ] + }"""); + assertOK(adminClient().performRequest(putRoleOnRemoteClusterRequest)); + } + + private void setSourceData() throws IOException { + Request createIndex = new Request("PUT", "employees"); + createIndex.setJsonEntity(""" + { + "mappings": { + "properties": { + "id": { "type": "integer" }, + "email": { "type": "text" }, + "designation": { "type": "text" } + } + } + } + """); + assertOK(client().performRequest(createIndex)); + assertOK(performRequestAgainstFulfillingCluster(createIndex)); + + Request bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "employees" } } + { "id": 1, "email": "a@corp.co", "designation": "SDE intern"} + { "index": { "_index": "employees" } } + { "id": 2, "email": "b@corp.co", "designation": "SDE 1"} + { "index": { "_index": "employees" } } + { "id": 3, "email": "c@corp.co", "designation": "SDE 2"} + { "index": { "_index": "employees" } } + { "id": 4, "email": "d@corp.co", "designation": "SSE"} + { "index": { "_index": "employees" } } + { "id": 5, "email": "e@corp.co", "designation": "PSE 1"} + { "index": { "_index": "employees" } } + { "id": 6, "email": "f@corp.co", "designation": "PSE 2"} + """); + assertOK(client().performRequest(bulkRequest)); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + + createIndex = new Request("PUT", "to-be-enriched"); + createIndex.setJsonEntity(""" + { + "mappings": { + "properties": { + "email": { "type": "text" } + } + } + } + """); + assertOK(client().performRequest(createIndex)); + assertOK(performRequestAgainstFulfillingCluster(createIndex)); + + bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "to-be-enriched" } } + { "email": "a@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "b@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "c@corp.co"} + """); + assertOK(client().performRequest(bulkRequest)); + + bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "to-be-enriched" } } + { "email": "d@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "e@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "f@corp.co"} + """); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + } + + private Request createPolicy(String policyName, String matchIndex, String matchField, String[] enrichFields) throws IOException { + XContentBuilder body = JsonXContent.contentBuilder(); + body.startObject(); + body.startObject("match"); + body.field("indices", matchIndex); + body.field("match_field", matchField); + body.field("enrich_fields", enrichFields); + + body.endObject(); + body.endObject(); + + return makeRequest("PUT", "_enrich/policy/" + policyName, body); + } + + private Request executePolicy(String policyName) { + return new Request("PUT", "_enrich/policy/employees-policy/_execute"); + } + + private Request esqlRequest(String query) throws IOException { + XContentBuilder body = JsonXContent.contentBuilder(); + + body.startObject(); + body.field("query", query); + body.field("include_ccs_metadata", true); + body.endObject(); + + return makeRequest("POST", "_query", body); + } + + private Request makeRequest(String method, String endpoint, XContentBuilder requestBody) { + Request request = new Request(method, endpoint); + request.setJsonEntity(Strings.toString(requestBody)); + return request; + } + + private Response performRequestWithRemoteSearchUser(final Request request) throws IOException { + request.setOptions( + RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", headerFromRandomAuthMethod(REMOTE_SEARCH_USER, PASS)) + ); + return client().performRequest(request); + } +}