From 6f82ee8a7dcd2663bb9828de676f45ea2514c2ea Mon Sep 17 00:00:00 2001 From: Robin Mattis Date: Mon, 19 Sep 2022 17:19:17 +0200 Subject: [PATCH] Add S3/MinIO support for application settings --- docs/application-settings.md | 44 +++ pom.xml | 11 + .../settings/S3ApplicationSettings.java | 205 ++++++++++ .../service/S3PeriodicRefreshService.java | 192 ++++++++++ .../spring/config/SettingsConfiguration.java | 121 +++++- .../settings/S3ApplicationSettingsTest.java | 350 ++++++++++++++++++ .../service/S3PeriodicRefreshServiceTest.java | 208 +++++++++++ 7 files changed, 1128 insertions(+), 3 deletions(-) create mode 100644 src/main/java/org/prebid/server/settings/S3ApplicationSettings.java create mode 100644 src/main/java/org/prebid/server/settings/service/S3PeriodicRefreshService.java create mode 100644 src/test/java/org/prebid/server/settings/S3ApplicationSettingsTest.java create mode 100644 src/test/java/org/prebid/server/settings/service/S3PeriodicRefreshServiceTest.java diff --git a/docs/application-settings.md b/docs/application-settings.md index 885c836a257..f174f2eb065 100644 --- a/docs/application-settings.md +++ b/docs/application-settings.md @@ -208,6 +208,50 @@ Here's an example YAML file containing account-specific settings: default-coop-sync: true ``` +## Setting Account Configuration in S3 + +This is identical to the account configuration in a file system, with the main difference that your file system is +[AWS S3](https://aws.amazon.com/de/s3/) or any S3 compatible storage, such as [MinIO](https://min.io/). + + +The general idea is that you'll place all the account-specific settings in a separate YAML file and point to that file. + +```yaml +settings: + s3: + accessKeyId: + secretAccessKey: + endpoint: # http://s3.storage.com + bucket: # prebid-application-settings + accounts-dir: accounts + stored-imps-dir: stored-impressions + stored-requests-dir: stored-requests + stored-responses-dir: stored-responses + + # recommended to configure an in memory cache, but this is optional + in-memory-cache: + # example settings, tailor to your needs + cache-size: 100000 + ttl-seconds: 1200 # 20 minutes + # recommended to configure + s3-update: + refresh-rate: 900000 # Refresh every 15 minutes + timeout: 5000 +``` + +### File format + +We recommend using the `json` format for your account configuration. A minimal configuration may look like this. + +```json +{ + "id" : "979c7116-1f5a-43d4-9a87-5da3ccc4f52c", + "status" : "active" +} +``` + +This pairs nicely if you have a default configuration defined in your prebid server config under `settings.default-account-config`. + ## Setting Account Configuration in the Database In database approach account properties are stored in database table(s). diff --git a/pom.xml b/pom.xml index fd67628d5f9..5875197727b 100644 --- a/pom.xml +++ b/pom.xml @@ -107,6 +107,13 @@ pom import + + software.amazon.awssdk + bom + 2.17.274 + pom + import + @@ -275,6 +282,10 @@ postgresql ${postgresql.version} + + software.amazon.awssdk + s3 + com.github.ben-manes.caffeine caffeine diff --git a/src/main/java/org/prebid/server/settings/S3ApplicationSettings.java b/src/main/java/org/prebid/server/settings/S3ApplicationSettings.java new file mode 100644 index 00000000000..985dfa25b2b --- /dev/null +++ b/src/main/java/org/prebid/server/settings/S3ApplicationSettings.java @@ -0,0 +1,205 @@ +package org.prebid.server.settings; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import org.prebid.server.auction.model.Tuple2; +import org.prebid.server.exception.PreBidException; +import org.prebid.server.execution.Timeout; +import org.prebid.server.json.DecodeException; +import org.prebid.server.json.JacksonMapper; +import org.prebid.server.settings.model.Account; +import org.prebid.server.settings.model.StoredDataResult; +import org.prebid.server.settings.model.StoredResponseDataResult; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Implementation of {@link ApplicationSettings}. + *

+ * Reads an application settings from JSON file in an s3 bucket, stores and serves them in and from the memory. + *

+ * Immediately loads stored request data from local files. These are stored in memory for low-latency reads. + * This expects each file in the directory to be named "{config_id}.json". + */ +public class S3ApplicationSettings implements ApplicationSettings { + + private static final String JSON_SUFFIX = ".json"; + + final S3AsyncClient asyncClient; + final String bucket; + final String accountsDirectory; + final String storedImpressionsDirectory; + final String storedRequestsDirectory; + final String storedResponsesDirectory; + final JacksonMapper jacksonMapper; + final Vertx vertx; + + public S3ApplicationSettings( + S3AsyncClient asyncClient, + String bucket, + String accountsDirectory, + String storedImpressionsDirectory, + String storedRequestsDirectory, + String storedResponsesDirectory, + JacksonMapper jacksonMapper, + Vertx vertx) { + this.asyncClient = asyncClient; + this.bucket = bucket; + this.accountsDirectory = accountsDirectory; + this.storedImpressionsDirectory = storedImpressionsDirectory; + this.storedRequestsDirectory = storedRequestsDirectory; + this.storedResponsesDirectory = storedResponsesDirectory; + this.jacksonMapper = jacksonMapper; + this.vertx = vertx; + } + + @Override + public Future getAccountById(String accountId, Timeout timeout) { + return downloadFile(accountsDirectory + "/" + accountId + JSON_SUFFIX) + .map(fileContentOpt -> + fileContentOpt.map(fileContent -> jacksonMapper.decodeValue(fileContent, Account.class))) + .compose(accountOpt -> { + if (accountOpt.isPresent()) { + return Future.succeededFuture(accountOpt.get()); + } else { + return Future + .failedFuture(new PreBidException("Account with id %s not found".formatted(accountId))); + } + }) + .recover(ex -> { + if (ex instanceof DecodeException) { + return Future + .failedFuture( + new PreBidException( + "Invalid json for account with id %s".formatted(accountId))); + } + return Future + .failedFuture(new PreBidException("Account with id %s not found".formatted(accountId))); + }); + } + + @Override + public Future getStoredData( + String accountId, + Set requestIds, + Set impIds, + Timeout timeout) { + + return getFileContents(storedRequestsDirectory, requestIds).compose(storedIdToRequest -> + getFileContents(storedImpressionsDirectory, impIds) + .map(storedIdToImp -> { + final List missingStoredRequestIds = + getMissingStoredDataIds(storedIdToRequest).stream() + .map("No stored request found for id: %s"::formatted).toList(); + final List missingStoredImpressionIds = + getMissingStoredDataIds(storedIdToImp).stream() + .map("No stored impression found for id: %s"::formatted).toList(); + + return StoredDataResult.of( + filterOptionalFileContent(storedIdToRequest), + filterOptionalFileContent(storedIdToImp), + Stream.concat( + missingStoredImpressionIds.stream(), + missingStoredRequestIds.stream()).toList()); + } + )); + } + + private Map filterOptionalFileContent(Map> fileContents) { + return fileContents + .entrySet() + .stream() + .filter(e -> e.getValue().isPresent()) + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get())); + } + + private List getMissingStoredDataIds(Map> fileContents) { + return fileContents.entrySet().stream().filter(e -> e.getValue().isEmpty()).map(Map.Entry::getKey).toList(); + } + + @Override + public Future getAmpStoredData( + String accountId, + Set requestIds, + Set impIds, + Timeout timeout) { + return getStoredData(accountId, requestIds, Collections.emptySet(), timeout); + } + + @Override + public Future getVideoStoredData( + String accountId, + Set requestIds, + Set impIds, + Timeout timeout) { + return getStoredData(accountId, requestIds, impIds, timeout); + } + + @Override + public Future getStoredResponses(Set responseIds, Timeout timeout) { + return getFileContents(storedResponsesDirectory, responseIds).map(storedIdToResponse -> { + final List missingStoredResponseIds = + getMissingStoredDataIds(storedIdToResponse).stream() + .map("No stored response found for id: %s"::formatted).toList(); + + return StoredResponseDataResult.of( + filterOptionalFileContent(storedIdToResponse), + missingStoredResponseIds + ); + }); + } + + @Override + public Future> getCategories(String primaryAdServer, String publisher, Timeout timeout) { + return Future.succeededFuture(Collections.emptyMap()); + } + + private Future>> getFileContents(String directory, Set ids) { + final List>>> futureListContents = ids.stream() + .map(impressionId -> + downloadFile(directory + withInitialSlash(impressionId) + JSON_SUFFIX) + .map(fileContent -> Tuple2.of(impressionId, fileContent))) + .collect(Collectors.toCollection(ArrayList::new)); + + final Future>>> composedFutures = CompositeFuture + .all(new ArrayList<>(futureListContents)) + .map(CompositeFuture::list); + + return composedFutures.map(one -> one.stream().collect(Collectors.toMap(Tuple2::getLeft, Tuple2::getRight))); + } + + /** + * When the impression id is the ad unit path it may already start with a slash and there's no need to add + * another one. + * + * @param impressionId from the bid request + * @return impression id with only a single slash at the beginning + */ + private String withInitialSlash(String impressionId) { + if (impressionId.startsWith("/")) { + return impressionId; + } + return "/" + impressionId; + } + + private Future> downloadFile(String key) { + final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build(); + + return Future.fromCompletionStage( + asyncClient.getObject(request, AsyncResponseTransformer.toBytes()), + vertx.getOrCreateContext()) + .map(test -> Optional.of(test.asUtf8String())).recover(ex -> Future.succeededFuture(Optional.empty())); + } + +} diff --git a/src/main/java/org/prebid/server/settings/service/S3PeriodicRefreshService.java b/src/main/java/org/prebid/server/settings/service/S3PeriodicRefreshService.java new file mode 100644 index 00000000000..81f1146d81c --- /dev/null +++ b/src/main/java/org/prebid/server/settings/service/S3PeriodicRefreshService.java @@ -0,0 +1,192 @@ +package org.prebid.server.settings.service; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.logging.Logger; +import io.vertx.core.logging.LoggerFactory; +import org.prebid.server.auction.model.Tuple2; +import org.prebid.server.metric.MetricName; +import org.prebid.server.metric.Metrics; +import org.prebid.server.settings.CacheNotificationListener; +import org.prebid.server.settings.model.StoredDataResult; +import org.prebid.server.vertx.Initializable; +import software.amazon.awssdk.core.BytesWrapper; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.S3Object; + +import java.time.Clock; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + *

+ * Service that periodically calls s3 for stored request updates. + * If refreshRate is negative, then the data will never be refreshed. + *

+ * Fetches all files from the specified folders/prefixes in s3 and downloads all files. + */ +public class S3PeriodicRefreshService implements Initializable { + + private static final String JSON_SUFFIX = ".json"; + + private static final Logger logger = LoggerFactory.getLogger(S3PeriodicRefreshService.class); + + private final S3AsyncClient asyncClient; + private final String bucket; + private final String storedImpressionsDirectory; + private final String storedRequestsDirectory; + private final long refreshPeriod; + private final long timeout; + private final MetricName cacheType; + private final CacheNotificationListener cacheNotificationListener; + private final Vertx vertx; + private final Metrics metrics; + private final Clock clock; + private StoredDataResult lastResult; + + public S3PeriodicRefreshService(S3AsyncClient asyncClient, + String bucket, + String storedRequestsDirectory, + String storedImpressionsDirectory, + long refreshPeriod, + long timeout, + MetricName cacheType, + CacheNotificationListener cacheNotificationListener, + Vertx vertx, + Metrics metrics, + Clock clock) { + + this.asyncClient = asyncClient; + this.bucket = bucket; + this.storedRequestsDirectory = storedRequestsDirectory; + this.storedImpressionsDirectory = storedImpressionsDirectory; + this.refreshPeriod = refreshPeriod; + this.timeout = timeout; + this.cacheType = Objects.requireNonNull(cacheType); + this.cacheNotificationListener = Objects.requireNonNull(cacheNotificationListener); + this.vertx = Objects.requireNonNull(vertx); + this.metrics = Objects.requireNonNull(metrics); + this.clock = Objects.requireNonNull(clock); + } + + private static List getInvalidatedKeys(Map newMap, Map oldMap) { + return oldMap.keySet().stream().filter(s -> !newMap.containsKey(s)).toList(); + } + + @Override + public void initialize() { + getAll(); + if (refreshPeriod > 0) { + vertx.setPeriodic(refreshPeriod, aLong -> refresh()); + } + } + + private void getAll() { + final long startTime = clock.millis(); + + getFileContentsForDirectory(storedRequestsDirectory) + .compose(storedIdToRequest -> getFileContentsForDirectory(storedImpressionsDirectory) + .map(storedIdToImp -> + StoredDataResult.of(storedIdToRequest, storedIdToImp, Collections.emptyList()))) + .map(storedDataResult -> handleResult(storedDataResult, startTime, MetricName.initialize)) + .recover(exception -> handleFailure(exception, startTime, MetricName.initialize)); + } + + private void refresh() { + final long startTime = clock.millis(); + + getFileContentsForDirectory(storedRequestsDirectory) + .compose(storedIdToRequest -> getFileContentsForDirectory(storedImpressionsDirectory) + .map(storedIdToImp -> + StoredDataResult.of(storedIdToRequest, storedIdToImp, Collections.emptyList()))) + .map(storedDataResult -> handleResult(invalidate(storedDataResult), startTime, MetricName.update)) + .recover(exception -> handleFailure(exception, startTime, MetricName.update)); + } + + private Void handleResult(StoredDataResult storedDataResult, + long startTime, + MetricName refreshType) { + + lastResult = storedDataResult; + + cacheNotificationListener.save(storedDataResult.getStoredIdToRequest(), storedDataResult.getStoredIdToImp()); + + metrics.updateSettingsCacheRefreshTime(cacheType, refreshType, clock.millis() - startTime); + + return null; + } + + private Future handleFailure(Throwable exception, long startTime, MetricName refreshType) { + logger.warn("Error occurred while request to s3 refresh service", exception); + + metrics.updateSettingsCacheRefreshTime(cacheType, refreshType, clock.millis() - startTime); + metrics.updateSettingsCacheRefreshErrorMetric(cacheType, refreshType); + + return Future.failedFuture(exception); + } + + private StoredDataResult invalidate(StoredDataResult storedDataResult) { + final List invalidatedRequests = getInvalidatedKeys( + storedDataResult.getStoredIdToRequest(), + lastResult != null ? lastResult.getStoredIdToRequest() : Collections.emptyMap()); + final List invalidatedImps = getInvalidatedKeys( + storedDataResult.getStoredIdToImp(), + lastResult != null ? lastResult.getStoredIdToImp() : Collections.emptyMap()); + + if (!invalidatedRequests.isEmpty() || !invalidatedImps.isEmpty()) { + cacheNotificationListener.invalidate(invalidatedRequests, invalidatedImps); + } + + return storedDataResult; + } + + private Future> listFiles(String prefix) { + final ListObjectsRequest listObjectsRequest = + ListObjectsRequest.builder().bucket(bucket).prefix(prefix).build(); + + return Future.fromCompletionStage(asyncClient.listObjects(listObjectsRequest)) + .map(response -> response.contents().stream().map(S3Object::key).toList()); + } + + private Future> getFileContentsForDirectory(String directory) { + return listFiles(directory) + .compose(files -> + getFileContents(new HashSet<>(files)) + .map(map -> map.entrySet().stream().collect( + Collectors.toMap( + e -> stripFileName(directory, e.getKey()), + Map.Entry::getValue)))); + } + + private String stripFileName(String directory, String name) { + return name.replace(directory + "/", "").replace(JSON_SUFFIX, ""); + } + + private Future> getFileContents(Set fileNames) { + final List>> futureListContents = fileNames.stream() + .map(fileName -> downloadFile(fileName).map(fileContent -> Tuple2.of(fileName, fileContent))) + .collect(Collectors.toCollection(ArrayList::new)); + + final Future>> composedFutures = + CompositeFuture.all(new ArrayList<>(futureListContents)).map(CompositeFuture::list); + + return composedFutures.map(one -> one.stream().collect(Collectors.toMap(Tuple2::getLeft, Tuple2::getRight))); + } + + private Future downloadFile(String key) { + final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build(); + + return Future.fromCompletionStage(asyncClient.getObject(request, AsyncResponseTransformer.toBytes())) + .map(BytesWrapper::asUtf8String); + } +} diff --git a/src/main/java/org/prebid/server/spring/config/SettingsConfiguration.java b/src/main/java/org/prebid/server/spring/config/SettingsConfiguration.java index 37b079d9ef4..9b410fa308f 100644 --- a/src/main/java/org/prebid/server/spring/config/SettingsConfiguration.java +++ b/src/main/java/org/prebid/server/spring/config/SettingsConfiguration.java @@ -19,10 +19,12 @@ import org.prebid.server.settings.FileApplicationSettings; import org.prebid.server.settings.HttpApplicationSettings; import org.prebid.server.settings.JdbcApplicationSettings; +import org.prebid.server.settings.S3ApplicationSettings; import org.prebid.server.settings.SettingsCache; import org.prebid.server.settings.service.HttpPeriodicRefreshService; import org.prebid.server.settings.service.JdbcPeriodicRefreshService; -import org.prebid.server.spring.config.database.DatabaseConfiguration; +import org.prebid.server.settings.service.S3PeriodicRefreshService; +import org.prebid.server.spring.config.database.DatabaseConfiguration import org.prebid.server.vertx.http.HttpClient; import org.prebid.server.vertx.jdbc.JdbcClient; import org.springframework.beans.factory.annotation.Autowired; @@ -35,9 +37,16 @@ import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import org.springframework.validation.annotation.Validated; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; import javax.validation.constraints.Min; +import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; +import java.net.URI; +import java.net.URISyntaxException; import java.time.Clock; import java.util.List; import java.util.Objects; @@ -213,6 +222,110 @@ public JdbcPeriodicRefreshService ampJdbcPeriodicRefreshService( } } + @Configuration + @ConditionalOnProperty(prefix = "settings.s3", name = {"accounts-dir", "stored-imps-dir", "stored-requests-dir"}) + static class S3SettingsConfiguration { + + @Component + @ConfigurationProperties(prefix = "settings.s3") + @ConditionalOnProperty(prefix = "settings.s3", name = {"accessKeyId", "secretAccessKey"}) + @Validated + @Data + @NoArgsConstructor + private static class S3ConfigurationProperties { + @NotBlank + private String accessKeyId; + @NotBlank + private String secretAccessKey; + @NotBlank + private String endpoint; + @NotBlank + private String bucket; + @NotBlank + private String accountsDir; + @NotBlank + private String storedImpsDir; + @NotBlank + private String storedRequestsDir; + @NotBlank + private String storedResponsesDir; + } + + @Bean + S3AsyncClient s3AsyncClient(S3ConfigurationProperties s3ConfigurationProperties) throws URISyntaxException { + final AwsBasicCredentials credentials = AwsBasicCredentials.create( + s3ConfigurationProperties.getAccessKeyId(), + s3ConfigurationProperties.getSecretAccessKey()); + + return S3AsyncClient + .builder() + .credentialsProvider(StaticCredentialsProvider.create(credentials)) + .endpointOverride(new URI(s3ConfigurationProperties.getEndpoint())) + .region(Region.EU_CENTRAL_1) + .build(); + } + + @Bean + S3ApplicationSettings s3ApplicationSettings( + JacksonMapper mapper, + S3ConfigurationProperties s3ConfigurationProperties, + S3AsyncClient s3AsyncClient, + Vertx vertx) { + + return new S3ApplicationSettings( + s3AsyncClient, + s3ConfigurationProperties.getBucket(), + s3ConfigurationProperties.getAccountsDir(), + s3ConfigurationProperties.getStoredImpsDir(), + s3ConfigurationProperties.getStoredRequestsDir(), + s3ConfigurationProperties.getStoredResponsesDir(), + mapper, + vertx); + } + } + + @Configuration + @ConditionalOnProperty(prefix = "settings.in-memory-cache.s3-update", + name = {"refresh-rate", "timeout"}) + static class S3PeriodicRefreshServiceConfiguration { + + @Value("${settings.in-memory-cache.s3-update.refresh-rate}") + long refreshPeriod; + + @Value("${settings.in-memory-cache.s3-update.timeout}") + long timeout; + + @Autowired + Vertx vertx; + + @Autowired + HttpClient httpClient; + @Autowired + Metrics metrics; + @Autowired + Clock clock; + + @Bean + public S3PeriodicRefreshService s3PeriodicRefreshService( + S3AsyncClient s3AsyncClient, + S3SettingsConfiguration.S3ConfigurationProperties s3ConfigurationProperties, + SettingsCache settingsCache, + JacksonMapper mapper) { + return new S3PeriodicRefreshService( + s3AsyncClient, + s3ConfigurationProperties.getBucket(), + s3ConfigurationProperties.getStoredRequestsDir(), + s3ConfigurationProperties.getStoredImpsDir(), + refreshPeriod, + timeout, + MetricName.stored_request, + settingsCache, + vertx, + metrics, + clock); + } + } + /** * This configuration defines a collection of application settings fetchers and its ordering. */ @@ -223,10 +336,12 @@ static class CompositeSettingsConfiguration { CompositeApplicationSettings compositeApplicationSettings( @Autowired(required = false) FileApplicationSettings fileApplicationSettings, @Autowired(required = false) JdbcApplicationSettings jdbcApplicationSettings, - @Autowired(required = false) HttpApplicationSettings httpApplicationSettings) { + @Autowired(required = false) HttpApplicationSettings httpApplicationSettings, + @Autowired(required = false) S3ApplicationSettings s3ApplicationSettings) { final List applicationSettingsList = - Stream.of(fileApplicationSettings, + Stream.of(s3ApplicationSettings, + fileApplicationSettings, jdbcApplicationSettings, httpApplicationSettings) .filter(Objects::nonNull) diff --git a/src/test/java/org/prebid/server/settings/S3ApplicationSettingsTest.java b/src/test/java/org/prebid/server/settings/S3ApplicationSettingsTest.java new file mode 100644 index 00000000000..dec091bdb9a --- /dev/null +++ b/src/test/java/org/prebid/server/settings/S3ApplicationSettingsTest.java @@ -0,0 +1,350 @@ +package org.prebid.server.settings; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.prebid.server.VertxTest; +import org.prebid.server.exception.PreBidException; +import org.prebid.server.execution.Timeout; +import org.prebid.server.execution.TimeoutFactory; +import org.prebid.server.settings.model.Account; +import org.prebid.server.settings.model.AccountAuctionConfig; +import org.prebid.server.settings.model.AccountPrivacyConfig; +import org.prebid.server.settings.model.StoredDataResult; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.verify; + +@RunWith(VertxUnitRunner.class) +public class S3ApplicationSettingsTest extends VertxTest { + + private static final String BUCKET = "bucket"; + private static final String ACCOUNTS_DIR = "accounts"; + private static final String STORED_IMPS_DIR = "stored-imps"; + private static final String STORED_REQUESTS_DIR = "stored-requests"; + private static final String STORED_RESPONSES_DIR = "stored-responses"; + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + private Timeout timeout; + + @Mock + private S3AsyncClient s3AsyncClient; + private Vertx vertx; + + private S3ApplicationSettings s3ApplicationSettings; + + @Before + public void setUp() { + vertx = Vertx.vertx(); + s3ApplicationSettings = new S3ApplicationSettings(s3AsyncClient, BUCKET, ACCOUNTS_DIR, + STORED_IMPS_DIR, STORED_REQUESTS_DIR, STORED_RESPONSES_DIR, jacksonMapper, vertx); + + final Clock clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()); + final TimeoutFactory timeoutFactory = new TimeoutFactory(clock); + timeout = timeoutFactory.create(500L); + } + + @After + public void tearDown(TestContext context) { + vertx.close(context.asyncAssertSuccess()); + } + + @Test + public void getAccountByIdShouldReturnFetchedAccount(TestContext context) throws JsonProcessingException { + // given + final Account account = Account.builder() + .id("someId") + .auction(AccountAuctionConfig.builder() + .priceGranularity("testPriceGranularity") + .build()) + .privacy(AccountPrivacyConfig.of(null, null, null, null)) + .build(); + + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + mapper.writeValueAsString(account).getBytes()))); + + // when + final Future future = s3ApplicationSettings.getAccountById("someId", timeout); + + // then + final Async async = context.async(); + + future.onComplete(context.asyncAssertSuccess(returnedAccount -> { + assertThat(returnedAccount.getId()).isEqualTo("someId"); + assertThat(returnedAccount.getAuction().getPriceGranularity()).isEqualTo("testPriceGranularity"); + + verify(s3AsyncClient).getObject( + eq(GetObjectRequest.builder().bucket(BUCKET).key(ACCOUNTS_DIR + "/someId.json").build()), + any(AsyncResponseTransformer.class)); + async.complete(); + })); + } + + @Test + public void getAccountByIdNoSuchKey(TestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.failedFuture( + NoSuchKeyException.create( + "The specified key does not exist.", + new IllegalStateException("")))); + + // when + final Future future = s3ApplicationSettings.getAccountById("notFoundId", timeout); + + // then + final Async async = context.async(); + + future.onComplete(context.asyncAssertFailure(cause -> { + assertThat(cause) + .isInstanceOf(PreBidException.class) + .hasMessage("Account with id notFoundId not found"); + + async.complete(); + })); + } + + @Test + public void getAccountByIdInvalidJson(TestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "invalidJson".getBytes()))); + + // when + final Future future = s3ApplicationSettings.getAccountById("invalidJsonId", timeout); + + // then + final Async async = context.async(); + + future.onComplete(context.asyncAssertFailure(cause -> { + assertThat(cause) + .isInstanceOf(PreBidException.class) + .hasMessage("Invalid json for account with id invalidJsonId"); + async.complete(); + })); + } + + @Test + public void getStoredDataShouldReturnFetchedStoredRequest(TestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "req1Result".getBytes()))); + + // when + final Future future = s3ApplicationSettings + .getStoredData("someId", Set.of("req1"), Collections.emptySet(), timeout); + + // then + final Async async = context.async(); + + future.onComplete(context.asyncAssertSuccess(account -> { + assertThat(account.getStoredIdToRequest().size()).isEqualTo(1); + assertThat(account.getStoredIdToImp().size()).isEqualTo(0); + assertThat(account.getStoredIdToRequest()).isEqualTo(Map.of("req1", "req1Result")); + assertThat(account.getErrors()).isEmpty(); + + verify(s3AsyncClient).getObject( + eq(GetObjectRequest.builder().bucket(BUCKET).key(STORED_REQUESTS_DIR + "/req1.json").build()), + any(AsyncResponseTransformer.class)); + + async.complete(); + })); + } + + @Test + public void getStoredDataShouldReturnFetchedStoredImpression(TestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "imp1Result".getBytes()))); + + // when + final Future future = s3ApplicationSettings + .getStoredData("someId", Collections.emptySet(), Set.of("imp1"), timeout); + + // then + final Async async = context.async(); + + future.onComplete(context.asyncAssertSuccess(account -> { + assertThat(account.getStoredIdToRequest().size()).isEqualTo(0); + assertThat(account.getStoredIdToImp().size()).isEqualTo(1); + assertThat(account.getStoredIdToImp()).isEqualTo(Map.of("imp1", "imp1Result")); + assertThat(account.getErrors()).isEmpty(); + + verify(s3AsyncClient).getObject( + eq(GetObjectRequest.builder().bucket(BUCKET).key(STORED_IMPS_DIR + "/imp1.json").build()), + any(AsyncResponseTransformer.class)); + + async.complete(); + })); + } + + @Test + public void getStoredDataShouldReturnFetchedStoredImpressionWithAdUnitPathStoredId(TestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "imp1Result".getBytes()))); + + // when + final Future future = s3ApplicationSettings + .getStoredData("/123/root/position-1", Collections.emptySet(), Set.of("imp1"), timeout); + + // then + final Async async = context.async(); + + future.onComplete(context.asyncAssertSuccess(account -> { + assertThat(account.getStoredIdToRequest().size()).isEqualTo(0); + assertThat(account.getStoredIdToImp().size()).isEqualTo(1); + assertThat(account.getStoredIdToImp()).isEqualTo(Map.of("imp1", "imp1Result")); + assertThat(account.getErrors()).isEmpty(); + + verify(s3AsyncClient).getObject( + eq(GetObjectRequest.builder().bucket(BUCKET).key(STORED_IMPS_DIR + "/imp1.json").build()), + any(AsyncResponseTransformer.class)); + + async.complete(); + })); + } + + @Test + public void getStoredDataShouldReturnFetchedStoredImpressionAndStoredRequest(TestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn( + CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "req1Result".getBytes())), + CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "imp1Result".getBytes()))); + + // when + final Future future = s3ApplicationSettings + .getStoredData("someId", Set.of("req1"), Set.of("imp1"), timeout); + + // then + final Async async = context.async(); + + future.onComplete(context.asyncAssertSuccess(account -> { + assertThat(account.getStoredIdToRequest().size()).isEqualTo(1); + assertThat(account.getStoredIdToRequest()).isEqualTo(Map.of("req1", "req1Result")); + assertThat(account.getStoredIdToImp().size()).isEqualTo(1); + assertThat(account.getStoredIdToImp()).isEqualTo(Map.of("imp1", "imp1Result")); + assertThat(account.getErrors()).isEmpty(); + + verify(s3AsyncClient).getObject( + eq(GetObjectRequest.builder().bucket(BUCKET).key(STORED_IMPS_DIR + "/imp1.json").build()), + any(AsyncResponseTransformer.class)); + verify(s3AsyncClient).getObject( + eq(GetObjectRequest.builder().bucket(BUCKET).key(STORED_REQUESTS_DIR + "/req1.json").build()), + any(AsyncResponseTransformer.class)); + + async.complete(); + })); + } + + @Test + public void getStoredDataReturnsErrorsForNotFoundRequests(TestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.failedFuture( + NoSuchKeyException.create( + "The specified key does not exist.", + new IllegalStateException("")))); + + // when + final Future future = s3ApplicationSettings + .getStoredData("someId", Set.of("req1"), Collections.emptySet(), timeout); + + // then + final Async async = context.async(); + + future.onComplete(context.asyncAssertSuccess(account -> { + assertThat(account.getStoredIdToImp()).isEmpty(); + assertThat(account.getStoredIdToRequest()).isEmpty(); + assertThat(account.getErrors().size()).isEqualTo(1); + assertThat(account.getErrors()) + .isNotNull() + .hasSize(1) + .isEqualTo(singletonList("No stored request found for id: req1")); + + async.complete(); + })); + } + + @Test + public void getStoredDataReturnsErrorsForNotFoundImpressions(TestContext context) { + // given + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn( + CompletableFuture.failedFuture( + NoSuchKeyException.create( + "The specified key does not exist.", + new IllegalStateException("")))); + + // when + final Future future = s3ApplicationSettings + .getStoredData("someId", Collections.emptySet(), Set.of("imp1"), timeout); + + // then + final Async async = context.async(); + + future.onComplete(context.asyncAssertSuccess(account -> { + assertThat(account.getStoredIdToImp()).isEmpty(); + assertThat(account.getStoredIdToRequest()).isEmpty(); + assertThat(account.getErrors().size()).isEqualTo(1); + assertThat(account.getErrors()) + .isNotNull() + .hasSize(1) + .isEqualTo(singletonList("No stored impression found for id: imp1")); + + async.complete(); + })); + } + +} diff --git a/src/test/java/org/prebid/server/settings/service/S3PeriodicRefreshServiceTest.java b/src/test/java/org/prebid/server/settings/service/S3PeriodicRefreshServiceTest.java new file mode 100644 index 00000000000..2a1d6e1b248 --- /dev/null +++ b/src/test/java/org/prebid/server/settings/service/S3PeriodicRefreshServiceTest.java @@ -0,0 +1,208 @@ +package org.prebid.server.settings.service; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.mockito.stubbing.Answer; +import org.prebid.server.VertxTest; +import org.prebid.server.metric.MetricName; +import org.prebid.server.metric.Metrics; +import org.prebid.server.settings.CacheNotificationListener; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.S3Object; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class S3PeriodicRefreshServiceTest extends VertxTest { + + private static final String BUCKET = "bucket"; + private static final String STORED_REQ_DIR = "stored-req"; + private static final String STORED_IMP_DIR = "stored-imp"; + + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Mock + private CacheNotificationListener cacheNotificationListener; + @Mock + private Vertx vertx; + @Mock + private S3AsyncClient s3AsyncClient; + private final Clock clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()); + @Mock + private Metrics metrics; + + private final Map expectedRequests = singletonMap("id1", "value1"); + private final Map expectedImps = singletonMap("id2", "value2"); + + @Before + public void setUp() { + given(s3AsyncClient.listObjects(any(ListObjectsRequest.class))) + .willReturn(listObjectResponse(STORED_REQ_DIR + "/id1.json"), + listObjectResponse(STORED_IMP_DIR + "/id2.json")); + + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "value1".getBytes())), + CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + "value2".getBytes()))); + } + + @Test + public void shouldCallSaveWithExpectedParameters() { + // when + createAndInitService(1000); + + // then + verify(cacheNotificationListener).save(expectedRequests, expectedImps); + } + + @Test + public void shouldCallInvalidateAndSaveWithExpectedParameters() { + // given + given(vertx.setPeriodic(anyLong(), any())) + .willAnswer(withSelfAndPassObjectToHandler(1L)); + given(s3AsyncClient.listObjects(any(ListObjectsRequest.class))) + .willReturn(listObjectResponse(STORED_REQ_DIR + "/id1.json"), + listObjectResponse(STORED_IMP_DIR + "/id2.json"), + listObjectResponse(), + listObjectResponse(STORED_IMP_DIR + "/id2.json")); + given(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .willReturn(getObjectResponse("value1"), + getObjectResponse("value2"), + getObjectResponse("changed_value")); + + // when + createAndInitService(1000); + + // then + verify(cacheNotificationListener).save(expectedRequests, expectedImps); + verify(cacheNotificationListener).invalidate(singletonList("id1"), emptyList()); + verify(cacheNotificationListener).save(emptyMap(), singletonMap("id2", "changed_value")); + } + + @Test + public void initializeShouldMakeOneInitialRequestAndTwoScheduledRequestsWithParam() { + // given + given(vertx.setPeriodic(anyLong(), any())) + .willAnswer(withSelfAndPassObjectToHandler(1L, 2L)); + + // when + createAndInitService(1000); + + // then + verify(s3AsyncClient, times(6)).listObjects(any(ListObjectsRequest.class)); + verify(s3AsyncClient, times(6)).getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class)); + } + + @Test + public void initializeShouldMakeOnlyOneInitialRequestIfRefreshPeriodIsNegative() { + // when + createAndInitService(-1); + + // then + verify(vertx, never()).setPeriodic(anyLong(), any()); + verify(s3AsyncClient, times(2)).listObjects(any(ListObjectsRequest.class)); + } + + @Test + public void shouldUpdateTimerMetric() { + // when + createAndInitService(1000); + + // then + verify(metrics).updateSettingsCacheRefreshTime( + eq(MetricName.stored_request), eq(MetricName.initialize), anyLong()); + } + + @Test + public void shouldUpdateTimerAndErrorMetric() { + // given + given(s3AsyncClient.listObjects(any(ListObjectsRequest.class))) + .willReturn(CompletableFuture.failedFuture(new IllegalStateException("Failed"))); + + // when + createAndInitService(1000); + + // then + verify(metrics).updateSettingsCacheRefreshTime( + eq(MetricName.stored_request), eq(MetricName.initialize), anyLong()); + verify(metrics).updateSettingsCacheRefreshErrorMetric( + eq(MetricName.stored_request), eq(MetricName.initialize)); + } + + private CompletableFuture listObjectResponse(String... keys) { + return CompletableFuture.completedFuture( + ListObjectsResponse + .builder() + .contents(Arrays.stream(keys).map(key -> S3Object.builder().key(key).build()).toList()) + .build()); + } + + private CompletableFuture> getObjectResponse(String value) { + return CompletableFuture.completedFuture( + ResponseBytes.fromByteArray( + GetObjectResponse.builder().build(), + value.getBytes())); + } + + private void createAndInitService(long refreshPeriod) { + final S3PeriodicRefreshService s3PeriodicRefreshService = new S3PeriodicRefreshService( + s3AsyncClient, + BUCKET, + STORED_REQ_DIR, + STORED_IMP_DIR, + refreshPeriod, + 2000, + MetricName.stored_request, + cacheNotificationListener, + vertx, + metrics, + clock); + s3PeriodicRefreshService.initialize(); + } + + @SuppressWarnings("unchecked") + private static Answer withSelfAndPassObjectToHandler(T... objects) { + return inv -> { + // invoking handler right away passing mock to it + for (T obj : objects) { + ((Handler) inv.getArgument(1)).handle(obj); + } + return 0L; + }; + } + +}