-
Notifications
You must be signed in to change notification settings - Fork 187
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add S3/MinIO support for application settings
- Loading branch information
Showing
7 changed files
with
1,128 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
205 changes: 205 additions & 0 deletions
205
src/main/java/org/prebid/server/settings/S3ApplicationSettings.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,205 @@ | ||
package org.prebid.server.settings; | ||
|
||
import io.vertx.core.CompositeFuture; | ||
import io.vertx.core.Future; | ||
import io.vertx.core.Vertx; | ||
import org.prebid.server.auction.model.Tuple2; | ||
import org.prebid.server.exception.PreBidException; | ||
import org.prebid.server.execution.Timeout; | ||
import org.prebid.server.json.DecodeException; | ||
import org.prebid.server.json.JacksonMapper; | ||
import org.prebid.server.settings.model.Account; | ||
import org.prebid.server.settings.model.StoredDataResult; | ||
import org.prebid.server.settings.model.StoredResponseDataResult; | ||
import software.amazon.awssdk.core.async.AsyncResponseTransformer; | ||
import software.amazon.awssdk.services.s3.S3AsyncClient; | ||
import software.amazon.awssdk.services.s3.model.GetObjectRequest; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
/** | ||
* Implementation of {@link ApplicationSettings}. | ||
* <p> | ||
* Reads an application settings from JSON file in an s3 bucket, stores and serves them in and from the memory. | ||
* <p> | ||
* Immediately loads stored request data from local files. These are stored in memory for low-latency reads. | ||
* This expects each file in the directory to be named "{config_id}.json". | ||
*/ | ||
public class S3ApplicationSettings implements ApplicationSettings { | ||
|
||
private static final String JSON_SUFFIX = ".json"; | ||
|
||
final S3AsyncClient asyncClient; | ||
final String bucket; | ||
final String accountsDirectory; | ||
final String storedImpressionsDirectory; | ||
final String storedRequestsDirectory; | ||
final String storedResponsesDirectory; | ||
final JacksonMapper jacksonMapper; | ||
final Vertx vertx; | ||
|
||
public S3ApplicationSettings( | ||
S3AsyncClient asyncClient, | ||
String bucket, | ||
String accountsDirectory, | ||
String storedImpressionsDirectory, | ||
String storedRequestsDirectory, | ||
String storedResponsesDirectory, | ||
JacksonMapper jacksonMapper, | ||
Vertx vertx) { | ||
this.asyncClient = asyncClient; | ||
this.bucket = bucket; | ||
this.accountsDirectory = accountsDirectory; | ||
this.storedImpressionsDirectory = storedImpressionsDirectory; | ||
this.storedRequestsDirectory = storedRequestsDirectory; | ||
this.storedResponsesDirectory = storedResponsesDirectory; | ||
this.jacksonMapper = jacksonMapper; | ||
this.vertx = vertx; | ||
} | ||
|
||
@Override | ||
public Future<Account> getAccountById(String accountId, Timeout timeout) { | ||
return downloadFile(accountsDirectory + "/" + accountId + JSON_SUFFIX) | ||
.map(fileContentOpt -> | ||
fileContentOpt.map(fileContent -> jacksonMapper.decodeValue(fileContent, Account.class))) | ||
.compose(accountOpt -> { | ||
if (accountOpt.isPresent()) { | ||
return Future.succeededFuture(accountOpt.get()); | ||
} else { | ||
return Future | ||
.failedFuture(new PreBidException("Account with id %s not found".formatted(accountId))); | ||
} | ||
}) | ||
.recover(ex -> { | ||
if (ex instanceof DecodeException) { | ||
return Future | ||
.failedFuture( | ||
new PreBidException( | ||
"Invalid json for account with id %s".formatted(accountId))); | ||
} | ||
return Future | ||
.failedFuture(new PreBidException("Account with id %s not found".formatted(accountId))); | ||
}); | ||
} | ||
|
||
@Override | ||
public Future<StoredDataResult> getStoredData( | ||
String accountId, | ||
Set<String> requestIds, | ||
Set<String> impIds, | ||
Timeout timeout) { | ||
|
||
return getFileContents(storedRequestsDirectory, requestIds).compose(storedIdToRequest -> | ||
getFileContents(storedImpressionsDirectory, impIds) | ||
.map(storedIdToImp -> { | ||
final List<String> missingStoredRequestIds = | ||
getMissingStoredDataIds(storedIdToRequest).stream() | ||
.map("No stored request found for id: %s"::formatted).toList(); | ||
final List<String> missingStoredImpressionIds = | ||
getMissingStoredDataIds(storedIdToImp).stream() | ||
.map("No stored impression found for id: %s"::formatted).toList(); | ||
|
||
return StoredDataResult.of( | ||
filterOptionalFileContent(storedIdToRequest), | ||
filterOptionalFileContent(storedIdToImp), | ||
Stream.concat( | ||
missingStoredImpressionIds.stream(), | ||
missingStoredRequestIds.stream()).toList()); | ||
} | ||
)); | ||
} | ||
|
||
private Map<String, String> filterOptionalFileContent(Map<String, Optional<String>> fileContents) { | ||
return fileContents | ||
.entrySet() | ||
.stream() | ||
.filter(e -> e.getValue().isPresent()) | ||
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get())); | ||
} | ||
|
||
private List<String> getMissingStoredDataIds(Map<String, Optional<String>> fileContents) { | ||
return fileContents.entrySet().stream().filter(e -> e.getValue().isEmpty()).map(Map.Entry::getKey).toList(); | ||
} | ||
|
||
@Override | ||
public Future<StoredDataResult> getAmpStoredData( | ||
String accountId, | ||
Set<String> requestIds, | ||
Set<String> impIds, | ||
Timeout timeout) { | ||
return getStoredData(accountId, requestIds, Collections.emptySet(), timeout); | ||
} | ||
|
||
@Override | ||
public Future<StoredDataResult> getVideoStoredData( | ||
String accountId, | ||
Set<String> requestIds, | ||
Set<String> impIds, | ||
Timeout timeout) { | ||
return getStoredData(accountId, requestIds, impIds, timeout); | ||
} | ||
|
||
@Override | ||
public Future<StoredResponseDataResult> getStoredResponses(Set<String> responseIds, Timeout timeout) { | ||
return getFileContents(storedResponsesDirectory, responseIds).map(storedIdToResponse -> { | ||
final List<String> missingStoredResponseIds = | ||
getMissingStoredDataIds(storedIdToResponse).stream() | ||
.map("No stored response found for id: %s"::formatted).toList(); | ||
|
||
return StoredResponseDataResult.of( | ||
filterOptionalFileContent(storedIdToResponse), | ||
missingStoredResponseIds | ||
); | ||
}); | ||
} | ||
|
||
@Override | ||
public Future<Map<String, String>> getCategories(String primaryAdServer, String publisher, Timeout timeout) { | ||
return Future.succeededFuture(Collections.emptyMap()); | ||
} | ||
|
||
private Future<Map<String, Optional<String>>> getFileContents(String directory, Set<String> ids) { | ||
final List<Future<Tuple2<String, Optional<String>>>> futureListContents = ids.stream() | ||
.map(impressionId -> | ||
downloadFile(directory + withInitialSlash(impressionId) + JSON_SUFFIX) | ||
.map(fileContent -> Tuple2.of(impressionId, fileContent))) | ||
.collect(Collectors.toCollection(ArrayList::new)); | ||
|
||
final Future<List<Tuple2<String, Optional<String>>>> composedFutures = CompositeFuture | ||
.all(new ArrayList<>(futureListContents)) | ||
.map(CompositeFuture::list); | ||
|
||
return composedFutures.map(one -> one.stream().collect(Collectors.toMap(Tuple2::getLeft, Tuple2::getRight))); | ||
} | ||
|
||
/** | ||
* When the impression id is the ad unit path it may already start with a slash and there's no need to add | ||
* another one. | ||
* | ||
* @param impressionId from the bid request | ||
* @return impression id with only a single slash at the beginning | ||
*/ | ||
private String withInitialSlash(String impressionId) { | ||
if (impressionId.startsWith("/")) { | ||
return impressionId; | ||
} | ||
return "/" + impressionId; | ||
} | ||
|
||
private Future<Optional<String>> downloadFile(String key) { | ||
final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build(); | ||
|
||
return Future.fromCompletionStage( | ||
asyncClient.getObject(request, AsyncResponseTransformer.toBytes()), | ||
vertx.getOrCreateContext()) | ||
.map(test -> Optional.of(test.asUtf8String())).recover(ex -> Future.succeededFuture(Optional.empty())); | ||
} | ||
|
||
} |
Oops, something went wrong.