Skip to content

Commit

Permalink
improve broker selector utils (#14733)
Browse files Browse the repository at this point in the history
  • Loading branch information
gortiz authored Jan 9, 2025
1 parent a8c3107 commit 6a8bdb5
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,20 +190,14 @@ protected void updateBrokerData()
}

public String getBroker(String... tableNames) {
List<String> brokers = null;
// If tableNames is not-null, filter out nulls
tableNames =
tableNames == null ? tableNames : Arrays.stream(tableNames).filter(Objects::nonNull).toArray(String[]::new);
if (!(tableNames == null || tableNames.length == 0)) {
// returning list of common brokers hosting all the tables.
brokers = BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames),
_brokerData.getTableToBrokerMap());
tableNames = tableNames == null ? tableNames
: Arrays.stream(tableNames).filter(Objects::nonNull).toArray(String[]::new);
if (tableNames == null || tableNames.length == 0) {
List<String> brokers = _brokerData.getBrokers();
return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
}

if (brokers == null || brokers.isEmpty()) {
brokers = _brokerData.getBrokers();
}
return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
return BrokerSelectorUtils.getRandomBroker(Arrays.asList(tableNames), _brokerData.getTableToBrokerMap());
}

public List<String> getBrokers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ private void refresh() {
public String selectBroker(String... tableNames) {
if (!(tableNames == null || tableNames.length == 0 || tableNames[0] == null)) {
// getting list of brokers hosting all the tables.
List<String> list = BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames),
String randomBroker = BrokerSelectorUtils.getRandomBroker(Arrays.asList(tableNames),
_tableToBrokerListMapRef.get());
if (list != null && !list.isEmpty()) {
return list.get(ThreadLocalRandom.current().nextInt(list.size()));
if (randomBroker != null) {
return randomBroker;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
package org.apache.pinot.client.utils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.pinot.client.ExternalViewReader;


Expand All @@ -34,35 +38,52 @@ private BrokerSelectorUtils() {
*
* @param tableNames: List of table names.
* @param brokerData: map holding data for table hosting on brokers.
* @return list of common brokers hosting all the tables.
* @return list of common brokers hosting all the tables or null if no common brokers found.
* @deprecated Use {@link #getTablesCommonBrokersSet(List, Map)} instead. It is more efficient and its semantics are
* clearer (ie it returns an empty set instead of null if no common brokers are found).
*/
public static List<String> getTablesCommonBrokers(List<String> tableNames, Map<String, List<String>> brokerData) {
List<List<String>> tablesBrokersList = new ArrayList<>();
for (String name: tableNames) {
String tableName = getTableNameWithoutSuffix(name);
int idx = tableName.indexOf('.');

if (brokerData.containsKey(tableName)) {
tablesBrokersList.add(brokerData.get(tableName));
} else if (idx > 0) {
// In case tableName is formatted as <db>.<table>
tableName = tableName.substring(idx + 1);
tablesBrokersList.add(brokerData.get(tableName));
}
@Nullable
@Deprecated
public static List<String> getTablesCommonBrokers(@Nullable List<String> tableNames,
Map<String, List<String>> brokerData) {
Set<String> tablesCommonBrokersSet = getTablesCommonBrokersSet(tableNames, brokerData);
if (tablesCommonBrokersSet == null || tablesCommonBrokersSet.isEmpty()) {
return null;
}
return new ArrayList<>(tablesCommonBrokersSet);
}

// return null if tablesBrokersList is empty or contains null
if (tablesBrokersList.isEmpty()
|| tablesBrokersList.stream().anyMatch(Objects::isNull)) {
/**
* Returns a random broker from the common brokers hosting all the tables.
*/
@Nullable
public static String getRandomBroker(@Nullable List<String> tableNames, Map<String, List<String>> brokerData) {
Set<String> tablesCommonBrokersSet = getTablesCommonBrokersSet(tableNames, brokerData);
if (tablesCommonBrokersSet.isEmpty()) {
return null;
}
return tablesCommonBrokersSet.stream()
.skip(ThreadLocalRandom.current().nextInt(tablesCommonBrokersSet.size()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("No broker found"));
}

// Make a copy of the brokersList of the first table. retainAll does inplace modifications.
// So lists from brokerData should not be used directly.
List<String> commonBrokers = new ArrayList<>(tablesBrokersList.get(0));
for (int i = 1; i < tablesBrokersList.size(); i++) {
commonBrokers.retainAll(tablesBrokersList.get(i));
/**
*
* @param tableNames: List of table names.
* @param brokerData: map holding data for table hosting on brokers.
* @return set of common brokers hosting all the tables
*/
public static Set<String> getTablesCommonBrokersSet(
@Nullable List<String> tableNames, Map<String, List<String>> brokerData) {
if (tableNames == null || tableNames.isEmpty()) {
return Collections.emptySet();
}
HashSet<String> commonBrokers = getBrokers(tableNames.get(0), brokerData);
for (int i = 1; i < tableNames.size() && !commonBrokers.isEmpty(); i++) {
commonBrokers.retainAll(getBrokers(tableNames.get(i), brokerData));
}

return commonBrokers;
}

Expand All @@ -71,4 +92,28 @@ private static String getTableNameWithoutSuffix(String tableName) {
tableName.replace(ExternalViewReader.OFFLINE_SUFFIX, "").
replace(ExternalViewReader.REALTIME_SUFFIX, "");
}

/**
* Returns the brokers for the given table name.
*
* This means that an empty set is returned if there are no brokers for the given table name.
*/
private static HashSet<String> getBrokers(String tableName, Map<String, List<String>> brokerData) {
String tableNameWithoutSuffix = getTableNameWithoutSuffix(tableName);
int idx = tableNameWithoutSuffix.indexOf('.');

List<String> brokers = brokerData.get(tableNameWithoutSuffix);
if (brokers != null) {
return new HashSet<>(brokers);
} else if (idx > 0) {
// TODO: This is probably unnecessary and even wrong. `brokerData` should include the fully qualified name.
// In case tableNameWithoutSuffix is formatted as <db>.<table> and not found in the fully qualified name
tableNameWithoutSuffix = tableNameWithoutSuffix.substring(idx + 1);
List<String> brokersWithoutDb = brokerData.get(tableNameWithoutSuffix);
if (brokersWithoutDb != null) {
return new HashSet<>(brokersWithoutDb);
}
}
return new HashSet<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,24 @@ public void testCloseZkClient() {

Mockito.verify(_mockZkClient, times(1)).close();
}

@Test
public void testSelectBrokerWithInvalidTable() {
Map<String, List<String>> tableToBrokerListMap = new HashMap<>();
tableToBrokerListMap.put("table1", Collections.singletonList("broker1"));
when(_mockExternalViewReader.getTableToBrokersMap()).thenReturn(tableToBrokerListMap);
_dynamicBrokerSelectorUnderTest.handleDataChange("dataPath", "data");
String result = _dynamicBrokerSelectorUnderTest.selectBroker("invalidTable");
assertEquals(result, "broker1");
}

@Test
public void testSelectBrokerWithTwoTablesOneInvalid() {
Map<String, List<String>> tableToBrokerListMap = new HashMap<>();
tableToBrokerListMap.put("table1", Collections.singletonList("broker1"));
when(_mockExternalViewReader.getTableToBrokersMap()).thenReturn(tableToBrokerListMap);
_dynamicBrokerSelectorUnderTest.handleDataChange("dataPath", "data");
String result = _dynamicBrokerSelectorUnderTest.selectBroker("table1", "invalidTable");
assertEquals(result, "broker1");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.client.utils;

import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;


public class BrokerSelectorUtilsTest {

HashMap<String, List<String>> _brokerData = new HashMap<>();
@Test
public void getTablesCommonBrokersSetNullTables() {
Set<String> tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(null, _brokerData);
Assert.assertEquals(tableSet, Set.of());
}

@Test
public void getTablesCommonBrokersListNullTables() {
List<String> tableList = BrokerSelectorUtils.getTablesCommonBrokers(null, _brokerData);
Assert.assertNull(tableList);
}

@Test
public void getTablesCommonBrokersSetEmptyTables() {
Set<String> tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of(), _brokerData);
Assert.assertEquals(tableSet, Set.of());
}

@Test
public void getTablesCommonBrokersListEmptyTables() {
List<String> tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of(), _brokerData);
Assert.assertNull(tableList);
}

@Test
public void getTablesCommonBrokersSetNotExistentTable() {
Set<String> tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("notExistent"), _brokerData);
Assert.assertEquals(tableSet, Set.of());
}

@Test
public void getTablesCommonBrokersListNotExistentTable() {
List<String> tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("notExistent"), _brokerData);
Assert.assertNull(tableList);
}

@Test
public void getTablesCommonBrokersSetOneTable() {
_brokerData.put("table1", List.of("broker1"));
Set<String> tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1"), _brokerData);
Assert.assertEquals(tableSet, Set.of("broker1"));
}

@Test
public void getTablesCommonBrokersListOneTable() {
_brokerData.put("table1", List.of("broker1"));
List<String> tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1"), _brokerData);
Assert.assertNotNull(tableList);
Assert.assertEquals(tableList, List.of("broker1"));
}

@Test
public void getTablesCommonBrokersSetTwoTables() {
_brokerData.put("table1", List.of("broker1"));
_brokerData.put("table2", List.of("broker1"));
Set<String> tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1", "table2"), _brokerData);
Assert.assertNotNull(tableSet);
Assert.assertEquals(tableSet, Set.of("broker1"));
}

@Test
public void getTablesCommonBrokersListTwoTables() {
_brokerData.put("table1", List.of("broker1"));
_brokerData.put("table2", List.of("broker1"));
List<String> tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1", "table2"), _brokerData);
Assert.assertNotNull(tableList);
Assert.assertEquals(tableList, List.of("broker1"));
}

@Test
public void getTablesCommonBrokersSetTwoTablesDifferentBrokers() {
_brokerData.put("table1", List.of("broker1"));
_brokerData.put("table2", List.of("broker2"));
Set<String> tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1", "table2"), _brokerData);
Assert.assertEquals(tableSet, Set.of());
}

@Test
public void getTablesCommonBrokersListTwoTablesDifferentBrokers() {
_brokerData.put("table1", List.of("broker1"));
_brokerData.put("table2", List.of("broker2"));
List<String> tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1", "table2"), _brokerData);
Assert.assertNull(tableList);
}

@AfterMethod
public void tearDown() {
_brokerData.clear();
}
}

0 comments on commit 6a8bdb5

Please sign in to comment.