Skip to content

Commit

Permalink
use thrift with fast service
Browse files Browse the repository at this point in the history
additional fix
  • Loading branch information
Yingjian Wu authored and stevie9868 committed Nov 7, 2024
1 parent be80222 commit ee79858
Show file tree
Hide file tree
Showing 17 changed files with 550 additions and 663 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
*/
package com.netflix.metacat.common.type;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

Expand All @@ -41,7 +41,7 @@ public final class TypeRegistry implements TypeManager {
* Constructor.
*/
private TypeRegistry() {
Preconditions.checkNotNull(types, "types is null");
Objects.requireNonNull(types, "types is null");
addType(BaseType.UNKNOWN);
addType(BaseType.BIGINT);
addType(BaseType.BOOLEAN);
Expand Down Expand Up @@ -80,7 +80,7 @@ public static TypeRegistry getTypeRegistry() {
* @param type parameter
*/
public static void verifyTypeClass(final Type type) {
Preconditions.checkNotNull(type, "type is null");
Objects.requireNonNull(type, "types is null");
}

/**
Expand Down Expand Up @@ -121,9 +121,11 @@ private Type instantiateParametricType(final TypeSignature signature) {
}
final Type instantiatedType = parametricType.createType(parameterTypes.build(),
signature.getLiteralParameters());
Preconditions.checkState(instantiatedType.getTypeSignature().equals(signature),
"Instantiated parametric type name (%s) does not match expected name (%s)",
instantiatedType, signature);
if (!instantiatedType.getTypeSignature().equals(signature)) {
throw new IllegalStateException(String.format(
"Instantiated parametric type name (%s) does not match expected name (%s)",
instantiatedType, signature));
}
return instantiatedType;
}

Expand All @@ -135,8 +137,9 @@ private Type instantiateParametricType(final TypeSignature signature) {
public void addType(final Type type) {
verifyTypeClass(type);
final Type existingType = types.putIfAbsent(type.getTypeSignature(), type);
Preconditions.checkState(existingType == null
|| existingType.equals(type), "Type %s is already registered", type);
if (!(existingType == null || existingType.equals(type))) {
throw new IllegalStateException(String.format("Type %s is already registered", type));
}
}

/**
Expand All @@ -146,8 +149,9 @@ public void addType(final Type type) {
*/
public void addParametricType(final ParametricType parametricType) {
final TypeEnum baseType = parametricType.getBaseType();
Preconditions.checkArgument(!parametricTypes.containsKey(baseType),
"Parametric type already registered: %s", baseType);
if (parametricTypes.containsKey(baseType)) {
throw new IllegalArgumentException(String.format("Parametric type already registered: %s", baseType));
}
parametricTypes.putIfAbsent(baseType, parametricType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
//import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -69,9 +69,10 @@ public TypeSignature(
) {
if (literalParameters != null) {
for (final Object literal : literalParameters) {
Preconditions.checkArgument(
literal instanceof String || literal instanceof Long,
"Unsupported literal type: %s", literal.getClass());
if (!(literal instanceof String || literal instanceof Long)) {
throw new IllegalArgumentException(
String.format("Unsupported literal type: %s", literal.getClass()));
}
}
this.literalParameters = ImmutableList.copyOf(literalParameters);
} else {
Expand Down Expand Up @@ -119,17 +120,28 @@ public static TypeSignature parseTypeSignature(final String signature) {
final char c = signature.charAt(i);
if (c == '<') {
if (bracketCount == 0) {
Preconditions.checkArgument(baseName == null, "Expected baseName to be null");
Preconditions.checkArgument(parameterStart == -1, "Expected parameter start to be -1");
if (baseName != null) {
throw new IllegalArgumentException("Expected baseName to be null");
}

if (parameterStart != -1) {
throw new IllegalArgumentException("Expected parameter start to be -1");
}

baseName = signature.substring(0, i);
parameterStart = i + 1;
}
bracketCount++;
} else if (c == '>') {
bracketCount--;
Preconditions.checkArgument(bracketCount >= 0, "Bad type signature: '%s'", signature);
if (bracketCount < 0) {
throw new IllegalArgumentException(String.format("Bad type signature: '%s'", signature));
}

if (bracketCount == 0) {
Preconditions.checkArgument(parameterStart >= 0, "Bad type signature: '%s'", signature);
if (parameterStart < 0) {
throw new IllegalArgumentException(String.format("Bad type signature: '%s'", signature));
}
parameters.add(parseTypeSignature(signature.substring(parameterStart, i)));
parameterStart = i + 1;
if (i == signature.length() - 1) {
Expand All @@ -138,31 +150,55 @@ public static TypeSignature parseTypeSignature(final String signature) {
}
} else if (c == ',') {
if (bracketCount == 1 && !inLiteralParameters) {
Preconditions.checkArgument(parameterStart >= 0, "Bad type signature: '%s'", signature);
if (parameterStart < 0) {
throw new IllegalArgumentException(String.format("Bad type signature: '%s'", signature));
}

parameters.add(parseTypeSignature(signature.substring(parameterStart, i)));
parameterStart = i + 1;
} else if (bracketCount == 0 && inLiteralParameters) {
Preconditions.checkArgument(parameterStart >= 0, "Bad type signature: '%s'", signature);
if (parameterStart < 0) {
throw new IllegalArgumentException(String.format("Bad type signature: '%s'", signature));
}

literalParameters.add(parseLiteral(signature.substring(parameterStart, i)));
parameterStart = i + 1;
}
} else if (c == '(') {
Preconditions.checkArgument(!inLiteralParameters, "Bad type signature: '%s'", signature);
if (inLiteralParameters) {
throw new IllegalArgumentException(String.format("Bad type signature: '%s'", signature));
}

inLiteralParameters = true;
if (bracketCount == 0) {
if (baseName == null) {
Preconditions.checkArgument(parameters.isEmpty(), "Expected no parameters");
Preconditions.checkArgument(parameterStart == -1, "Expected parameter start to be -1");
if (!parameters.isEmpty()) {
throw new IllegalArgumentException("Expected no parameters");
}

if (parameterStart != -1) {
throw new IllegalArgumentException("Expected parameter start to be -1");
}

baseName = signature.substring(0, i);
}
parameterStart = i + 1;
}
} else if (c == ')') {
Preconditions.checkArgument(inLiteralParameters, "Bad type signature: '%s'", signature);
if (!inLiteralParameters) {
throw new IllegalArgumentException(String.format("Bad type signature: '%s'", signature));
}

inLiteralParameters = false;
if (bracketCount == 0) {
Preconditions.checkArgument(i == signature.length() - 1, "Bad type signature: '%s'", signature);
Preconditions.checkArgument(parameterStart >= 0, "Bad type signature: '%s'", signature);
if (i != signature.length() - 1) {
throw new IllegalArgumentException(String.format("Bad type signature: '%s'", signature));
}

if (parameterStart < 0) {
throw new IllegalArgumentException(String.format("Bad type signature: '%s'", signature));
}

literalParameters.add(parseLiteral(signature.substring(parameterStart, i)));
return new TypeSignature(baseName, parameters, literalParameters);
}
Expand All @@ -173,7 +209,9 @@ public static TypeSignature parseTypeSignature(final String signature) {

private static Object parseLiteral(final String literal) {
if (literal.startsWith("'") || literal.endsWith("'")) {
Preconditions.checkArgument(literal.startsWith("'") && literal.endsWith("'"), "Bad literal: '%s'", literal);
if (!(literal.startsWith("'") && literal.endsWith("'"))) {
throw new IllegalArgumentException(String.format("Bad literal: '%s'", literal));
}
return literal.substring(1, literal.length() - 1);
} else {
return Long.parseLong(literal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class HiveConnectorFactory extends SpringConnectorFactory {
connectorContext.getConfiguration()
.getOrDefault(HiveConfigConstants.USE_EMBEDDED_METASTORE, "false")
);
final boolean useFastHiveService = useLocalMetastore && Boolean.parseBoolean(
final boolean useFastHiveService = Boolean.parseBoolean(
connectorContext.getConfiguration()
.getOrDefault(HiveConfigConstants.USE_FASTHIVE_SERVICE, "false")
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

package com.netflix.metacat.connector.hive.configs;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.metacat.common.server.connectors.ConnectorContext;
import com.netflix.metacat.common.server.util.DataSourceManager;
import com.netflix.metacat.common.server.util.ThreadServiceManager;
import com.netflix.metacat.connector.hive.HiveConnectorDatabaseService;
import com.netflix.metacat.connector.hive.HiveConnectorPartitionService;
Expand All @@ -25,11 +27,17 @@
import com.netflix.metacat.connector.hive.converters.HiveConnectorInfoConverter;
import com.netflix.metacat.connector.hive.util.HiveConfigConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import javax.sql.DataSource;
import java.net.URI;
import java.util.concurrent.TimeUnit;

Expand All @@ -42,6 +50,13 @@
@Slf4j
@Configuration
public class HiveConnectorConfig {
/** Default Query timeout in milliseconds. */
private static final int DEFAULT_DATASTORE_TIMEOUT = 60000;
/** Default Query timeout in milliseconds for reads. */
private static final int DEFAULT_DATASTORE_READ_TIMEOUT = 120000;
/** Default Query timeout in milliseconds for writes. */
private static final int DEFAULT_DATASTORE_WRITE_TIMEOUT = 120000;

/**
* create hive connector database service.
*
Expand Down Expand Up @@ -149,4 +164,136 @@ public ThreadServiceManager threadServiceManager(final ConnectorContext connecto
1000,
"hive");
}

/**
* create warehouse for file system calls.
*
* @param connectorContext connector config context
* @return WareHouse
*/
@Bean
public Warehouse warehouse(final ConnectorContext connectorContext) {
try {
final HiveConf conf = this.getDefaultConf(connectorContext);
connectorContext.getConfiguration().forEach(conf::set);
return new Warehouse(conf);
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(
"Failed creating the hive warehouse for catalog: %s",
connectorContext.getCatalogName()
),
e
);
}
}

/**
* hive DataSource.
*
* @param connectorContext connector config.
* @return data source
*/
@Bean
public DataSource hiveDataSource(final ConnectorContext connectorContext) {
final HiveConf conf = this.getDefaultConf(connectorContext);
connectorContext.getConfiguration().forEach(conf::set);
DataSourceManager.get().load(
connectorContext.getCatalogShardName(),
connectorContext.getConfiguration()
);
return DataSourceManager.get().get(connectorContext.getCatalogShardName());
}

/**
* hive metadata Transaction Manager.
*
* @param hiveDataSource hive data source
* @return hive transaction manager
*/
@Bean
public DataSourceTransactionManager hiveTxManager(
@Qualifier("hiveDataSource") final DataSource hiveDataSource) {
return new DataSourceTransactionManager(hiveDataSource);
}

/**
* hive metadata read JDBC template. Query timeout is set to control long running read queries.
*
* @param connectorContext connector config.
* @param hiveDataSource hive data source
* @return hive JDBC Template
*/
@Bean
public JdbcTemplate hiveReadJdbcTemplate(
final ConnectorContext connectorContext,
@Qualifier("hiveDataSource") final DataSource hiveDataSource) {
final JdbcTemplate result = new JdbcTemplate(hiveDataSource);
result.setQueryTimeout(getDataStoreReadTimeout(connectorContext) / 1000);
return result;
}

/**
* hive metadata write JDBC template. Query timeout is set to control long running write queries.
*
* @param connectorContext connector config.
* @param hiveDataSource hive data source
* @return hive JDBC Template
*/
@Bean
public JdbcTemplate hiveWriteJdbcTemplate(
final ConnectorContext connectorContext,
@Qualifier("hiveDataSource") final DataSource hiveDataSource) {
final JdbcTemplate result = new JdbcTemplate(hiveDataSource);
result.setQueryTimeout(getDataStoreWriteTimeout(connectorContext) / 1000);
return result;
}

@VisibleForTesting
private HiveConf getDefaultConf(
final ConnectorContext connectorContext
) {
final HiveConf result = new HiveConf();
result.setBoolean(HiveConfigConstants.USE_METASTORE_LOCAL, true);

final int dataStoreTimeout = getDataStoreTimeout(connectorContext);
result.setInt(HiveConfigConstants.JAVAX_JDO_DATASTORETIMEOUT, dataStoreTimeout);
result.setInt(HiveConfigConstants.JAVAX_JDO_DATASTOREREADTIMEOUT, dataStoreTimeout);
result.setInt(HiveConfigConstants.JAVAX_JDO_DATASTOREWRITETIMEOUT, getDataStoreWriteTimeout(connectorContext));
result.setInt(HiveConfigConstants.HIVE_METASTORE_DS_RETRY, 0);
result.setInt(HiveConfigConstants.HIVE_HMSHANDLER_RETRY, 0);
result.set(
HiveConfigConstants.JAVAX_JDO_PERSISTENCEMANAGER_FACTORY_CLASS,
HiveConfigConstants.JAVAX_JDO_PERSISTENCEMANAGER_FACTORY
);
result.setBoolean(HiveConfigConstants.HIVE_STATS_AUTOGATHER, false);
return result;
}

private int getDataStoreTimeout(final ConnectorContext connectorContext) {
int result = DEFAULT_DATASTORE_TIMEOUT;
try {
result = Integer.parseInt(
connectorContext.getConfiguration().get(HiveConfigConstants.JAVAX_JDO_DATASTORETIMEOUT));
} catch (final Exception ignored) { }
return result;
}

private int getDataStoreReadTimeout(final ConnectorContext connectorContext) {
int result = DEFAULT_DATASTORE_READ_TIMEOUT;
try {
result = Integer.parseInt(
connectorContext.getConfiguration().get(HiveConfigConstants.JAVAX_JDO_DATASTOREREADTIMEOUT));
} catch (final Exception ignored) { }
return result;
}

private int getDataStoreWriteTimeout(final ConnectorContext connectorContext) {
int result = DEFAULT_DATASTORE_WRITE_TIMEOUT;
try {
result = Integer.parseInt(
connectorContext.getConfiguration().get(HiveConfigConstants.JAVAX_JDO_DATASTOREWRITETIMEOUT));
} catch (final Exception ignored) { }
return result;
}
}
Loading

0 comments on commit ee79858

Please sign in to comment.