From 6a8bdb5e5e691381cda5122b03c6c3fd7b6fd265 Mon Sep 17 00:00:00 2001 From: Gonzalo Ortiz Jaureguizar Date: Thu, 9 Jan 2025 11:33:51 +0100 Subject: [PATCH] improve broker selector utils (#14733) --- .../org/apache/pinot/client/BrokerCache.java | 18 +-- .../pinot/client/DynamicBrokerSelector.java | 6 +- .../client/utils/BrokerSelectorUtils.java | 91 +++++++++---- .../client/DynamicBrokerSelectorTest.java | 20 +++ .../client/utils/BrokerSelectorUtilsTest.java | 121 ++++++++++++++++++ 5 files changed, 218 insertions(+), 38 deletions(-) create mode 100644 pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/utils/BrokerSelectorUtilsTest.java diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java index 3b2a789eac02..c2e1b98caf1a 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java @@ -190,20 +190,14 @@ protected void updateBrokerData() } public String getBroker(String... tableNames) { - List brokers = null; // If tableNames is not-null, filter out nulls - tableNames = - tableNames == null ? tableNames : Arrays.stream(tableNames).filter(Objects::nonNull).toArray(String[]::new); - if (!(tableNames == null || tableNames.length == 0)) { - // returning list of common brokers hosting all the tables. - brokers = BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames), - _brokerData.getTableToBrokerMap()); + tableNames = tableNames == null ? tableNames + : Arrays.stream(tableNames).filter(Objects::nonNull).toArray(String[]::new); + if (tableNames == null || tableNames.length == 0) { + List brokers = _brokerData.getBrokers(); + return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())); } - - if (brokers == null || brokers.isEmpty()) { - brokers = _brokerData.getBrokers(); - } - return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())); + return BrokerSelectorUtils.getRandomBroker(Arrays.asList(tableNames), _brokerData.getTableToBrokerMap()); } public List getBrokers() { diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java index 6683b6a5fc60..498a68ce0be4 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java @@ -91,10 +91,10 @@ private void refresh() { public String selectBroker(String... tableNames) { if (!(tableNames == null || tableNames.length == 0 || tableNames[0] == null)) { // getting list of brokers hosting all the tables. - List list = BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames), + String randomBroker = BrokerSelectorUtils.getRandomBroker(Arrays.asList(tableNames), _tableToBrokerListMapRef.get()); - if (list != null && !list.isEmpty()) { - return list.get(ThreadLocalRandom.current().nextInt(list.size())); + if (randomBroker != null) { + return randomBroker; } } diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java index e3a1df44db7b..c465f101aa08 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java @@ -19,9 +19,13 @@ package org.apache.pinot.client.utils; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; import org.apache.pinot.client.ExternalViewReader; @@ -34,35 +38,52 @@ private BrokerSelectorUtils() { * * @param tableNames: List of table names. * @param brokerData: map holding data for table hosting on brokers. - * @return list of common brokers hosting all the tables. + * @return list of common brokers hosting all the tables or null if no common brokers found. + * @deprecated Use {@link #getTablesCommonBrokersSet(List, Map)} instead. It is more efficient and its semantics are + * clearer (ie it returns an empty set instead of null if no common brokers are found). */ - public static List getTablesCommonBrokers(List tableNames, Map> brokerData) { - List> tablesBrokersList = new ArrayList<>(); - for (String name: tableNames) { - String tableName = getTableNameWithoutSuffix(name); - int idx = tableName.indexOf('.'); - - if (brokerData.containsKey(tableName)) { - tablesBrokersList.add(brokerData.get(tableName)); - } else if (idx > 0) { - // In case tableName is formatted as . - tableName = tableName.substring(idx + 1); - tablesBrokersList.add(brokerData.get(tableName)); - } + @Nullable + @Deprecated + public static List getTablesCommonBrokers(@Nullable List tableNames, + Map> brokerData) { + Set tablesCommonBrokersSet = getTablesCommonBrokersSet(tableNames, brokerData); + if (tablesCommonBrokersSet == null || tablesCommonBrokersSet.isEmpty()) { + return null; } + return new ArrayList<>(tablesCommonBrokersSet); + } - // return null if tablesBrokersList is empty or contains null - if (tablesBrokersList.isEmpty() - || tablesBrokersList.stream().anyMatch(Objects::isNull)) { + /** + * Returns a random broker from the common brokers hosting all the tables. + */ + @Nullable + public static String getRandomBroker(@Nullable List tableNames, Map> brokerData) { + Set tablesCommonBrokersSet = getTablesCommonBrokersSet(tableNames, brokerData); + if (tablesCommonBrokersSet.isEmpty()) { return null; } + return tablesCommonBrokersSet.stream() + .skip(ThreadLocalRandom.current().nextInt(tablesCommonBrokersSet.size())) + .findFirst() + .orElseThrow(() -> new IllegalStateException("No broker found")); + } - // Make a copy of the brokersList of the first table. retainAll does inplace modifications. - // So lists from brokerData should not be used directly. - List commonBrokers = new ArrayList<>(tablesBrokersList.get(0)); - for (int i = 1; i < tablesBrokersList.size(); i++) { - commonBrokers.retainAll(tablesBrokersList.get(i)); + /** + * + * @param tableNames: List of table names. + * @param brokerData: map holding data for table hosting on brokers. + * @return set of common brokers hosting all the tables + */ + public static Set getTablesCommonBrokersSet( + @Nullable List tableNames, Map> brokerData) { + if (tableNames == null || tableNames.isEmpty()) { + return Collections.emptySet(); + } + HashSet commonBrokers = getBrokers(tableNames.get(0), brokerData); + for (int i = 1; i < tableNames.size() && !commonBrokers.isEmpty(); i++) { + commonBrokers.retainAll(getBrokers(tableNames.get(i), brokerData)); } + return commonBrokers; } @@ -71,4 +92,28 @@ private static String getTableNameWithoutSuffix(String tableName) { tableName.replace(ExternalViewReader.OFFLINE_SUFFIX, ""). replace(ExternalViewReader.REALTIME_SUFFIX, ""); } + + /** + * Returns the brokers for the given table name. + * + * This means that an empty set is returned if there are no brokers for the given table name. + */ + private static HashSet getBrokers(String tableName, Map> brokerData) { + String tableNameWithoutSuffix = getTableNameWithoutSuffix(tableName); + int idx = tableNameWithoutSuffix.indexOf('.'); + + List brokers = brokerData.get(tableNameWithoutSuffix); + if (brokers != null) { + return new HashSet<>(brokers); + } else if (idx > 0) { + // TODO: This is probably unnecessary and even wrong. `brokerData` should include the fully qualified name. + // In case tableNameWithoutSuffix is formatted as .
and not found in the fully qualified name + tableNameWithoutSuffix = tableNameWithoutSuffix.substring(idx + 1); + List brokersWithoutDb = brokerData.get(tableNameWithoutSuffix); + if (brokersWithoutDb != null) { + return new HashSet<>(brokersWithoutDb); + } + } + return new HashSet<>(); + } } diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java index d52438ab542c..986b4773c7c2 100644 --- a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java +++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java @@ -152,4 +152,24 @@ public void testCloseZkClient() { Mockito.verify(_mockZkClient, times(1)).close(); } + + @Test + public void testSelectBrokerWithInvalidTable() { + Map> tableToBrokerListMap = new HashMap<>(); + tableToBrokerListMap.put("table1", Collections.singletonList("broker1")); + when(_mockExternalViewReader.getTableToBrokersMap()).thenReturn(tableToBrokerListMap); + _dynamicBrokerSelectorUnderTest.handleDataChange("dataPath", "data"); + String result = _dynamicBrokerSelectorUnderTest.selectBroker("invalidTable"); + assertEquals(result, "broker1"); + } + + @Test + public void testSelectBrokerWithTwoTablesOneInvalid() { + Map> tableToBrokerListMap = new HashMap<>(); + tableToBrokerListMap.put("table1", Collections.singletonList("broker1")); + when(_mockExternalViewReader.getTableToBrokersMap()).thenReturn(tableToBrokerListMap); + _dynamicBrokerSelectorUnderTest.handleDataChange("dataPath", "data"); + String result = _dynamicBrokerSelectorUnderTest.selectBroker("table1", "invalidTable"); + assertEquals(result, "broker1"); + } } diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/utils/BrokerSelectorUtilsTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/utils/BrokerSelectorUtilsTest.java new file mode 100644 index 000000000000..512a0a3c862a --- /dev/null +++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/utils/BrokerSelectorUtilsTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.client.utils; + +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.Test; + + +public class BrokerSelectorUtilsTest { + + HashMap> _brokerData = new HashMap<>(); + @Test + public void getTablesCommonBrokersSetNullTables() { + Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(null, _brokerData); + Assert.assertEquals(tableSet, Set.of()); + } + + @Test + public void getTablesCommonBrokersListNullTables() { + List tableList = BrokerSelectorUtils.getTablesCommonBrokers(null, _brokerData); + Assert.assertNull(tableList); + } + + @Test + public void getTablesCommonBrokersSetEmptyTables() { + Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of(), _brokerData); + Assert.assertEquals(tableSet, Set.of()); + } + + @Test + public void getTablesCommonBrokersListEmptyTables() { + List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of(), _brokerData); + Assert.assertNull(tableList); + } + + @Test + public void getTablesCommonBrokersSetNotExistentTable() { + Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("notExistent"), _brokerData); + Assert.assertEquals(tableSet, Set.of()); + } + + @Test + public void getTablesCommonBrokersListNotExistentTable() { + List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("notExistent"), _brokerData); + Assert.assertNull(tableList); + } + + @Test + public void getTablesCommonBrokersSetOneTable() { + _brokerData.put("table1", List.of("broker1")); + Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1"), _brokerData); + Assert.assertEquals(tableSet, Set.of("broker1")); + } + + @Test + public void getTablesCommonBrokersListOneTable() { + _brokerData.put("table1", List.of("broker1")); + List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1"), _brokerData); + Assert.assertNotNull(tableList); + Assert.assertEquals(tableList, List.of("broker1")); + } + + @Test + public void getTablesCommonBrokersSetTwoTables() { + _brokerData.put("table1", List.of("broker1")); + _brokerData.put("table2", List.of("broker1")); + Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1", "table2"), _brokerData); + Assert.assertNotNull(tableSet); + Assert.assertEquals(tableSet, Set.of("broker1")); + } + + @Test + public void getTablesCommonBrokersListTwoTables() { + _brokerData.put("table1", List.of("broker1")); + _brokerData.put("table2", List.of("broker1")); + List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1", "table2"), _brokerData); + Assert.assertNotNull(tableList); + Assert.assertEquals(tableList, List.of("broker1")); + } + + @Test + public void getTablesCommonBrokersSetTwoTablesDifferentBrokers() { + _brokerData.put("table1", List.of("broker1")); + _brokerData.put("table2", List.of("broker2")); + Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1", "table2"), _brokerData); + Assert.assertEquals(tableSet, Set.of()); + } + + @Test + public void getTablesCommonBrokersListTwoTablesDifferentBrokers() { + _brokerData.put("table1", List.of("broker1")); + _brokerData.put("table2", List.of("broker2")); + List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1", "table2"), _brokerData); + Assert.assertNull(tableList); + } + + @AfterMethod + public void tearDown() { + _brokerData.clear(); + } +}