Skip to content

Commit

Permalink
Async DB Clients
Browse files Browse the repository at this point in the history
  • Loading branch information
AntoxaAntoxic committed Apr 15, 2024
1 parent 35f990b commit bb4763f
Show file tree
Hide file tree
Showing 13 changed files with 489 additions and 254 deletions.
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@
<artifactId>vertx-jdbc-client</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mysql-client</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-pg-client</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-circuit-breaker</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.prebid.server.settings;

import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
import io.vertx.ext.sql.ResultSet;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.prebid.server.exception.PreBidException;
Expand Down Expand Up @@ -125,9 +126,9 @@ public Future<Map<String, String>> getCategories(String primaryAdServer, String
* Note: mapper should never throws exception in case of using
* {@link org.prebid.server.vertx.jdbc.CircuitBreakerSecuredJdbcClient}.
*/
private <T> T mapToModelOrError(ResultSet result, Function<JsonArray, T> mapper) {
return result != null && CollectionUtils.isNotEmpty(result.getResults())
? mapper.apply(result.getResults().get(0))
private <T> T mapToModelOrError(RowSet<Row> rowSet, Function<Row, T> mapper) {
return rowSet != null && rowSet.iterator().hasNext()
? mapper.apply(rowSet.iterator().next())
: null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package org.prebid.server.settings.helper;

import io.vertx.core.json.JsonArray;
import io.vertx.ext.sql.ResultSet;
import org.apache.commons.collections4.CollectionUtils;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowIterator;
import io.vertx.sqlclient.RowSet;
import org.prebid.server.exception.PreBidException;
import org.prebid.server.log.Logger;
import org.prebid.server.log.LoggerFactory;
import org.prebid.server.settings.model.StoredDataResult;
import org.prebid.server.settings.model.StoredDataType;
import org.prebid.server.settings.model.StoredItem;
import org.prebid.server.util.ObjectUtil;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -31,7 +33,7 @@ private JdbcStoredDataResultMapper() {
/**
* Maps {@link ResultSet} to {@link StoredDataResult} and creates an error for each missing ID and add it to result.
*
* @param resultSet - incoming Result Set representing a result of SQL query
* @param rowSet - incoming Row Set representing a result of SQL query
* @param accountId - an account ID extracted from request
* @param requestIds - a specified set of stored requests' IDs. Adds error for each ID missing in result set
* @param impIds - a specified set of stored imps' IDs. Adds error for each ID missing in result set
Expand All @@ -40,13 +42,17 @@ private JdbcStoredDataResultMapper() {
* Note: mapper should never throws exception in case of using
* {@link org.prebid.server.vertx.jdbc.CircuitBreakerSecuredJdbcClient}.
*/
public static StoredDataResult map(ResultSet resultSet, String accountId, Set<String> requestIds,
public static StoredDataResult map(RowSet<Row> rowSet,
String accountId,
Set<String> requestIds,
Set<String> impIds) {
final Map<String, String> storedIdToRequest;
final Map<String, String> storedIdToImp;
final List<String> errors = new ArrayList<>();

if (resultSet == null || CollectionUtils.isEmpty(resultSet.getResults())) {
final RowIterator<Row> rowIterator = rowSet != null ? rowSet.iterator() : null;

if (rowIterator == null || !rowIterator.hasNext()) {
storedIdToRequest = Collections.emptyMap();
storedIdToImp = Collections.emptyMap();

Expand All @@ -64,17 +70,24 @@ public static StoredDataResult map(ResultSet resultSet, String accountId, Set<St
final Map<String, Set<StoredItem>> requestIdToStoredItems = new HashMap<>();
final Map<String, Set<StoredItem>> impIdToStoredItems = new HashMap<>();

for (JsonArray result : resultSet.getResults()) {
while (rowIterator.hasNext()) {
final Row row = rowIterator.next();
if (row.toJson().size() < 4) {
final String message = "Error occurred while mapping stored request data: some columns are missing";
logger.error(message);
errors.add(message);
return StoredDataResult.of(Collections.emptyMap(), Collections.emptyMap(), errors);
}
final String fetchedAccountId;
final String id;
final String data;
final String typeAsString;
try {
fetchedAccountId = result.getString(0);
id = result.getString(1);
data = result.getString(2);
typeAsString = result.getString(3);
} catch (IndexOutOfBoundsException | ClassCastException e) {
fetchedAccountId = row.getString(0);
id = row.getString(1);
data = row.getString(2);
typeAsString = ObjectUtil.getIfNotNull(row.getValue(3), Object::toString);
} catch (ClassCastException e) {
final String message = "Error occurred while mapping stored request data";
logger.error(message, e);
errors.add(message);
Expand Down Expand Up @@ -112,7 +125,7 @@ public static StoredDataResult map(ResultSet resultSet, String accountId, Set<St
* @param resultSet - incoming {@link ResultSet} representing a result of SQL query.
* @return - a {@link StoredDataResult} object.
*/
public static StoredDataResult map(ResultSet resultSet) {
public static StoredDataResult map(RowSet<Row> resultSet) {
return map(resultSet, null, Collections.emptySet(), Collections.emptySet());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.prebid.server.settings.helper;

import io.vertx.core.json.JsonArray;
import io.vertx.ext.sql.ResultSet;
import org.apache.commons.collections4.CollectionUtils;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowIterator;
import io.vertx.sqlclient.RowSet;
import org.prebid.server.settings.model.StoredResponseDataResult;

import java.util.ArrayList;
Expand All @@ -17,26 +17,29 @@ public class JdbcStoredResponseResultMapper {
private JdbcStoredResponseResultMapper() {
}

public static StoredResponseDataResult map(ResultSet resultSet, Set<String> responseIds) {
public static StoredResponseDataResult map(RowSet<Row> rowSet, Set<String> responseIds) {
final Map<String, String> storedIdToResponse = new HashMap<>(responseIds.size());
final List<String> errors = new ArrayList<>();

if (resultSet == null || CollectionUtils.isEmpty(resultSet.getResults())) {
final RowIterator<Row> rowIterator = rowSet != null ? rowSet.iterator() : null;
if (rowIterator == null || !rowIterator.hasNext()) {
handleEmptyResultError(responseIds, errors);
} else {
try {
for (JsonArray result : resultSet.getResults()) {
storedIdToResponse.put(result.getString(0), result.getString(1));
}
} catch (IndexOutOfBoundsException e) {
return StoredResponseDataResult.of(storedIdToResponse, errors);
}

while (rowIterator.hasNext()) {
final Row row = rowIterator.next();
if (row.toJson().size() < 2) {
errors.add("Result set column number is less than expected");
return StoredResponseDataResult.of(Collections.emptyMap(), errors);
}
errors.addAll(responseIds.stream().filter(id -> !storedIdToResponse.containsKey(id))
.map(id -> "No stored response found for id: " + id)
.toList());
storedIdToResponse.put(row.getString(0), row.getString(1));
}

errors.addAll(responseIds.stream().filter(id -> !storedIdToResponse.containsKey(id))
.map(id -> "No stored response found for id: " + id)
.toList());

return StoredResponseDataResult.of(storedIdToResponse, errors);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.mysqlclient.MySQLBuilder;
import io.vertx.pgclient.PgBuilder;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import org.prebid.server.metric.Metrics;
import org.prebid.server.spring.config.database.model.ConnectionPoolSettings;
import org.prebid.server.spring.config.database.model.DatabaseAddress;
Expand All @@ -11,7 +14,6 @@
import org.prebid.server.vertx.ContextRunner;
import org.prebid.server.vertx.jdbc.BasicJdbcClient;
import org.prebid.server.vertx.jdbc.CircuitBreakerSecuredJdbcClient;
import org.prebid.server.vertx.jdbc.JdbcClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand Down Expand Up @@ -81,31 +83,69 @@ ConnectionPoolSettings connectionPoolSettings(DatabaseConfigurationProperties da
}

@Bean
JDBCClient vertxJdbcClient(Vertx vertx,
DatabaseAddress databaseAddress,
ConnectionPoolSettings connectionPoolSettings,
DatabaseUrlFactory urlFactory,
ConnectionPoolConfigurationFactory configurationFactory) {
@ConditionalOnProperty(name = "settings.database.type", havingValue = "mysql")
Pool mysqlConnectionPool(Vertx vertx,
DatabaseAddress databaseAddress,
ConnectionPoolSettings connectionPoolSettings,
DatabaseUrlFactory urlFactory,
ConnectionPoolConfigurationFactory configurationFactory) {

final String databaseUrl = urlFactory.createUrl(
databaseAddress.getHost(), databaseAddress.getPort(), databaseAddress.getDatabaseName());
databaseAddress.getHost(),
databaseAddress.getPort(),
databaseAddress.getDatabaseName());

final JsonObject configurationProperties = prepareDatabaseConfigurationProperties(
databaseUrl,
connectionPoolSettings,
configurationFactory);

return MySQLBuilder
.pool()
.with(new PoolOptions(configurationProperties))
.connectingTo(databaseUrl)
.using(vertx)
.build();
}

final JsonObject connectionPoolConfigurationProperties = configurationFactory.create(
databaseUrl, connectionPoolSettings);
final JsonObject databaseConfigurationProperties = new JsonObject()
.put("driver_class", connectionPoolSettings.getDatabaseType().jdbcDriver);
databaseConfigurationProperties.mergeIn(connectionPoolConfigurationProperties);
@Bean
@ConditionalOnProperty(name = "settings.database.type", havingValue = "postgres")
Pool postgresConnectionPool(Vertx vertx,
DatabaseAddress databaseAddress,
ConnectionPoolSettings connectionPoolSettings,
DatabaseUrlFactory urlFactory,
ConnectionPoolConfigurationFactory configurationFactory) {

return JDBCClient.createShared(vertx, databaseConfigurationProperties);
final String databaseUrl = urlFactory.createUrl(
databaseAddress.getHost(),
databaseAddress.getPort(),
databaseAddress.getDatabaseName());

final JsonObject configurationProperties = prepareDatabaseConfigurationProperties(
databaseUrl,
connectionPoolSettings,
configurationFactory);

return PgBuilder
.pool()
.with(new PoolOptions(configurationProperties))
.connectingTo(databaseUrl)
.using(vertx)
.build();
}

@Bean
@ConditionalOnProperty(prefix = "settings.database.circuit-breaker", name = "enabled", havingValue = "false",
matchIfMissing = true)
BasicJdbcClient basicJdbcClient(
Vertx vertx, JDBCClient vertxJdbcClient, Metrics metrics, Clock clock, ContextRunner contextRunner) {
private JsonObject prepareDatabaseConfigurationProperties(String databaseUrl,
ConnectionPoolSettings connectionPoolSettings,
ConnectionPoolConfigurationFactory configurationFactory) {

return createBasicJdbcClient(vertx, vertxJdbcClient, metrics, clock, contextRunner);
final JsonObject connectionPoolConfigurationProperties = configurationFactory.create(
databaseUrl,
connectionPoolSettings);
final JsonObject databaseConfigurationProperties = new JsonObject()
.put("driver_class", connectionPoolSettings.getDatabaseType().jdbcDriver);

databaseConfigurationProperties.mergeIn(connectionPoolConfigurationProperties);
return databaseConfigurationProperties;
}

@Bean
Expand All @@ -115,29 +155,37 @@ CircuitBreakerProperties databaseCircuitBreakerProperties() {
return new CircuitBreakerProperties();
}

@Bean
@ConditionalOnProperty(prefix = "settings.database.circuit-breaker", name = "enabled", havingValue = "false",
matchIfMissing = true)
BasicJdbcClient basicJdbcClient(Vertx vertx,
Pool pool,
Metrics metrics,
Clock clock,
ContextRunner contextRunner) {

return createBasicJdbcClient(vertx, pool, metrics, clock, contextRunner);
}

@Bean
@ConditionalOnProperty(prefix = "settings.database.circuit-breaker", name = "enabled", havingValue = "true")
CircuitBreakerSecuredJdbcClient circuitBreakerSecuredJdbcClient(
CircuitBreakerSecuredJdbcClient circuitBreakerSecuredAsyncDatabaseClient(
Vertx vertx,
JDBCClient vertxJdbcClient,
Pool pool,
Metrics metrics,
Clock clock,
ContextRunner contextRunner,
@Qualifier("databaseCircuitBreakerProperties") CircuitBreakerProperties circuitBreakerProperties) {

final JdbcClient jdbcClient = createBasicJdbcClient(vertx, vertxJdbcClient, metrics, clock, contextRunner);
return new CircuitBreakerSecuredJdbcClient(vertx, jdbcClient, metrics,
circuitBreakerProperties.getOpeningThreshold(), circuitBreakerProperties.getOpeningIntervalMs(),
circuitBreakerProperties.getClosingIntervalMs(), clock);
}

private static BasicJdbcClient createBasicJdbcClient(
Vertx vertx, JDBCClient vertxJdbcClient, Metrics metrics, Clock clock, ContextRunner contextRunner) {
final BasicJdbcClient basicJdbcClient = new BasicJdbcClient(vertx, vertxJdbcClient, metrics, clock);

contextRunner.<Void>runBlocking(promise -> basicJdbcClient.initialize().onComplete(promise));

return basicJdbcClient;
final BasicJdbcClient jdbcClient = createBasicJdbcClient(vertx, pool, metrics, clock, contextRunner);
return new CircuitBreakerSecuredJdbcClient(
vertx,
jdbcClient,
metrics,
circuitBreakerProperties.getOpeningThreshold(),
circuitBreakerProperties.getOpeningIntervalMs(),
circuitBreakerProperties.getClosingIntervalMs(),
clock);
}

@Bean
Expand All @@ -146,4 +194,17 @@ private static BasicJdbcClient createBasicJdbcClient(
public DatabaseConfigurationProperties databaseConfigurationProperties() {
return new DatabaseConfigurationProperties();
}

private static BasicJdbcClient createBasicJdbcClient(Vertx vertx,
Pool pool,
Metrics metrics,
Clock clock,
ContextRunner contextRunner) {

final BasicJdbcClient basicJdbcClient = new BasicJdbcClient(vertx, pool, metrics, clock);

contextRunner.<Void>runBlocking(promise -> basicJdbcClient.initialize().onComplete(promise));

return basicJdbcClient;
}
}
Loading

0 comments on commit bb4763f

Please sign in to comment.