Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async DB Clients #3118

Merged
merged 8 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,12 @@
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
<artifactId>vertx-mysql-client</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-pg-client</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package org.prebid.server.health;

import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.SQLConnection;
import io.vertx.sqlclient.Pool;
import org.prebid.server.health.model.Status;
import org.prebid.server.health.model.StatusResponse;

Expand All @@ -15,13 +13,13 @@ public class DatabaseHealthChecker extends PeriodicHealthChecker {

private static final String NAME = "database";

private final JDBCClient jdbcClient;
private final Pool pool;

private StatusResponse status;

public DatabaseHealthChecker(Vertx vertx, JDBCClient jdbcClient, long refreshPeriod) {
public DatabaseHealthChecker(Vertx vertx, Pool pool, long refreshPeriod) {
super(vertx, refreshPeriod);
this.jdbcClient = Objects.requireNonNull(jdbcClient);
this.pool = Objects.requireNonNull(pool);
}

@Override
Expand All @@ -36,9 +34,7 @@ public String name() {

@Override
void updateStatus() {
final Promise<SQLConnection> connectionPromise = Promise.promise();
jdbcClient.getConnection(connectionPromise);
connectionPromise.future().onComplete(result ->
pool.getConnection().onComplete(result ->
status = StatusResponse.of(
result.succeeded() ? Status.UP.name() : Status.DOWN.name(),
ZonedDateTime.now(Clock.systemUTC())));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.prebid.server.settings;

import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
import io.vertx.ext.sql.ResultSet;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowIterator;
import io.vertx.sqlclient.RowSet;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.prebid.server.exception.PreBidException;
Expand All @@ -11,9 +12,11 @@
import org.prebid.server.json.JacksonMapper;
import org.prebid.server.settings.helper.JdbcStoredDataResultMapper;
import org.prebid.server.settings.helper.JdbcStoredResponseResultMapper;
import org.prebid.server.settings.helper.ParametrizedQueryHelper;
import org.prebid.server.settings.model.Account;
import org.prebid.server.settings.model.StoredDataResult;
import org.prebid.server.settings.model.StoredResponseDataResult;
import org.prebid.server.util.ObjectUtil;
import org.prebid.server.vertx.jdbc.JdbcClient;

import java.util.ArrayList;
Expand All @@ -23,7 +26,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
Expand All @@ -36,14 +38,9 @@
*/
public class JdbcApplicationSettings implements ApplicationSettings {

private static final String ACCOUNT_ID_PLACEHOLDER = "%ACCOUNT_ID%";
private static final String REQUEST_ID_PLACEHOLDER = "%REQUEST_ID_LIST%";
private static final String IMP_ID_PLACEHOLDER = "%IMP_ID_LIST%";
private static final String RESPONSE_ID_PLACEHOLDER = "%RESPONSE_ID_LIST%";
private static final String QUERY_PARAM_PLACEHOLDER = "?";

private final JdbcClient jdbcClient;
private final JacksonMapper mapper;
private final ParametrizedQueryHelper parametrizedQueryHelper;

/**
* Query to select account by ids.
Expand Down Expand Up @@ -86,15 +83,17 @@ public class JdbcApplicationSettings implements ApplicationSettings {

public JdbcApplicationSettings(JdbcClient jdbcClient,
JacksonMapper mapper,
ParametrizedQueryHelper parametrizedQueryHelper,
String selectAccountQuery,
String selectStoredRequestsQuery,
String selectAmpStoredRequestsQuery,
String selectStoredResponsesQuery) {

this.jdbcClient = Objects.requireNonNull(jdbcClient);
this.mapper = Objects.requireNonNull(mapper);
this.selectAccountQuery = Objects.requireNonNull(selectAccountQuery)
.replace(ACCOUNT_ID_PLACEHOLDER, QUERY_PARAM_PLACEHOLDER);
this.parametrizedQueryHelper = Objects.requireNonNull(parametrizedQueryHelper);
this.selectAccountQuery = parametrizedQueryHelper.replaceAccountIdPlaceholder(
Objects.requireNonNull(selectAccountQuery));
this.selectStoredRequestsQuery = Objects.requireNonNull(selectStoredRequestsQuery);
this.selectAmpStoredRequestsQuery = Objects.requireNonNull(selectAmpStoredRequestsQuery);
this.selectStoredResponsesQuery = Objects.requireNonNull(selectStoredResponsesQuery);
Expand All @@ -109,7 +108,7 @@ public Future<Account> getAccountById(String accountId, Timeout timeout) {
return jdbcClient.executeQuery(
selectAccountQuery,
Collections.singletonList(accountId),
result -> mapToModelOrError(result, row -> toAccount(row.getString(0))),
result -> mapToModelOrError(result, this::toAccount),
timeout)
.compose(result -> failedIfNull(result, accountId, "Account"));
}
Expand All @@ -120,14 +119,15 @@ public Future<Map<String, String>> getCategories(String primaryAdServer, String
}

/**
* Transforms the first row of {@link ResultSet} to required object or returns null.
* Transforms the first row of {@link RowSet<Row>} to required object or returns null.
* <p>
* Note: mapper should never throws exception in case of using
* {@link org.prebid.server.vertx.jdbc.CircuitBreakerSecuredJdbcClient}.
*/
private <T> T mapToModelOrError(ResultSet result, Function<JsonArray, T> mapper) {
return result != null && CollectionUtils.isNotEmpty(result.getResults())
? mapper.apply(result.getResults().get(0))
private <T> T mapToModelOrError(RowSet<Row> rowSet, Function<Row, T> mapper) {
final RowIterator<Row> rowIterator = rowSet != null ? rowSet.iterator() : null;
return rowIterator != null && rowIterator.hasNext()
? mapper.apply(rowIterator.next())
: null;
}

Expand All @@ -141,7 +141,8 @@ private static <T> Future<T> failedIfNull(T value, String id, String errorPrefix
: Future.failedFuture(new PreBidException("%s not found: %s".formatted(errorPrefix, id)));
}

private Account toAccount(String source) {
private Account toAccount(Row row) {
final String source = ObjectUtil.getIfNotNull(row.getValue(0), Object::toString);
try {
return source != null ? mapper.decodeValue(source, Account.class) : null;
} catch (DecodeException e) {
Expand Down Expand Up @@ -185,11 +186,15 @@ public Future<StoredDataResult> getVideoStoredData(String accountId, Set<String>
*/
@Override
public Future<StoredResponseDataResult> getStoredResponses(Set<String> responseIds, Timeout timeout) {
final String queryResolvedWithParameters = selectStoredResponsesQuery.replaceAll(RESPONSE_ID_PLACEHOLDER,
parameterHolders(responseIds.size()));
final String queryResolvedWithParameters = parametrizedQueryHelper.replaceStoredResponseIdPlaceholders(
selectStoredResponsesQuery,
responseIds.size());

final List<Object> idsQueryParameters = new ArrayList<>();
IntStream.rangeClosed(1, StringUtils.countMatches(selectStoredResponsesQuery, RESPONSE_ID_PLACEHOLDER))
final int responseIdPlaceholderCount = StringUtils.countMatches(
selectStoredResponsesQuery,
ParametrizedQueryHelper.RESPONSE_ID_PLACEHOLDER);
IntStream.rangeClosed(1, responseIdPlaceholderCount)
.forEach(i -> idsQueryParameters.addAll(responseIds));

return jdbcClient.executeQuery(queryResolvedWithParameters, idsQueryParameters,
Expand All @@ -208,38 +213,21 @@ private Future<StoredDataResult> fetchStoredData(String query, String accountId,
StoredDataResult.of(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList()));
} else {
final List<Object> idsQueryParameters = new ArrayList<>();
IntStream.rangeClosed(1, StringUtils.countMatches(query, REQUEST_ID_PLACEHOLDER))
IntStream.rangeClosed(1, StringUtils.countMatches(query, ParametrizedQueryHelper.REQUEST_ID_PLACEHOLDER))
.forEach(i -> idsQueryParameters.addAll(requestIds));
IntStream.rangeClosed(1, StringUtils.countMatches(query, IMP_ID_PLACEHOLDER))
IntStream.rangeClosed(1, StringUtils.countMatches(query, ParametrizedQueryHelper.IMP_ID_PLACEHOLDER))
.forEach(i -> idsQueryParameters.addAll(impIds));

final String parametrizedQuery = createParametrizedQuery(query, requestIds.size(), impIds.size());
final String parametrizedQuery = parametrizedQueryHelper.replaceRequestAndImpIdPlaceholders(
query,
requestIds.size(),
impIds.size());

future = jdbcClient.executeQuery(parametrizedQuery, idsQueryParameters,
result -> JdbcStoredDataResultMapper.map(result, accountId, requestIds, impIds),
timeout);
}

return future;
}

/**
* Creates parametrized query from query and variable templates, by replacing templateVariable
* with appropriate number of "?" placeholders.
*/
private static String createParametrizedQuery(String query, int requestIdsSize, int impIdsSize) {
return query
.replace(REQUEST_ID_PLACEHOLDER, parameterHolders(requestIdsSize))
.replace(IMP_ID_PLACEHOLDER, parameterHolders(impIdsSize));
}

/**
* Returns string for parametrized placeholder.
*/
private static String parameterHolders(int paramsSize) {
return paramsSize == 0
? "NULL"
: IntStream.range(0, paramsSize)
.mapToObj(i -> QUERY_PARAM_PLACEHOLDER)
.collect(Collectors.joining(","));
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package org.prebid.server.settings.helper;

import io.vertx.core.json.JsonArray;
import io.vertx.ext.sql.ResultSet;
import org.apache.commons.collections4.CollectionUtils;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowIterator;
import io.vertx.sqlclient.RowSet;
import org.prebid.server.exception.PreBidException;
import org.prebid.server.log.Logger;
import org.prebid.server.log.LoggerFactory;
import org.prebid.server.settings.model.StoredDataResult;
import org.prebid.server.settings.model.StoredDataType;
import org.prebid.server.settings.model.StoredItem;
import org.prebid.server.util.ObjectUtil;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -19,7 +20,7 @@
import java.util.Set;

/**
* Utility class for mapping {@link ResultSet} to {@link StoredDataResult}.
* Utility class for mapping {@link RowSet<Row>} to {@link StoredDataResult}.
*/
public class JdbcStoredDataResultMapper {

Expand All @@ -29,9 +30,9 @@ private JdbcStoredDataResultMapper() {
}

/**
* Maps {@link ResultSet} to {@link StoredDataResult} and creates an error for each missing ID and add it to result.
* Maps {@link RowSet} to {@link StoredDataResult} and creates an error for each missing ID and add it to result.
*
* @param resultSet - incoming Result Set representing a result of SQL query
* @param rowSet - incoming Row Set representing a result of SQL query
* @param accountId - an account ID extracted from request
* @param requestIds - a specified set of stored requests' IDs. Adds error for each ID missing in result set
* @param impIds - a specified set of stored imps' IDs. Adds error for each ID missing in result set
Expand All @@ -40,13 +41,17 @@ private JdbcStoredDataResultMapper() {
* Note: mapper should never throws exception in case of using
* {@link org.prebid.server.vertx.jdbc.CircuitBreakerSecuredJdbcClient}.
*/
public static StoredDataResult map(ResultSet resultSet, String accountId, Set<String> requestIds,
public static StoredDataResult map(RowSet<Row> rowSet,
String accountId,
Set<String> requestIds,
Set<String> impIds) {
final Map<String, String> storedIdToRequest;
final Map<String, String> storedIdToImp;
final List<String> errors = new ArrayList<>();

if (resultSet == null || CollectionUtils.isEmpty(resultSet.getResults())) {
final RowIterator<Row> rowIterator = rowSet != null ? rowSet.iterator() : null;

if (rowIterator == null || !rowIterator.hasNext()) {
storedIdToRequest = Collections.emptyMap();
storedIdToImp = Collections.emptyMap();

Expand All @@ -64,17 +69,24 @@ public static StoredDataResult map(ResultSet resultSet, String accountId, Set<St
final Map<String, Set<StoredItem>> requestIdToStoredItems = new HashMap<>();
final Map<String, Set<StoredItem>> impIdToStoredItems = new HashMap<>();

for (JsonArray result : resultSet.getResults()) {
while (rowIterator.hasNext()) {
final Row row = rowIterator.next();
if (row.toJson().size() < 4) {
final String message = "Error occurred while mapping stored request data: some columns are missing";
logger.error(message);
errors.add(message);
return StoredDataResult.of(Collections.emptyMap(), Collections.emptyMap(), errors);
}
final String fetchedAccountId;
final String id;
final String data;
final String typeAsString;
try {
fetchedAccountId = result.getString(0);
id = result.getString(1);
data = result.getString(2);
typeAsString = result.getString(3);
} catch (IndexOutOfBoundsException | ClassCastException e) {
fetchedAccountId = row.getString(0);
id = row.getString(1);
data = ObjectUtil.getIfNotNull(row.getValue(2), Object::toString);
typeAsString = ObjectUtil.getIfNotNull(row.getValue(3), Object::toString);
} catch (ClassCastException e) {
final String message = "Error occurred while mapping stored request data";
logger.error(message, e);
errors.add(message);
Expand Down Expand Up @@ -109,10 +121,10 @@ public static StoredDataResult map(ResultSet resultSet, String accountId, Set<St
/**
* Overloaded method for cases when no specific IDs are required, e.g. fetching all records.
*
* @param resultSet - incoming {@link ResultSet} representing a result of SQL query.
* @param resultSet - incoming {@link RowSet<Row>} representing a result of SQL query.
* @return - a {@link StoredDataResult} object.
*/
public static StoredDataResult map(ResultSet resultSet) {
public static StoredDataResult map(RowSet<Row> resultSet) {
return map(resultSet, null, Collections.emptySet(), Collections.emptySet());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.prebid.server.settings.helper;

import io.vertx.core.json.JsonArray;
import io.vertx.ext.sql.ResultSet;
import org.apache.commons.collections4.CollectionUtils;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowIterator;
import io.vertx.sqlclient.RowSet;
import org.prebid.server.settings.model.StoredResponseDataResult;

import java.util.ArrayList;
Expand All @@ -17,26 +17,29 @@ public class JdbcStoredResponseResultMapper {
private JdbcStoredResponseResultMapper() {
}

public static StoredResponseDataResult map(ResultSet resultSet, Set<String> responseIds) {
public static StoredResponseDataResult map(RowSet<Row> rowSet, Set<String> responseIds) {
final Map<String, String> storedIdToResponse = new HashMap<>(responseIds.size());
final List<String> errors = new ArrayList<>();

if (resultSet == null || CollectionUtils.isEmpty(resultSet.getResults())) {
final RowIterator<Row> rowIterator = rowSet != null ? rowSet.iterator() : null;
if (rowIterator == null || !rowIterator.hasNext()) {
handleEmptyResultError(responseIds, errors);
} else {
try {
for (JsonArray result : resultSet.getResults()) {
storedIdToResponse.put(result.getString(0), result.getString(1));
}
} catch (IndexOutOfBoundsException e) {
return StoredResponseDataResult.of(storedIdToResponse, errors);
}

while (rowIterator.hasNext()) {
final Row row = rowIterator.next();
if (row.toJson().size() < 2) {
errors.add("Result set column number is less than expected");
return StoredResponseDataResult.of(Collections.emptyMap(), errors);
}
errors.addAll(responseIds.stream().filter(id -> !storedIdToResponse.containsKey(id))
.map(id -> "No stored response found for id: " + id)
.toList());
storedIdToResponse.put(row.getString(0), row.getString(1));
}

errors.addAll(responseIds.stream().filter(id -> !storedIdToResponse.containsKey(id))
.map(id -> "No stored response found for id: " + id)
.toList());

return StoredResponseDataResult.of(storedIdToResponse, errors);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.prebid.server.settings.helper;

public interface ParametrizedQueryHelper {

String ACCOUNT_ID_PLACEHOLDER = "%ACCOUNT_ID%";
String REQUEST_ID_PLACEHOLDER = "%REQUEST_ID_LIST%";
String IMP_ID_PLACEHOLDER = "%IMP_ID_LIST%";
String RESPONSE_ID_PLACEHOLDER = "%RESPONSE_ID_LIST%";

String replaceAccountIdPlaceholder(String query);

String replaceStoredResponseIdPlaceholders(String query, int idsNumber);

String replaceRequestAndImpIdPlaceholders(String query, int requestIdNumber, int impIdNumber);

}
Loading
Loading