Skip to content

Commit

Permalink
[pinot] Add logic to override maxQPS quota of datasets in onboarding …
Browse files Browse the repository at this point in the history
…flow (#1741)

* [pinot] Add logic to override maxQPS quota of datasets in onboarding flow

* address feedback

* Removed customMaxQPSQuota from PinotThirdEyeDataSourceConfig

* Rename configuration field

* Rename configuration file

* Address feedback

---------

Co-authored-by: Anshul Singh <anshul.singh@anshuls-macbook-pro-1.wyvern-sun.ts.net>
  • Loading branch information
anshul98ks123 and Anshul Singh authored Jan 8, 2025
1 parent 911ec64 commit afeb421
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 12 deletions.
3 changes: 3 additions & 0 deletions config/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ sentry:
tags:
component: coordinator

quotas:
pinotMaxQPSQuotaOverride: 100

scheduler:
# Run the Quartz Scheduler.
# Only 1 instance of scheduler should run. This responsibility is currently on the user!!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
import ai.startree.thirdeye.datasource.loader.DefaultAggregationLoader;
import ai.startree.thirdeye.datasource.loader.DefaultMinMaxTimeLoader;
import ai.startree.thirdeye.rootcause.configuration.RcaConfiguration;
import ai.startree.thirdeye.spi.api.NamespaceConfigurationApi;
import ai.startree.thirdeye.spi.config.QuotasConfiguration;
import ai.startree.thirdeye.spi.config.TimeConfiguration;
import ai.startree.thirdeye.spi.datalayer.dto.NamespaceConfigurationDTO;
import ai.startree.thirdeye.spi.datasource.loader.AggregationLoader;
import ai.startree.thirdeye.spi.datasource.loader.MinMaxTimeLoader;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
import com.google.inject.util.Providers;
import java.security.Provider;
import org.apache.tomcat.jdbc.pool.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,18 +39,21 @@ public class ThirdEyeCoreModule extends AbstractModule {
private final UiConfiguration uiConfiguration;
private final TimeConfiguration timeConfiguration;
private final NamespaceConfigurationDTO defaultNamespaceConfiguration;
private final QuotasConfiguration quotasConfiguration;

public ThirdEyeCoreModule(final DataSource dataSource,
final RcaConfiguration rcaConfiguration,
final UiConfiguration uiConfiguration,
final TimeConfiguration timeConfiguration,
final NamespaceConfigurationDTO defaultNamespaceConfiguration) {
final NamespaceConfigurationDTO defaultNamespaceConfiguration,
final QuotasConfiguration quotasConfiguration) {
this.dataSource = dataSource;

this.rcaConfiguration = rcaConfiguration;
this.uiConfiguration = uiConfiguration;
this.timeConfiguration = timeConfiguration;
this.defaultNamespaceConfiguration = defaultNamespaceConfiguration;
this.quotasConfiguration = quotasConfiguration;
}

@Override
Expand All @@ -70,5 +72,6 @@ protected void configure() {
bind(TimeConfiguration.class).toProvider(Providers.of(null));
}
bind(NamespaceConfigurationDTO.class).toInstance(defaultNamespaceConfiguration);
bind(QuotasConfiguration.class).toInstance(quotasConfiguration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public List<DatasetConfigDTO> onboardAll(final DataSourceDTO dataSourceDto) {
.toList();

final List<DatasetConfigDTO> addedDatasets = datasetsToOnboard.stream()
.peek(datasetConfigDTO -> dataSource.prepareDatasetForOnboarding(
datasetConfigDTO.getDataset()))
.map(datasetConfigDTO -> persist(datasetConfigDTO, dataSourceDto.getAuth()))
.collect(Collectors.toList());

Expand All @@ -91,6 +93,7 @@ public DatasetConfigDTO onboardDataset(final DataSourceDTO dataSourceDto,
checkThirdEye(dataSource != null, ThirdEyeStatus.ERR_DATASOURCE_NOT_LOADED, dataSourceDto.getName());
final DatasetConfigDTO newDataset = dataSource.getDataset(datasetName);
checkThirdEye(newDataset != null, ThirdEyeStatus.ERR_DATASET_NOT_FOUND, datasetName);
dataSource.prepareDatasetForOnboarding(datasetName);
final DatasetConfigDTO datasetConfigDTO = persist(newDataset, dataSourceDto.getAuth());
checkThirdEye(datasetConfigDTO != null, ThirdEyeStatus.ERR_DATASET_NOT_FOUND, datasetName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

import ai.startree.thirdeye.spi.config.QuotasConfiguration;
import ai.startree.thirdeye.spi.datalayer.dto.DataSourceDTO;
import ai.startree.thirdeye.spi.datasource.ThirdEyeDataSource;
import ai.startree.thirdeye.spi.datasource.ThirdEyeDataSourceContext;
Expand All @@ -40,8 +41,11 @@ public class DataSourcesLoader {

private final Map<String, ThirdEyeDataSourceFactory> dataSourceFactoryMap = new HashMap<>();

private final QuotasConfiguration quotasConfiguration;

@Inject
public DataSourcesLoader() {
public DataSourcesLoader(final QuotasConfiguration quotasConfiguration) {
this.quotasConfiguration = quotasConfiguration;
}

public void addThirdEyeDataSourceFactory(ThirdEyeDataSourceFactory f) {
Expand Down Expand Up @@ -79,7 +83,8 @@ private ThirdEyeDataSourceContext buildContext(final DataSourceDTO dataSource) {
try {
final DataSourceDTO dataSourceWithEnvVarResolved = StringTemplateUtils.applyContext(
dataSource, values);
return new ThirdEyeDataSourceContext().setDataSourceDTO(dataSourceWithEnvVarResolved);
return new ThirdEyeDataSourceContext().setDataSourceDTO(dataSourceWithEnvVarResolved)
.setQuotasConfiguration(quotasConfiguration);
} catch (IOException e) {
throw new RuntimeException(
"Error while replacing env variables in datasource spec. spec: " + dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public List<DatasetConfigDTO> getDatasets() {
return delegate.getDatasets();
}

@Override
public void prepareDatasetForOnboarding(final String datasetName) {
delegate.prepareDatasetForOnboarding(datasetName);
}

@Override
public DatasetConfigDTO getDataset(final String datasetName) {
return delegate.getDataset(datasetName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@ public DatasetConfigDTO getTable(final String tableName, final String dataSource
dataSourceName);
}

public void prepareDatasetForOnboarding(final String datasetName)
throws IOException {
final JsonNode tableConfigJson = pinotControllerRestClient
.getTableConfigFromPinotEndpoint(datasetName);
checkArgument(tableConfigJson != null && !tableConfigJson.isNull(),
"Onboarding Preparation Error: table config is null for pinot table: " + datasetName);

pinotControllerRestClient.updateTableMaxQPSQuota(datasetName, tableConfigJson);
}

public void close() {
pinotControllerRestClient.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,15 @@ public List<DatasetConfigDTO> getDatasets() {
}
}

@Override
public void prepareDatasetForOnboarding(final String datasetName) {
try {
datasetReader.prepareDatasetForOnboarding(datasetName);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

@Override
public DatasetConfigDTO getDataset(final String datasetName) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import static ai.startree.thirdeye.spi.util.SpiUtils.optional;

import ai.startree.thirdeye.plugins.datasource.pinot.PinotThirdEyeDataSourceConfig;
import ai.startree.thirdeye.spi.datasource.ThirdEyeDataSourceContext;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.IOException;
Expand All @@ -33,6 +35,8 @@
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
Expand All @@ -48,16 +52,22 @@ public class PinotControllerRestClient {
private static final String PINOT_SCHEMA_ENDPOINT_TEMPLATE = "/schemas/%s";
private static final String PINOT_TABLE_CONFIG_ENDPOINT_TEMPLATE = "/tables/%s/schema";

private static final String TABLE_CONFIG_QUOTA_KEY = "quota";
private static final String TABLE_CONFIG_QUOTA_MAX_QPS_KEY = "maxQueriesPerSecond";

private final HttpHost pinotControllerHost;
private final PinotControllerHttpClientProvider pinotControllerRestClientSupplier;
private final ThirdEyeDataSourceContext context;

@Inject
public PinotControllerRestClient(final PinotThirdEyeDataSourceConfig config) {
public PinotControllerRestClient(final PinotThirdEyeDataSourceConfig config,
final ThirdEyeDataSourceContext context) {

pinotControllerHost = new HttpHost(config.getControllerHost(),
config.getControllerPort(),
config.getControllerConnectionScheme());
this.pinotControllerRestClientSupplier = new PinotControllerHttpClientProvider(config);
this.context = context;
}

public List<String> getAllTablesFromPinot() throws IOException {
Expand Down Expand Up @@ -183,6 +193,46 @@ public JsonNode getTableConfigFromPinotEndpoint(final String dataset) throws IOE
return tableJson;
}

public void updateTableMaxQPSQuota(final String dataset, final JsonNode tableJson) throws IOException {
final Integer customMaxQPSQuota = context.getQuotasConfiguration().getPinotMaxQPSQuotaOverride();
if (customMaxQPSQuota == null || customMaxQPSQuota <= 0) {
return;
}

// update quota if it exists
final JsonNode quotaJson = tableJson.get(TABLE_CONFIG_QUOTA_KEY);
if (quotaJson != null) {
((ObjectNode) quotaJson).put(TABLE_CONFIG_QUOTA_MAX_QPS_KEY, Integer.toString(customMaxQPSQuota));
} else {
LOG.error("quota not configured for dataset {} while onboarding. skipping max qps override", dataset);
return;
}

// update table config with updated quota
final HttpPut request = new HttpPut(String.format(PINOT_TABLES_ENDPOINT_TEMPLATE, dataset));
request.setEntity(new StringEntity(tableJson.toString()));

CloseableHttpResponse response = null;
try {
response = pinotControllerRestClientSupplier.get().execute(pinotControllerHost, request);
if (response.getStatusLine().getStatusCode() != 200) {
throw new IllegalStateException(response.getStatusLine().toString());
}
} catch (final Exception e) {
LOG.error("Exception in updating table config of dataset {}", dataset, e);
throw e;
} finally {
if (response != null) {
if (response.getEntity() != null) {
EntityUtils.consume(response.getEntity());
}
response.close();
}
}

return ;
}

/**
* Returns the map of custom configs of the given dataset from the Pinot table config json.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ protected void configure() {
configuration.getRcaConfiguration(),
configuration.getUiConfiguration(),
configuration.getTimeConfiguration(),
configuration.getNamespaceConfiguration()));
configuration.getNamespaceConfiguration(),
configuration.getQuotasConfiguration()));
install(new ThirdEyeNotificationModule(configuration.getNotificationConfiguration()));
install(new ThirdEyeDetectionPipelineModule(configuration.getDetectionPipelineConfiguration()));
install(new ThirdEyeWorkerModule(configuration.getTaskDriverConfiguration()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,12 @@
import ai.startree.thirdeye.rootcause.configuration.RcaConfiguration;
import ai.startree.thirdeye.scheduler.ThirdEyeSchedulerConfiguration;
import ai.startree.thirdeye.scheduler.events.MockEventsConfiguration;
import ai.startree.thirdeye.spi.api.NamespaceConfigurationApi;
import ai.startree.thirdeye.spi.api.TemplateConfigurationApi;
import ai.startree.thirdeye.spi.api.TimeConfigurationApi;
import ai.startree.thirdeye.spi.config.QuotasConfiguration;
import ai.startree.thirdeye.spi.config.TimeConfiguration;
import ai.startree.thirdeye.spi.datalayer.dto.NamespaceConfigurationDTO;
import ai.startree.thirdeye.worker.task.TaskDriverConfiguration;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import freemarker.core.TemplateConfiguration;
import io.dropwizard.core.Configuration;
import io.federecio.dropwizard.swagger.SwaggerBundleConfiguration;
import java.util.List;
Expand Down Expand Up @@ -84,6 +81,9 @@ public class ThirdEyeServerConfiguration extends Configuration {
@JsonProperty("accessControl")
private AccessControlConfiguration accessControlConfiguration = new AccessControlConfiguration();

@JsonProperty("quotas")
private QuotasConfiguration quotasConfiguration = new QuotasConfiguration();

private String phantomJsPath = "";
private String failureFromAddress;
private String failureToAddress;
Expand Down Expand Up @@ -283,4 +283,14 @@ public ThirdEyeServerConfiguration setNamespaceConfiguration(
this.namespaceConfiguration = namespaceConfiguration;
return this;
}

public QuotasConfiguration getQuotasConfiguration() {
return quotasConfiguration;
}

public ThirdEyeServerConfiguration setQuotasConfiguration(
final QuotasConfiguration quotasConfiguration) {
this.quotasConfiguration = quotasConfiguration;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 StarTree Inc
*
* Licensed under the StarTree Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at http://www.startree.ai/legal/startree-community-license
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OF ANY KIND,
* either express or implied.
* See the License for the specific language governing permissions and limitations under
* the License.
*/
package ai.startree.thirdeye.spi.config;

import org.checkerframework.checker.nullness.qual.Nullable;

public class QuotasConfiguration {

@Nullable
private Integer pinotMaxQPSQuotaOverride;

public @Nullable Integer getPinotMaxQPSQuotaOverride() {
return pinotMaxQPSQuotaOverride;
}

public QuotasConfiguration setPinotMaxQPSQuotaOverride(final @Nullable Integer pinotMaxQPSQuotaOverride) {
this.pinotMaxQPSQuotaOverride = pinotMaxQPSQuotaOverride;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,21 @@ default List<DatasetConfigDTO> getDatasets() {
throw new UnsupportedOperationException();
}

/**
* Prepares dataset for onboarding
* executes the steps needed before persisting a dataset
*/
default void prepareDatasetForOnboarding(final String datasetName) {
throw new UnsupportedOperationException();
}

/**
* Fetch metadata about the dataset.
*
* @param datasetName name of the table
* @return ThirdEye dataset describing available dimensions and metrics.
*/
default DatasetConfigDTO getDataset(String datasetName) {
default DatasetConfigDTO getDataset(final String datasetName) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
*/
package ai.startree.thirdeye.spi.datasource;

import ai.startree.thirdeye.spi.config.QuotasConfiguration;
import ai.startree.thirdeye.spi.datalayer.dto.DataSourceDTO;

public class ThirdEyeDataSourceContext {

private DataSourceDTO dataSourceDTO;
private QuotasConfiguration quotasConfiguration;

public DataSourceDTO getDataSourceDTO() {
return dataSourceDTO;
Expand All @@ -28,4 +30,14 @@ public ThirdEyeDataSourceContext setDataSourceDTO(
this.dataSourceDTO = dataSourceDTO;
return this;
}

public QuotasConfiguration getQuotasConfiguration() {
return quotasConfiguration;
}

public ThirdEyeDataSourceContext setQuotasConfiguration(
final QuotasConfiguration quotasConfiguration) {
this.quotasConfiguration = quotasConfiguration;
return this;
}
}

0 comments on commit afeb421

Please sign in to comment.