Skip to content

Commit

Permalink
[server] Handle PinotClientException in exception handler (#1714)
Browse files Browse the repository at this point in the history
[pinot] Handle Pinot client exceptions properly

Co-authored-by: Anshul Singh <anshul.singh@anshuls-macbook-pro-1.wyvern-sun.ts.net>
  • Loading branch information
anshul98ks123 and Anshul Singh authored Dec 12, 2024
1 parent 987ce90 commit d393d4a
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
*/
package ai.startree.thirdeye.plugins.datasource.pinot;

import static ai.startree.thirdeye.spi.ThirdEyeStatus.ERR_PINOT_QUERY_EXECUTION;
import static ai.startree.thirdeye.spi.ThirdEyeStatus.ERR_PINOT_QUERY_QUOTA_EXCEEDED;

import ai.startree.thirdeye.plugins.datasource.pinot.resultset.ThirdEyeDataFrameResultSet;
import ai.startree.thirdeye.plugins.datasource.pinot.resultset.ThirdEyeResultSet;
import ai.startree.thirdeye.plugins.datasource.pinot.resultset.ThirdEyeResultSetGroup;
import ai.startree.thirdeye.plugins.datasource.pinot.resultset.ThirdEyeResultSetMetaData;
import ai.startree.thirdeye.spi.ThirdEyeException;
import ai.startree.thirdeye.spi.dataframe.DataFrame;
import ai.startree.thirdeye.spi.detection.v2.ColumnType;
import ai.startree.thirdeye.spi.detection.v2.ColumnType.ColumnDataType;
Expand All @@ -41,6 +45,7 @@
public class PinotQueryExecutor extends CacheLoader<PinotQuery, ThirdEyeResultSetGroup> {

private static final Logger LOG = LoggerFactory.getLogger(PinotQueryExecutor.class);
private static final String QUOTA_EXCEEDED_ERR = "QuotaExceededError";

private final PinotConnectionProvider pinotConnectionProvider;

Expand Down Expand Up @@ -215,8 +220,11 @@ public ThirdEyeResultSetGroup load(final PinotQuery pinotQuery) {

return toThirdEyeResultSetGroup(resultSetGroup);
} catch (final PinotClientException cause) {
LOG.error("Error when running SQL:" + queryWithOptions, cause);
throw new PinotClientException("Error when running SQL:" + queryWithOptions, cause);
LOG.error("Error when running SQL" + queryWithOptions, cause);
if (cause.toString().toUpperCase().contains(QUOTA_EXCEEDED_ERR.toUpperCase())) {
throw new ThirdEyeException(cause, ERR_PINOT_QUERY_QUOTA_EXCEEDED, cause.toString(), queryWithOptions);
}
throw new ThirdEyeException(cause, ERR_PINOT_QUERY_EXECUTION, cause.toString(), queryWithOptions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import ai.startree.thirdeye.plugins.datasource.pinot.resultset.ThirdEyeResultSet;
import ai.startree.thirdeye.plugins.datasource.pinot.resultset.ThirdEyeResultSetGroup;
import ai.startree.thirdeye.spi.Constants;
import ai.startree.thirdeye.spi.ThirdEyeException;
import ai.startree.thirdeye.spi.datalayer.dto.DataSourceDTO;
import ai.startree.thirdeye.spi.datalayer.dto.DatasetConfigDTO;
import ai.startree.thirdeye.spi.datasource.DataSourceRequest;
Expand All @@ -31,6 +32,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.cache.GuavaCacheMetrics;
Expand Down Expand Up @@ -157,7 +159,8 @@ public String getName() {
* @throws ExecutionException is thrown if failed to connect to Pinot or gets results from
* Pinot.
*/
private ThirdEyeResultSetGroup executeSQL(final PinotQuery pinotQuery) throws ExecutionException {
private ThirdEyeResultSetGroup executeSQL(final PinotQuery pinotQuery) throws ExecutionException,
ThirdEyeException {
try {
final ThirdEyeResultSetGroup thirdEyeResultSetGroup = queryCache.get(pinotQuery);
final long current = System.currentTimeMillis();
Expand All @@ -173,6 +176,16 @@ private ThirdEyeResultSetGroup executeSQL(final PinotQuery pinotQuery) throws Ex
pinotQuery.getOptions(), e);
LOG.error("queryCache.stats: {}", queryCache.stats());
throw e;
} catch (UncheckedExecutionException e) {
LOG.error("Failed to execute SQL: {} with options {}", pinotQuery.getQuery(),
pinotQuery.getOptions(), e);
LOG.error("queryCache.stats: {}", queryCache.stats());
Throwable cause = e.getCause();
if (cause instanceof ThirdEyeException) {
throw (ThirdEyeException) cause;
} else {
throw new ExecutionException(e.getMessage(), e);
}
}
}

Expand All @@ -195,14 +208,14 @@ public DataTable fetchDataTable(final DataSourceRequest request) throws Exceptio
public boolean validate() {
try {
return validate0();
} catch (final ExecutionException | IOException | ArrayIndexOutOfBoundsException e) {
} catch (final ExecutionException | IOException | ArrayIndexOutOfBoundsException | ThirdEyeException e) {
LOG.error("Exception while performing pinot datasource validation.", e);
}
return false;
}

// todo cyril healthcheck should be abstracted by the controller
private boolean validate0() throws IOException, ExecutionException {
private boolean validate0() throws IOException, ExecutionException, ThirdEyeException {
final PinotHealthCheckConfiguration healthCheck = config.getHealthCheck();
if (healthCheck == null || !healthCheck.isEnabled()) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static ai.startree.thirdeye.ResourceUtils.statusListApi;
import static ai.startree.thirdeye.spi.ThirdEyeStatus.ERR_ALERT_PIPELINE_EXECUTION;
import static ai.startree.thirdeye.spi.ThirdEyeStatus.ERR_DATA_UNAVAILABLE;
import static ai.startree.thirdeye.spi.ThirdEyeStatus.ERR_EXECUTION_RCA_ALGORITHM;
import static ai.startree.thirdeye.spi.ThirdEyeStatus.ERR_TIMEOUT;
import static ai.startree.thirdeye.spi.ThirdEyeStatus.ERR_UNKNOWN;
import static ai.startree.thirdeye.spi.ThirdEyeStatus.ERR_UNKNOWN_RCA_ALGORITHM;
Expand Down Expand Up @@ -58,6 +59,16 @@ public class ExceptionHandler {

private static final Map<Class<?>, Function<Throwable, StatusApi>> RCA_HANDLERS = ImmutableMap.<Class<?>, Function<Throwable, StatusApi>>builder()
.put(TimeoutException.class, e -> statusApi(ERR_TIMEOUT))
.put(ExecutionException.class,
e -> {
Throwable cause = e.getCause();
if (cause instanceof final ThirdEyeException thirdEyeException) {
return new StatusApi().setCode(thirdEyeException.getStatus())
.setMsg(thirdEyeException.getMessage());
} else {
return statusApi(ERR_EXECUTION_RCA_ALGORITHM, e.getCause().getMessage());
}
})
.put(ThirdEyeException.class,
e -> {
final ThirdEyeException thirdEyeException = (ThirdEyeException) e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public enum ThirdEyeStatus {
ERR_TIMEOUT("Operation timed out!"),
ERR_UNEXPECTED_QUERY_PARAM("Unexpected Query Param. Allowed values: %s"),
ERR_UNKNOWN("%s"),
ERR_EXECUTION_RCA_ALGORITHM("RCA Algorithm Execution Failed. Error: %s"),
ERR_UNKNOWN_RCA_ALGORITHM("Unknown error running the rca algorithm: %s"),
ERR_CALCITE_FILTERING("Failed running Calcite filtering query with filter: %s"),
ERR_INVALID_PARAMS_COMPONENTS("Invalid param components: %s for Class %s"),
Expand All @@ -61,6 +62,8 @@ public enum ThirdEyeStatus {
ERR_NEGATIVE_LIMIT_VALUE("Negative 'limit' value provided."),
ERR_NEGATIVE_OFFSET_VALUE("Negative 'offset' value provided."),
ERR_OFFSET_WITHOUT_LIMIT("'offset' value provided without 'limit' value."),
ERR_PINOT_QUERY_QUOTA_EXCEEDED("Pinot Query Quota Exceeded. Error: %s. SQL: %s"),
ERR_PINOT_QUERY_EXECUTION("Pinot Query Execution Failed. Error: %s. SQL: %s"),

OK("OK"),
;
Expand Down

0 comments on commit d393d4a

Please sign in to comment.