From 841e74fd3b27f376fd2b0895454b8e10bedeb885 Mon Sep 17 00:00:00 2001 From: ddubyk Date: Wed, 28 Aug 2024 18:51:38 +0300 Subject: [PATCH] Refactor handlers. --- .../builders/PrebidServerResponseBuilder.java | 56 ++-- .../cache/config/ObjectMapperConfig.java | 14 + .../prebid/cache/handlers/ErrorHandler.java | 16 +- .../prebid/cache/handlers/MetricsHandler.java | 9 - .../prebid/cache/handlers/PayloadType.java | 2 +- .../cache/handlers/cache/CacheHandler.java | 123 +++---- .../cache/handlers/cache/GetCacheHandler.java | 152 +++++---- .../handlers/cache/PostCacheHandler.java | 281 +++++++++------- .../org/prebid/cache/helpers/RandomUUID.java | 36 -- .../prebid/cache/model/ResponseObject.java | 1 + .../cache/handlers/CacheHandlerTest.java | 234 +++++++++++++ .../cache/handlers/CacheHandlerTests.java | 55 --- .../cache/handlers/ErrorHandlerTest.java | 80 +++++ .../cache/handlers/GetCacheHandlerTest.java | 316 ++++++++++++++++++ .../cache/handlers/GetCacheHandlerTests.java | 217 ------------ .../cache/handlers/PostCacheHandlerTest.java | 241 +++++++++++++ .../cache/handlers/PostCacheHandlerTests.java | 239 ------------- 17 files changed, 1230 insertions(+), 842 deletions(-) create mode 100644 src/main/java/org/prebid/cache/config/ObjectMapperConfig.java delete mode 100644 src/main/java/org/prebid/cache/handlers/MetricsHandler.java delete mode 100644 src/main/java/org/prebid/cache/helpers/RandomUUID.java create mode 100644 src/test/java/org/prebid/cache/handlers/CacheHandlerTest.java delete mode 100644 src/test/java/org/prebid/cache/handlers/CacheHandlerTests.java create mode 100644 src/test/java/org/prebid/cache/handlers/ErrorHandlerTest.java create mode 100644 src/test/java/org/prebid/cache/handlers/GetCacheHandlerTest.java delete mode 100644 src/test/java/org/prebid/cache/handlers/GetCacheHandlerTests.java create mode 100644 src/test/java/org/prebid/cache/handlers/PostCacheHandlerTest.java delete mode 100644 src/test/java/org/prebid/cache/handlers/PostCacheHandlerTests.java diff --git a/src/main/java/org/prebid/cache/builders/PrebidServerResponseBuilder.java b/src/main/java/org/prebid/cache/builders/PrebidServerResponseBuilder.java index 8836e92..da6f0f9 100644 --- a/src/main/java/org/prebid/cache/builders/PrebidServerResponseBuilder.java +++ b/src/main/java/org/prebid/cache/builders/PrebidServerResponseBuilder.java @@ -31,48 +31,47 @@ public class PrebidServerResponseBuilder { private static final String HEADER_CONNECTION_KEEPALIVE = "keep-alive"; private static final String HEADER_CONNECTION_CLOSE = "close"; - public Mono createResponseMono(final ServerRequest request, - final MediaType mediaType, - final PayloadWrapper wrapper) { + public Mono createResponseMono(ServerRequest request, + MediaType mediaType, + PayloadWrapper wrapper) { + return ok(request, mediaType).body(fromValue(wrapper.getPayload().getValue())); } - public Mono createResponseMono(final ServerRequest request, - final MediaType mediaType, - final ResponseObject response) { + public Mono createResponseMono(ServerRequest request, + MediaType mediaType, + ResponseObject response) { + return ok(request, mediaType).body(fromValue(response)); } - private ServerResponse.BodyBuilder ok(final ServerRequest request, final MediaType mediaType) { + private ServerResponse.BodyBuilder ok(ServerRequest request, MediaType mediaType) { final String now = ZonedDateTime.now().format(DateTimeFormatter.RFC_1123_DATE_TIME); ServerResponse.BodyBuilder builder = ServerResponse.ok() - .contentType(mediaType) - .header(HttpHeaders.DATE, now) - .varyBy(HttpHeaders.ACCEPT_ENCODING) - .cacheControl(CacheControl.noCache()); + .contentType(mediaType) + .header(HttpHeaders.DATE, now) + .varyBy(HttpHeaders.ACCEPT_ENCODING) + .cacheControl(CacheControl.noCache()); applyHeaders(builder, request); return builder; } - public Mono error(final Mono monoError, - final ServerRequest request) { + public Mono error(Mono monoError, ServerRequest request) { return monoError.transform(ThrowableTranslator::translate) .flatMap(translation -> addHeaders(status(translation.getHttpStatus()), request) .body(Mono.just( - ErrorResponse.builder() - .error(translation.getHttpStatus().getReasonPhrase()) - .status(translation.getHttpStatus().value()) - .path(request.path()) - .message(translation.getErrorMessage()) - .timestamp(new Date()) - .build()), - ErrorResponse.class) - ); + ErrorResponse.builder() + .error(translation.getHttpStatus().getReasonPhrase()) + .status(translation.getHttpStatus().value()) + .path(request.path()) + .message(translation.getErrorMessage()) + .timestamp(new Date()) + .build()), + ErrorResponse.class)); } - private static ServerResponse.BodyBuilder addHeaders(final ServerResponse.BodyBuilder builder, - final ServerRequest request) { + private static ServerResponse.BodyBuilder addHeaders(ServerResponse.BodyBuilder builder, ServerRequest request) { ServerResponse.BodyBuilder headers = builder.header(HttpHeaders.DATE, ZonedDateTime.now().format(DateTimeFormatter.RFC_1123_DATE_TIME)) .varyBy(HttpHeaders.ACCEPT_ENCODING) @@ -81,9 +80,7 @@ private static ServerResponse.BodyBuilder addHeaders(final ServerResponse.BodyBu return applyHeaders(headers, request); } - private static ServerResponse.BodyBuilder applyHeaders(final ServerResponse.BodyBuilder builder, - final ServerRequest request) { - + private static ServerResponse.BodyBuilder applyHeaders(ServerResponse.BodyBuilder builder, ServerRequest request) { final List connectionHeaders = request.headers().header(HttpHeaders.CONNECTION); if (hasConnectionValue(connectionHeaders, HEADER_CONNECTION_KEEPALIVE)) { builder.header(HttpHeaders.CONNECTION, HEADER_CONNECTION_KEEPALIVE); @@ -96,8 +93,7 @@ private static ServerResponse.BodyBuilder applyHeaders(final ServerResponse.Body private static boolean hasConnectionValue(List connectionHeaders, String value) { return !connectionHeaders.isEmpty() && connectionHeaders.stream() - .map(String::toLowerCase) - .allMatch(Predicate.isEqual(value)); + .map(String::toLowerCase) + .allMatch(Predicate.isEqual(value)); } - } diff --git a/src/main/java/org/prebid/cache/config/ObjectMapperConfig.java b/src/main/java/org/prebid/cache/config/ObjectMapperConfig.java new file mode 100644 index 0000000..07a84c1 --- /dev/null +++ b/src/main/java/org/prebid/cache/config/ObjectMapperConfig.java @@ -0,0 +1,14 @@ +package org.prebid.cache.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ObjectMapperConfig { + + @Bean + public ObjectMapper objectMapper() { + return new ObjectMapper(); + } +} diff --git a/src/main/java/org/prebid/cache/handlers/ErrorHandler.java b/src/main/java/org/prebid/cache/handlers/ErrorHandler.java index 429c3b5..c21ede1 100644 --- a/src/main/java/org/prebid/cache/handlers/ErrorHandler.java +++ b/src/main/java/org/prebid/cache/handlers/ErrorHandler.java @@ -11,18 +11,24 @@ import org.springframework.web.reactive.function.server.ServerResponse; import reactor.core.publisher.Mono; +import java.util.Objects; + @Component @Slf4j -public class ErrorHandler extends MetricsHandler { +public class ErrorHandler { + private static final String RESOURCE_NOT_FOUND_BAD_URL = "Resource Not Found - Bad URL."; private static final String RESOURCE_NOT_FOUND = "Resource Not Found: uuid %s"; private static final String INVALID_PARAMETERS = "Invalid Parameter(s): uuid not found."; private static final String NO_ELEMENTS_FOUND = "No Elements Found."; + private final MetricsRecorder metricsRecorder; + private final PrebidServerResponseBuilder builder; + @Autowired - public ErrorHandler(final MetricsRecorder metricsRecorder, final PrebidServerResponseBuilder builder) { - this.metricsRecorder = metricsRecorder; - this.builder = builder; + public ErrorHandler(MetricsRecorder metricsRecorder, PrebidServerResponseBuilder builder) { + this.metricsRecorder = Objects.requireNonNull(metricsRecorder); + this.builder = Objects.requireNonNull(builder); } public static Mono createResourceNotFound(String uuid) { @@ -37,7 +43,7 @@ public static Mono createNoElementsFound() { return Mono.error(new BadRequestException(NO_ELEMENTS_FOUND)); } - public Mono invalidRequest(final ServerRequest request) { + public Mono invalidRequest(ServerRequest request) { metricsRecorder.getInvalidRequestMeter().increment(); return builder.error(Mono.just(new ResourceNotFoundException(RESOURCE_NOT_FOUND_BAD_URL)), request); } diff --git a/src/main/java/org/prebid/cache/handlers/MetricsHandler.java b/src/main/java/org/prebid/cache/handlers/MetricsHandler.java deleted file mode 100644 index 8aa4615..0000000 --- a/src/main/java/org/prebid/cache/handlers/MetricsHandler.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.prebid.cache.handlers; - -import org.prebid.cache.builders.PrebidServerResponseBuilder; -import org.prebid.cache.metrics.MetricsRecorder; - -public abstract class MetricsHandler { - protected MetricsRecorder metricsRecorder; - protected PrebidServerResponseBuilder builder; -} diff --git a/src/main/java/org/prebid/cache/handlers/PayloadType.java b/src/main/java/org/prebid/cache/handlers/PayloadType.java index 15d26d3..4404879 100644 --- a/src/main/java/org/prebid/cache/handlers/PayloadType.java +++ b/src/main/java/org/prebid/cache/handlers/PayloadType.java @@ -11,7 +11,7 @@ public enum PayloadType { @JsonValue private final String text; - PayloadType(final String text) { + PayloadType(String text) { this.text = text; } diff --git a/src/main/java/org/prebid/cache/handlers/cache/CacheHandler.java b/src/main/java/org/prebid/cache/handlers/cache/CacheHandler.java index 6d0bbca..49e8f6c 100644 --- a/src/main/java/org/prebid/cache/handlers/cache/CacheHandler.java +++ b/src/main/java/org/prebid/cache/handlers/cache/CacheHandler.java @@ -2,114 +2,123 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.prebid.cache.builders.PrebidServerResponseBuilder; import org.prebid.cache.exceptions.BadRequestException; import org.prebid.cache.exceptions.DuplicateKeyException; import org.prebid.cache.exceptions.RepositoryException; import org.prebid.cache.exceptions.RequestParsingException; import org.prebid.cache.exceptions.ResourceNotFoundException; import org.prebid.cache.exceptions.UnsupportedMediaTypeException; -import org.prebid.cache.handlers.MetricsHandler; import org.prebid.cache.handlers.ServiceType; import org.prebid.cache.log.ConditionalLogger; import org.prebid.cache.metrics.MetricsRecorder; import org.prebid.cache.metrics.MetricsRecorder.MetricsRecorderTimer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.codec.DecodingException; import org.springframework.core.io.buffer.DataBufferLimitException; import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; +import org.springframework.web.server.UnsupportedMediaTypeStatusException; import reactor.core.publisher.Mono; -import reactor.core.publisher.Signal; +import java.util.Objects; import java.util.concurrent.TimeoutException; +@Component @Slf4j -public abstract class CacheHandler extends MetricsHandler { +public class CacheHandler { + private static final ConditionalLogger CONDITIONAL_LOGGER = new ConditionalLogger(log); + + public static final String ID_KEY = "uuid"; + public static final String CACHE_HOST_KEY = "ch"; private static final int UNKNOWN_SIZE_VALUE = 1; - ServiceType type; - static final String ID_KEY = "uuid"; - static final String CACHE_HOST_KEY = "ch"; - private static final String UUID_DUPLICATION = "UUID duplication."; - protected String metricTagPrefix; + private final PrebidServerResponseBuilder builder; + private final MetricsRecorder metricsRecorder; + private final double samplingRate; - private final ConditionalLogger conditionalLogger; - private final Double samplingRate; + @Autowired + public CacheHandler(PrebidServerResponseBuilder builder, + MetricsRecorder metricsRecorder, + @Value("${sampling.rate:0.01}") double samplingRate) { - protected CacheHandler(Double samplingRate) { + this.builder = Objects.requireNonNull(builder); + this.metricsRecorder = Objects.requireNonNull(metricsRecorder); this.samplingRate = samplingRate; - this.conditionalLogger = new ConditionalLogger(log); } - public Mono validateErrorResult(final Mono mono) { - return mono.doOnSuccess(v -> log.debug("{}: {}", type, v)) + public MetricsRecorder.MetricsRecorderTimer timerContext(ServiceType type, String metricTagPrefix) { + metricsRecorder.markMeterForTag(metricTagPrefix, MetricsRecorder.MeasurementTag.REQUEST); + return metricsRecorder.createRequestTimerForServiceType(type); + } + + public Mono validateErrorResult(ServiceType type, Mono mono) { + return mono + .doOnSuccess(v -> log.debug("{}: {}", type, v)) .onErrorMap(DuplicateKeyException.class, error -> { metricsRecorder.getExistingKeyError().increment(); - return new BadRequestException(UUID_DUPLICATION); + return new BadRequestException("UUID duplication."); }) - .onErrorMap(org.springframework.core.codec.DecodingException.class, error -> - new RequestParsingException(error.toString())) - .onErrorMap(org.springframework.web.server.UnsupportedMediaTypeStatusException.class, error -> + .onErrorMap(DecodingException.class, error -> new RequestParsingException(error.toString())) + .onErrorMap(UnsupportedMediaTypeStatusException.class, error -> new UnsupportedMediaTypeException(error.toString())); } - Mono finalizeResult(final Mono mono, - final ServerRequest request, - final MetricsRecorderTimer timerContext) { + public Mono finalizeResult(Mono mono, + ServerRequest request, + MetricsRecorderTimer timerContext, + String metricTagPrefix) { + // transform to error, if needed and send metrics return mono - .onErrorResume(throwable -> handleErrorMetrics(throwable, request)) - .doOnEach(signal -> { - if (timerContext != null) + .onErrorResume(throwable -> handleErrorMetrics(throwable, request, metricTagPrefix)) + .doOnNext(ignored -> { + if (timerContext != null) { timerContext.stop(); + } }); } - private Mono handleErrorMetrics(final Throwable error, final ServerRequest request) { - if (error instanceof RepositoryException) { - recordMetric(MetricsRecorder.MeasurementTag.ERROR_DB); - } else if (error instanceof ResourceNotFoundException) { - conditionalLogger.info( + private Mono handleErrorMetrics(Throwable error, ServerRequest request, String metricTagPrefix) { + switch (error) { + case RepositoryException repositoryException -> + metricsRecorder.markMeterForTag(metricTagPrefix, MetricsRecorder.MeasurementTag.ERROR_DB); + case ResourceNotFoundException resourceNotFoundException -> CONDITIONAL_LOGGER.info( error.getMessage() + ". Refererring URLs: " + request.headers().header(HttpHeaders.REFERER) + ". Request URI: " + request.uri(), samplingRate); - } else if (error instanceof BadRequestException) { - log.error(error.getMessage()); - } else if (error instanceof TimeoutException) { - metricsRecorder.markMeterForTag(this.metricTagPrefix, MetricsRecorder.MeasurementTag.ERROR_TIMEDOUT); - } else if (error instanceof DataBufferLimitException) { - final long contentLength = request.headers().contentLength().orElse(UNKNOWN_SIZE_VALUE); - conditionalLogger.error( - "Request length: `" + contentLength + "` exceeds maximum size limit", - samplingRate); - } else { - log.error("Error occurred while processing the request: '{}', cause: '{}'", - ExceptionUtils.getMessage(error), ExceptionUtils.getMessage(error)); + case BadRequestException badRequestException -> log.error(error.getMessage()); + case TimeoutException timeoutException -> + metricsRecorder.markMeterForTag(metricTagPrefix, MetricsRecorder.MeasurementTag.ERROR_TIMEDOUT); + case DataBufferLimitException dataBufferLimitException -> { + final long contentLength = request.headers().contentLength().orElse(UNKNOWN_SIZE_VALUE); + CONDITIONAL_LOGGER.error( + "Request length: `" + contentLength + "` exceeds maximum size limit", + samplingRate); + } + default -> log.error( + "Error occurred while processing the request: '{}', cause: '{}'", + ExceptionUtils.getMessage(error), + ExceptionUtils.getMessage(error)); } return builder.error(Mono.just(error), request) - .doOnEach(signal -> handleErrorStatusCodes(request, signal)); + .doOnSuccess(response -> handleErrorStatusCodes(response, metricTagPrefix)); } - private void handleErrorStatusCodes(ServerRequest request, Signal signal) { - final var response = signal.get(); - HttpMethod method = request.method(); - if (method == null || signal.isOnError() || response == null) { - recordMetric(MetricsRecorder.MeasurementTag.ERROR_UNKNOWN); - } else if (response.statusCode() == HttpStatus.INTERNAL_SERVER_ERROR) { - recordMetric(MetricsRecorder.MeasurementTag.ERROR_UNKNOWN); + private void handleErrorStatusCodes(ServerResponse response, String metricTagPrefix) { + if (response == null || response.statusCode() == HttpStatus.INTERNAL_SERVER_ERROR) { + metricsRecorder.markMeterForTag(metricTagPrefix, MetricsRecorder.MeasurementTag.ERROR_UNKNOWN); } else if (response.statusCode() == HttpStatus.BAD_REQUEST) { - recordMetric(MetricsRecorder.MeasurementTag.ERROR_BAD_REQUEST); + metricsRecorder.markMeterForTag(metricTagPrefix, MetricsRecorder.MeasurementTag.ERROR_BAD_REQUEST); } else if (response.statusCode() == HttpStatus.NOT_FOUND) { - recordMetric(MetricsRecorder.MeasurementTag.ERROR_MISSINGID); + metricsRecorder.markMeterForTag(metricTagPrefix, MetricsRecorder.MeasurementTag.ERROR_MISSINGID); } } - - private void recordMetric(MetricsRecorder.MeasurementTag tag) { - metricsRecorder.markMeterForTag(this.metricTagPrefix, tag); - } - } diff --git a/src/main/java/org/prebid/cache/handlers/cache/GetCacheHandler.java b/src/main/java/org/prebid/cache/handlers/cache/GetCacheHandler.java index 3cb5739..aa024cc 100644 --- a/src/main/java/org/prebid/cache/handlers/cache/GetCacheHandler.java +++ b/src/main/java/org/prebid/cache/handlers/cache/GetCacheHandler.java @@ -19,7 +19,6 @@ import org.prebid.cache.repository.ReactiveRepository; import org.prebid.cache.routers.ApiConfig; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.stereotype.Component; @@ -33,41 +32,45 @@ import java.time.Duration; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; @Component @Slf4j -public class GetCacheHandler extends CacheHandler { +public class GetCacheHandler { + + private static final ServiceType SERVICE_TYPE = ServiceType.FETCH; + private static final String METRIC_TAG_PREFIX = "read"; - private final ReactiveRepository repository; private final CacheConfig config; private final ApiConfig apiConfig; - private final CircuitBreaker circuitBreaker; + private final CacheHandler cacheHandler; private final Map clientsCache; - private static final String UNSUPPORTED_MEDIATYPE = "Unsupported Media Type."; + private final ReactiveRepository repository; + private final CircuitBreaker circuitBreaker; + private final PrebidServerResponseBuilder builder; + private final MetricsRecorder metricsRecorder; @Autowired - public GetCacheHandler(final ReactiveRepository repository, - final CacheConfig config, - final ApiConfig apiConfig, - final MetricsRecorder metricsRecorder, - final PrebidServerResponseBuilder builder, - final CircuitBreaker webClientCircuitBreaker, - @Value("${sampling.rate:0.01}") final Double samplingRate) { - - super(samplingRate); - this.metricsRecorder = metricsRecorder; - this.type = ServiceType.FETCH; - this.repository = repository; - this.config = config; - this.apiConfig = apiConfig; - this.builder = builder; - this.metricTagPrefix = "read"; - this.circuitBreaker = webClientCircuitBreaker; + public GetCacheHandler(CacheConfig config, + ApiConfig apiConfig, + CacheHandler cacheHandler, + ReactiveRepository repository, + CircuitBreaker webClientCircuitBreaker, + PrebidServerResponseBuilder builder, + MetricsRecorder metricsRecorder) { + + this.config = Objects.requireNonNull(config); + this.apiConfig = Objects.requireNonNull(apiConfig); + this.cacheHandler = Objects.requireNonNull(cacheHandler); this.clientsCache = createClientsCache(config.getClientsCacheDuration(), config.getClientsCacheSize()); + this.repository = Objects.requireNonNull(repository); + this.circuitBreaker = Objects.requireNonNull(webClientCircuitBreaker); + this.builder = Objects.requireNonNull(builder); + this.metricsRecorder = Objects.requireNonNull(metricsRecorder); } - private static Map createClientsCache(final int ttl, final int size) { + private static Map createClientsCache(int ttl, int size) { return Caffeine.newBuilder() .expireAfterWrite(ttl, TimeUnit.SECONDS) .maximumSize(size) @@ -77,50 +80,41 @@ private static Map createClientsCache(final int ttl, final in public Mono fetch(ServerRequest request) { // metrics - metricsRecorder.markMeterForTag(this.metricTagPrefix, MetricsRecorder.MeasurementTag.REQUEST); - final var timerContext = metricsRecorder.createRequestTimerForServiceType(this.type); - - return request.queryParam(ID_KEY).map(id -> fetch(request, id, timerContext)).orElseGet(() -> { - final var responseMono = ErrorHandler.createInvalidParameters(); - return finalizeResult(responseMono, request, timerContext); - }); + final MetricsRecorder.MetricsRecorderTimer timerContext = + cacheHandler.timerContext(SERVICE_TYPE, METRIC_TAG_PREFIX); + + return request.queryParam(CacheHandler.ID_KEY) + .map(id -> fetch(request, id, timerContext)) + .orElseGet(() -> cacheHandler.finalizeResult( + ErrorHandler.createInvalidParameters(), + request, + timerContext, + METRIC_TAG_PREFIX)); } - private Mono fetch(final ServerRequest request, - final String id, - final MetricsRecorderTimer timerContext) { - - final var cacheUrl = resolveCacheUrl(request); + private Mono fetch(ServerRequest request, String id, MetricsRecorderTimer timerContext) { + final String cacheUrl = resolveCacheUrl(request); + final Mono responseMono = StringUtils.containsIgnoreCase(cacheUrl, config.getAllowedProxyHost()) + ? processProxyRequest(request, id, cacheUrl) + : processRequest(request, id); - final var responseMono = - StringUtils.containsIgnoreCase(cacheUrl, config.getAllowedProxyHost()) - ? processProxyRequest(request, id, cacheUrl) - : processRequest(request, id); - - return finalizeResult(responseMono, request, timerContext); + return cacheHandler.finalizeResult(responseMono, request, timerContext, METRIC_TAG_PREFIX); } - private String resolveCacheUrl(final ServerRequest request) { - final var cacheHostParam = request.queryParam(CACHE_HOST_KEY).orElse(null); - if (StringUtils.isNotBlank(cacheHostParam)) { - return new URIBuilder() - .setHost(cacheHostParam) - .setPath(apiConfig.getCachePath()) - .setScheme(config.getHostParamProtocol()) - .toString(); - } - - return null; + private String resolveCacheUrl(ServerRequest request) { + return request.queryParam(CacheHandler.CACHE_HOST_KEY) + .filter(StringUtils::isNotBlank) + .map(cacheHostParam -> new URIBuilder() + .setHost(cacheHostParam) + .setPath(apiConfig.getCachePath()) + .setScheme(config.getHostParamProtocol()) + .toString()) + .orElse(null); } - private Mono processProxyRequest(final ServerRequest request, - final String idKeyParam, - final String cacheUrl) { - - final WebClient webClient = clientsCache.computeIfAbsent(cacheUrl, WebClient::create); - - return webClient.get() - .uri(uriBuilder -> uriBuilder.queryParam(ID_KEY, idKeyParam).build()) + private Mono processProxyRequest(ServerRequest request, String idKeyParam, String cacheUrl) { + return clientsCache.computeIfAbsent(cacheUrl, WebClient::create).get() + .uri(uriBuilder -> uriBuilder.queryParam(CacheHandler.ID_KEY, idKeyParam).build()) .headers(httpHeaders -> httpHeaders.addAll(request.headers().asHttpHeaders())) .exchange() .transform(CircuitBreakerOperator.of(circuitBreaker)) @@ -128,15 +122,10 @@ private Mono processProxyRequest(final ServerRequest request, .subscribeOn(Schedulers.parallel()) .handle(this::updateProxyMetrics) .flatMap(GetCacheHandler::fromClientResponse) - .doOnError(error -> { - metricsRecorder.getProxyFailure().increment(); - log.info("Failed to send request: '{}', cause: '{}'", - ExceptionUtils.getMessage(error), ExceptionUtils.getMessage(error)); - }); + .doOnError(this::handleProxyRequestError); } - private void updateProxyMetrics(final ClientResponse clientResponse, - final SynchronousSink sink) { + private void updateProxyMetrics(ClientResponse clientResponse, SynchronousSink sink) { if (HttpStatus.OK.equals(clientResponse.statusCode())) { metricsRecorder.getProxySuccess().increment(); } else { @@ -146,31 +135,40 @@ private void updateProxyMetrics(final ClientResponse clientResponse, sink.next(clientResponse); } - private static Mono fromClientResponse(final ClientResponse clientResponse) { + private static Mono fromClientResponse(ClientResponse clientResponse) { return ServerResponse.status(clientResponse.statusCode()) .headers(headerConsumer -> clientResponse.headers().asHttpHeaders().forEach(headerConsumer::addAll)) .body(clientResponse.bodyToMono(String.class), String.class); } - private Mono processRequest(final ServerRequest request, final String keyIdParam) { - final var normalizedId = String.format("%s%s", config.getPrefix(), keyIdParam); + private void handleProxyRequestError(Throwable error) { + metricsRecorder.getProxyFailure().increment(); + log.info( + "Failed to send request: '{}', cause: '{}'", + ExceptionUtils.getMessage(error), + ExceptionUtils.getMessage(error)); + } + + private Mono processRequest(ServerRequest request, String keyIdParam) { + final String normalizedId = String.format("%s%s", config.getPrefix(), keyIdParam); return repository.findById(normalizedId) .subscribeOn(Schedulers.parallel()) - .transform(this::validateErrorResult) + .transform(mono -> cacheHandler.validateErrorResult(SERVICE_TYPE, mono)) .flatMap(wrapper -> createServerResponse(wrapper, request)) .switchIfEmpty(ErrorHandler.createResourceNotFound(normalizedId)); } - private Mono createServerResponse(final PayloadWrapper wrapper, final ServerRequest request) { - if (wrapper.getPayload().getType().equals(PayloadType.JSON.toString())) { - metricsRecorder.markMeterForTag(this.metricTagPrefix, MetricsRecorder.MeasurementTag.JSON); + private Mono createServerResponse(PayloadWrapper wrapper, ServerRequest request) { + final String payloadType = wrapper.getPayload().getType(); + if (payloadType.equals(PayloadType.JSON.toString())) { + metricsRecorder.markMeterForTag(METRIC_TAG_PREFIX, MetricsRecorder.MeasurementTag.JSON); return builder.createResponseMono(request, MediaType.APPLICATION_JSON_UTF8, wrapper); - } else if (wrapper.getPayload().getType().equals(PayloadType.XML.toString())) { - metricsRecorder.markMeterForTag(this.metricTagPrefix, MetricsRecorder.MeasurementTag.XML); + } else if (payloadType.equals(PayloadType.XML.toString())) { + metricsRecorder.markMeterForTag(METRIC_TAG_PREFIX, MetricsRecorder.MeasurementTag.XML); return builder.createResponseMono(request, MediaType.APPLICATION_XML, wrapper); } - return Mono.error(new UnsupportedMediaTypeException(UNSUPPORTED_MEDIATYPE)); + return Mono.error(new UnsupportedMediaTypeException("Unsupported Media Type.")); } } diff --git a/src/main/java/org/prebid/cache/handlers/cache/PostCacheHandler.java b/src/main/java/org/prebid/cache/handlers/cache/PostCacheHandler.java index ff4d997..f88073c 100644 --- a/src/main/java/org/prebid/cache/handlers/cache/PostCacheHandler.java +++ b/src/main/java/org/prebid/cache/handlers/cache/PostCacheHandler.java @@ -7,13 +7,11 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.http.client.utils.URIBuilder; import org.prebid.cache.builders.PrebidServerResponseBuilder; -import org.prebid.cache.exceptions.ExpiryOutOfRangeException; import org.prebid.cache.exceptions.InvalidUUIDException; -import org.prebid.cache.exceptions.RequestBodyDeserializeException; import org.prebid.cache.handlers.ErrorHandler; import org.prebid.cache.handlers.ServiceType; -import org.prebid.cache.helpers.RandomUUID; import org.prebid.cache.metrics.MetricsRecorder; import org.prebid.cache.model.Payload; import org.prebid.cache.model.PayloadTransfer; @@ -23,11 +21,11 @@ import org.prebid.cache.repository.CacheConfig; import org.prebid.cache.repository.ReactiveRepository; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.BodyExtractors; +import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; @@ -37,179 +35,230 @@ import reactor.core.scheduler.Schedulers; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.UUID; import java.util.function.Function; +import java.util.regex.Pattern; @Component @Slf4j -public class PostCacheHandler extends CacheHandler { +public class PostCacheHandler { - private static final String UUID_KEY = "uuid"; + private static final ServiceType SERVICE_TYPE = ServiceType.SAVE; + private static final String METRIC_TAG_PREFIX = "write"; + private static final Pattern UUID_PATTERN = Pattern.compile("^[a-zA-Z0-9_-]*$"); private static final String SECONDARY_CACHE_KEY = "secondaryCache"; + private static final String SECONDARY_CACHE_VALUE = "yes"; - private final ReactiveRepository repository; private final CacheConfig config; - private final Function> payloadWrapperToMapTransformer = payload -> - ImmutableMap.of(UUID_KEY, payload.getId()); - private final Map webClients = new HashMap<>(); - private final ObjectMapper objectMapper = new ObjectMapper(); + private final CacheHandler cacheHandler; + private final Map webClients; + private final ReactiveRepository repository; private final CircuitBreaker circuitBreaker; + private final PrebidServerResponseBuilder builder; + private final MetricsRecorder metricsRecorder; + private final ObjectMapper mapper; @Autowired - public PostCacheHandler(final ReactiveRepository repository, - final CacheConfig config, - final MetricsRecorder metricsRecorder, - final PrebidServerResponseBuilder builder, - final CircuitBreaker webClientCircuitBreaker, - @Value("${sampling.rate:0.01}") final Double samplingRate) { - - super(samplingRate); - this.metricsRecorder = metricsRecorder; - this.type = ServiceType.SAVE; - this.repository = repository; - this.config = config; - if (config.getSecondaryUris() != null) { - config.getSecondaryUris().forEach(ip -> webClients.put(ip, WebClient.create(ip))); + public PostCacheHandler(CacheConfig config, + CacheHandler cacheHandler, + ReactiveRepository repository, + CircuitBreaker webClientCircuitBreaker, + PrebidServerResponseBuilder builder, + MetricsRecorder metricsRecorder, + ObjectMapper objectMapper) { + + this.config = Objects.requireNonNull(config); + this.cacheHandler = Objects.requireNonNull(cacheHandler); + this.webClients = createWebClients(config); + this.repository = Objects.requireNonNull(repository); + this.circuitBreaker = Objects.requireNonNull(webClientCircuitBreaker); + this.builder = Objects.requireNonNull(builder); + this.metricsRecorder = Objects.requireNonNull(metricsRecorder); + this.mapper = Objects.requireNonNull(objectMapper); + } + + private static Map createWebClients(CacheConfig config) { + final Map webClients = new HashMap<>(); + final List secondaryUris = config.getSecondaryUris(); + if (secondaryUris == null || secondaryUris.isEmpty()) { + return Collections.unmodifiableMap(webClients); } - this.builder = builder; - this.metricTagPrefix = "write"; - this.circuitBreaker = webClientCircuitBreaker; + + for (String ip : secondaryUris) { + final String url = new URIBuilder() + /* + * TODO: scheme? + * Previous version: + * - WebClient.create(ip) + * - webClient.post() + .uri(uriBuilder -> uriBuilder.path(config.getSecondaryCachePath()) + .queryParam("secondaryCache", "yes").build()) + */ + .setScheme(config.getHostParamProtocol()) + .setHost(ip) + .setPath(config.getSecondaryCachePath()) + .addParameter(SECONDARY_CACHE_KEY, SECONDARY_CACHE_VALUE) + .toString(); + + webClients.put(ip, WebClient.create(url)); + } + + return Collections.unmodifiableMap(webClients); } - public Mono save(final ServerRequest request) { - metricsRecorder.markMeterForTag(this.metricTagPrefix, MetricsRecorder.MeasurementTag.REQUEST); - final var timerContext = metricsRecorder.createRequestTimerForServiceType(type); + public Mono save(ServerRequest request) { + final MetricsRecorder.MetricsRecorderTimer timerContext = + cacheHandler.timerContext(SERVICE_TYPE, METRIC_TAG_PREFIX); - String secondaryCache = request.queryParam(SECONDARY_CACHE_KEY).orElse(StringUtils.EMPTY); + final String secondaryCache = request.queryParam(SECONDARY_CACHE_KEY).orElse(StringUtils.EMPTY); - final var bodyMono = getRequestBodyMono(request); - final var monoList = bodyMono.map(RequestObject::getPuts); - final var flux = monoList.flatMapMany(Flux::fromIterable); - final var payloadFlux = flux + final Mono responseMono = getRequestBodyMono(request) + .map(RequestObject::getPuts) + .flatMapMany(Flux::fromIterable) .map(payload -> payload.toBuilder() .prefix(config.getPrefix()) .expiry(adjustExpiry(payload.compareAndGetExpiry())) .build()) .map(payloadWrapperTransformer()) .handle(this::validateUUID) - .handle(this::validateExpiry) + .concatMap(repository::save) .subscribeOn(Schedulers.parallel()) .collectList() .doOnNext(payloadWrappers -> sendRequestToSecondaryPrebidCacheHosts(payloadWrappers, secondaryCache)) - .flatMapMany(Flux::fromIterable) - .subscribeOn(Schedulers.parallel()); - final Mono responseMono = payloadFlux - .map(payloadWrapperToMapTransformer) + .flatMapMany(Flux::fromIterable) + .subscribeOn(Schedulers.parallel()) + .map(payload -> (Map) ImmutableMap.of(CacheHandler.ID_KEY, payload.getId())) .collectList() - .transform(this::validateErrorResult) + .transform(mono -> cacheHandler.validateErrorResult(SERVICE_TYPE, mono)) .map(ResponseObject::of) - .flatMap(response -> { - if (response.getResponses().isEmpty()) { - return ErrorHandler.createNoElementsFound(); - } else { - return builder.createResponseMono(request, MediaType.APPLICATION_JSON_UTF8, response); - } - }); - - return finalizeResult(responseMono, request, timerContext); + .flatMap(response -> !response.getResponses().isEmpty() + ? builder.createResponseMono(request, MediaType.APPLICATION_JSON_UTF8, response) + : ErrorHandler.createNoElementsFound()); + + return cacheHandler.finalizeResult(responseMono, request, timerContext, METRIC_TAG_PREFIX); + } + + private Mono getRequestBodyMono(ServerRequest request) { + final MediaType mediaType = request.headers().contentType().orElse(null); + if (!MediaType.TEXT_PLAIN.equals(mediaType)) { + return request.bodyToMono(RequestObject.class); + } + + return request.body(BodyExtractors.toMono(String.class)) + .mapNotNull(this::readRequestObject); + } + + private RequestObject readRequestObject(String body) { + try { + return mapper.readValue(body, RequestObject.class); + } catch (IOException e) { + log.error( + "Exception occurred while deserialize request body: '{}', cause: '{}'", + ExceptionUtils.getMessage(e), + ExceptionUtils.getMessage(e)); + return null; + } + } + + private long adjustExpiry(Long expiry) { + return expiry != null + ? Math.min(Math.max(expiry, config.getMinExpiry()), config.getMaxExpiry()) + : config.getExpirySec(); } private Function payloadWrapperTransformer() { return transfer -> PayloadWrapper.builder() - .id(RandomUUID.extractUUID(transfer)) + .id(uuidFrom(transfer)) .prefix(transfer.getPrefix()) .payload(Payload.of(transfer.getType(), transfer.getKey(), transfer.valueAsString())) .expiry(transfer.getExpiry()) - .isExternalId(RandomUUID.isExternalUUID(transfer)) + .isExternalId(transfer.getKey() != null) .build(); } - private void validateUUID(final PayloadWrapper payload, final SynchronousSink sink) { - if (payload.isExternalId() && !config.isAllowExternalUUID()) { + private static String uuidFrom(PayloadTransfer payload) { + return payload.getKey() != null ? payload.getKey() : String.valueOf(UUID.randomUUID()); + } + + private void validateUUID(PayloadWrapper payload, SynchronousSink sink) { + if (!payload.isExternalId()) { + sink.next(payload); + return; + } + + if (!config.isAllowExternalUUID()) { sink.error(new InvalidUUIDException("Prebid cache host forbids specifying UUID in request.")); return; } - if (RandomUUID.isValidUUID(payload.getId())) { + + final String uuid = payload.getId(); + if (isValidUUID(uuid)) { sink.next(payload); } else { - sink.error(new InvalidUUIDException("Invalid UUID: [" + payload.getId() + "].")); + sink.error(new InvalidUUIDException("Invalid UUID: [" + uuid + "].")); } } - private void validateExpiry(final PayloadWrapper payload, final SynchronousSink sink) { - if (payload.getExpiry() == null) { - sink.error(new ExpiryOutOfRangeException("Invalid Expiry [NULL].")); + private static boolean isValidUUID(String uuid) { + if (uuid == null || uuid.isEmpty()) { + log.error("UUID cannot be NULL or zero length !!"); + return false; } - sink.next(payload); - } - - private long adjustExpiry(Long expiry) { - if (expiry == null) { - return config.getExpirySec(); - } else if (expiry > config.getMaxExpiry()) { - return config.getMaxExpiry(); - } else if (expiry < config.getMinExpiry()) { - return config.getMinExpiry(); - } else { - return expiry; + final boolean isValid = UUID_PATTERN.matcher(uuid).matches(); + if (!isValid) { + log.debug("Invalid UUID: {}", uuid); } + return isValid; } private void sendRequestToSecondaryPrebidCacheHosts(List payloadWrappers, String secondaryCache) { - if (!"yes".equals(secondaryCache) && webClients.size() != 0) { - final List payloadTransfers = new ArrayList<>(); - for (PayloadWrapper payloadWrapper : payloadWrappers) { - payloadTransfers.add(wrapperToTransfer(payloadWrapper)); - } - - webClients.forEach((ip, webClient) -> webClient.post() - .uri(uriBuilder -> uriBuilder.path(config.getSecondaryCachePath()) - .queryParam("secondaryCache", "yes").build()) - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(RequestObject.of(payloadTransfers)) - .exchange() - .transform(CircuitBreakerOperator.of(circuitBreaker)) - .doOnError(throwable -> { - metricsRecorder.getSecondaryCacheWriteError().increment(); - log.info("Failed to send request: '{}', cause: '{}'", - ExceptionUtils.getMessage(throwable), ExceptionUtils.getMessage(throwable)); - }) - .subscribe(clientResponse -> { - if (clientResponse.statusCode() != HttpStatus.OK) { - metricsRecorder.getSecondaryCacheWriteError().increment(); - log.debug(clientResponse.statusCode().toString()); - log.info("Failed to write to remote address : {}", ip); - } - })); + if (webClients.isEmpty() || SECONDARY_CACHE_VALUE.equals(secondaryCache)) { + return; } + + final List payloadTransfers = payloadWrappers.stream() + .map(PostCacheHandler::wrapperToTransfer) + .toList(); + + webClients.forEach((ip, webClient) -> webClient.post() + .contentType(MediaType.APPLICATION_JSON) + .bodyValue(RequestObject.of(payloadTransfers)) + .exchange() + .transform(CircuitBreakerOperator.of(circuitBreaker)) + .doOnError(this::handleSecondaryCacheRequestError) + .subscribe(clientResponse -> handleSecondaryCacheResponse(clientResponse, ip))); + } + + private static PayloadTransfer wrapperToTransfer(PayloadWrapper wrapper) { + return PayloadTransfer.builder() + .type(wrapper.getPayload().getType()) + .key(wrapper.getId()) + .value(wrapper.getPayload().getValue()) + .expiry(wrapper.getExpiry()) + .build(); } - private PayloadTransfer wrapperToTransfer(final PayloadWrapper wrapper) { - return PayloadTransfer.builder().type(wrapper.getPayload().getType()) - .key(wrapper.getId()).value(wrapper.getPayload().getValue()).expiry(wrapper.getExpiry()).build(); + private void handleSecondaryCacheRequestError(Throwable throwable) { + metricsRecorder.getSecondaryCacheWriteError().increment(); + log.info( + "Failed to send request: '{}', cause: '{}'", + ExceptionUtils.getMessage(throwable), + ExceptionUtils.getMessage(throwable)); } - private Mono getRequestBodyMono(final ServerRequest request) { - if (MediaType.TEXT_PLAIN.equals(request.headers().contentType().orElse(MediaType.APPLICATION_JSON))) { - return request.body(BodyExtractors.toMono(String.class)).map(value -> { - RequestObject requestObject = null; - try { - requestObject = objectMapper.readValue(value, RequestObject.class); - } catch (IOException e) { - log.error("Exception occurred while deserialize request body: '{}', cause: '{}'", - ExceptionUtils.getMessage(e), ExceptionUtils.getMessage(e)); - } - return requestObject; - }).doOnError(throwable -> - Mono.error(new RequestBodyDeserializeException("Exception occurred while deserialize request body", - throwable))); + private void handleSecondaryCacheResponse(ClientResponse clientResponse, String ip) { + if (clientResponse.statusCode() != HttpStatus.OK) { + metricsRecorder.getSecondaryCacheWriteError().increment(); + log.debug(clientResponse.statusCode().toString()); + log.info("Failed to write to remote address : {}", ip); } - return request.bodyToMono(RequestObject.class); } } diff --git a/src/main/java/org/prebid/cache/helpers/RandomUUID.java b/src/main/java/org/prebid/cache/helpers/RandomUUID.java deleted file mode 100644 index e794249..0000000 --- a/src/main/java/org/prebid/cache/helpers/RandomUUID.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.prebid.cache.helpers; - -import lombok.extern.slf4j.Slf4j; -import org.prebid.cache.model.PayloadTransfer; - -import java.util.UUID; - -@Slf4j -public class RandomUUID { - - private RandomUUID() { - } - - public static String extractUUID(final PayloadTransfer payload) { - return (payload.getKey() != null) - ? payload.getKey() : String.valueOf(UUID.randomUUID()); - } - - public static boolean isExternalUUID(final PayloadTransfer payload) { - return payload.getKey() != null; - } - - public static boolean isValidUUID(String uuid) { - if (uuid == null || uuid.length() == 0) { - log.error("UUID cannot be NULL or zero length !!"); - return false; - } - - // check for alphanumeric, hyphen, and underscore - boolean isValid = uuid.matches("^[a-zA-Z0-9_-]*$"); - if (!isValid) - log.debug("Invalid UUID: {}", uuid); - - return isValid; - } -} diff --git a/src/main/java/org/prebid/cache/model/ResponseObject.java b/src/main/java/org/prebid/cache/model/ResponseObject.java index 2145f9c..80c7d39 100644 --- a/src/main/java/org/prebid/cache/model/ResponseObject.java +++ b/src/main/java/org/prebid/cache/model/ResponseObject.java @@ -7,5 +7,6 @@ @Value(staticConstructor = "of") public class ResponseObject { + List> responses; } diff --git a/src/test/java/org/prebid/cache/handlers/CacheHandlerTest.java b/src/test/java/org/prebid/cache/handlers/CacheHandlerTest.java new file mode 100644 index 0000000..9db4364 --- /dev/null +++ b/src/test/java/org/prebid/cache/handlers/CacheHandlerTest.java @@ -0,0 +1,234 @@ +package org.prebid.cache.handlers; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.prebid.cache.builders.PrebidServerResponseBuilder; +import org.prebid.cache.exceptions.BadRequestException; +import org.prebid.cache.exceptions.DuplicateKeyException; +import org.prebid.cache.exceptions.RepositoryException; +import org.prebid.cache.exceptions.RequestParsingException; +import org.prebid.cache.exceptions.UnsupportedMediaTypeException; +import org.prebid.cache.handlers.cache.CacheHandler; +import org.prebid.cache.metrics.MetricsRecorder; +import org.prebid.cache.metrics.MetricsRecorderTest; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.core.codec.DecodingException; +import org.springframework.http.HttpStatus; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.web.reactive.function.server.ServerResponse; +import org.springframework.web.server.UnsupportedMediaTypeStatusException; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@ExtendWith(SpringExtension.class) +@ContextConfiguration(classes = {MetricsRecorderTest.class}) +@EnableConfigurationProperties +@SpringBootTest +public class CacheHandlerTest { + + @MockBean + private PrebidServerResponseBuilder builder; + + @SpyBean + private MetricsRecorder metricsRecorder; + + private CacheHandler target; + + @BeforeEach + public void setUp() { + target = new CacheHandler(builder, metricsRecorder, 0); + } + + @Test + public void timerContextShouldReturnExpectedTimerContext() { + // when + target.timerContext(ServiceType.FETCH, "prefix"); + + // then + verify(metricsRecorder).markMeterForTag(eq("prefix"), eq(MetricsRecorder.MeasurementTag.REQUEST)); + verify(metricsRecorder).createRequestTimerForServiceType(eq(ServiceType.FETCH)); + } + + @Test + public void validateErrorResultShouldReturnBadRequestExceptionOnDuplicateKeyException() { + // given + final Mono error = Mono.error(new DuplicateKeyException("error")); + + // when + final Mono result = target.validateErrorResult(ServiceType.FETCH, error); + + // then + StepVerifier.create(result) + .expectError(BadRequestException.class) + .verify(); + verify(metricsRecorder).getExistingKeyError(); + } + + @Test + public void validateErrorResultShouldReturnRequestParsingExceptionOnDecodingException() { + // given + final Mono error = Mono.error(new DecodingException("error")); + + // when + final Mono result = target.validateErrorResult(ServiceType.FETCH, error); + + // then + StepVerifier.create(result) + .expectError(RequestParsingException.class) + .verify(); + } + + @Test + public void validateErrorResultShouldReturnUnsupportedMediaTypeOnUnsupportedMediaTypeStatus() { + // given + final Mono error = Mono.error(new UnsupportedMediaTypeStatusException("error")); + + // when + final Mono result = target.validateErrorResult(ServiceType.FETCH, error); + + // then + StepVerifier.create(result) + .expectError(UnsupportedMediaTypeException.class) + .verify(); + } + + @Test + public void finalizeResultShouldStopTimer() { + // given + final ServerResponse serverResponse = mock(ServerResponse.class); + final Mono mono = Mono.just(serverResponse); + final MetricsRecorder.MetricsRecorderTimer timerContext = mock(MetricsRecorder.MetricsRecorderTimer.class); + + // when + final Mono result = target.finalizeResult(mono, null, timerContext, null); + + // then + StepVerifier.create(result) + .assertNext(emptyConsumer()) + .expectComplete() + .verify(); + verify(timerContext).stop(); + } + + @Test + public void finalizeResultShouldHandleRepositoryException() { + // given + final ServerResponse serverResponse = mock(ServerResponse.class); + given(builder.error(any(), any())).willReturn(Mono.just(serverResponse)); + + final Mono mono = Mono.error(new RepositoryException("error")); + final MetricsRecorder.MetricsRecorderTimer timerContext = mock(MetricsRecorder.MetricsRecorderTimer.class); + + // when + final Mono result = target.finalizeResult(mono, null, timerContext, "prefix"); + + // then + StepVerifier.create(result) + .assertNext(emptyConsumer()) + .expectComplete() + .verify(); + verify(metricsRecorder).markMeterForTag(eq("prefix"), eq(MetricsRecorder.MeasurementTag.ERROR_DB)); + } + + @Test + public void finalizeResultShouldHandleTimeoutException() { + // given + final ServerResponse serverResponse = mock(ServerResponse.class); + given(builder.error(any(), any())).willReturn(Mono.just(serverResponse)); + + final Mono mono = Mono.error(new TimeoutException("error")); + final MetricsRecorder.MetricsRecorderTimer timerContext = mock(MetricsRecorder.MetricsRecorderTimer.class); + + // when + final Mono result = target.finalizeResult(mono, null, timerContext, "prefix"); + + // then + StepVerifier.create(result) + .assertNext(emptyConsumer()) + .expectComplete() + .verify(); + verify(metricsRecorder).markMeterForTag(eq("prefix"), eq(MetricsRecorder.MeasurementTag.ERROR_TIMEDOUT)); + } + + @Test + public void finalizeResultShouldHandleInternalServerErrorStatusCode() { + // given + final ServerResponse serverResponse = mock(ServerResponse.class); + given(serverResponse.statusCode()).willReturn(HttpStatus.INTERNAL_SERVER_ERROR); + given(builder.error(any(), any())).willReturn(Mono.just(serverResponse)); + + final Mono mono = Mono.error(new BadRequestException("error")); + final MetricsRecorder.MetricsRecorderTimer timerContext = mock(MetricsRecorder.MetricsRecorderTimer.class); + + // when + final Mono result = target.finalizeResult(mono, null, timerContext, "prefix"); + + // then + StepVerifier.create(result) + .assertNext(emptyConsumer()) + .expectComplete() + .verify(); + verify(metricsRecorder).markMeterForTag(eq("prefix"), eq(MetricsRecorder.MeasurementTag.ERROR_UNKNOWN)); + } + + @Test + public void finalizeResultShouldHandleBadRequestStatusCode() { + // given + final ServerResponse serverResponse = mock(ServerResponse.class); + given(serverResponse.statusCode()).willReturn(HttpStatus.BAD_REQUEST); + given(builder.error(any(), any())).willReturn(Mono.just(serverResponse)); + + final Mono mono = Mono.error(new BadRequestException("error")); + final MetricsRecorder.MetricsRecorderTimer timerContext = mock(MetricsRecorder.MetricsRecorderTimer.class); + + // when + final Mono result = target.finalizeResult(mono, null, timerContext, "prefix"); + + // then + StepVerifier.create(result) + .assertNext(emptyConsumer()) + .expectComplete() + .verify(); + verify(metricsRecorder).markMeterForTag(eq("prefix"), eq(MetricsRecorder.MeasurementTag.ERROR_BAD_REQUEST)); + } + + @Test + public void finalizeResultShouldHandleNotFoundStatusCode() { + // given + final ServerResponse serverResponse = mock(ServerResponse.class); + given(serverResponse.statusCode()).willReturn(HttpStatus.NOT_FOUND); + given(builder.error(any(), any())).willReturn(Mono.just(serverResponse)); + + final Mono mono = Mono.error(new BadRequestException("error")); + final MetricsRecorder.MetricsRecorderTimer timerContext = mock(MetricsRecorder.MetricsRecorderTimer.class); + + // when + final Mono result = target.finalizeResult(mono, null, timerContext, "prefix"); + + // then + StepVerifier.create(result) + .assertNext(emptyConsumer()) + .expectComplete() + .verify(); + verify(metricsRecorder).markMeterForTag(eq("prefix"), eq(MetricsRecorder.MeasurementTag.ERROR_MISSINGID)); + } + + private static Consumer emptyConsumer() { + return t -> { + }; + } +} diff --git a/src/test/java/org/prebid/cache/handlers/CacheHandlerTests.java b/src/test/java/org/prebid/cache/handlers/CacheHandlerTests.java deleted file mode 100644 index dd94868..0000000 --- a/src/test/java/org/prebid/cache/handlers/CacheHandlerTests.java +++ /dev/null @@ -1,55 +0,0 @@ -package org.prebid.cache.handlers; - -import org.prebid.cache.exceptions.RequestParsingException; -import org.prebid.cache.exceptions.RepositoryException; -import org.prebid.cache.handlers.cache.CacheHandler; -import org.prebid.cache.model.Payload; -import org.prebid.cache.model.PayloadTransfer; -import org.prebid.cache.model.PayloadWrapper; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -import java.util.function.Consumer; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -abstract class CacheHandlerTests { - - static final PayloadTransfer PAYLOAD_TRANSFER = PayloadTransfer.builder() - .type("json") - .key("2be04ba5-8f9b-4a1e-8100-d573c40312f8") - .value("") - .expiry(1800L) - .prefix("prebid_") - .build(); - - static final PayloadWrapper PAYLOAD_WRAPPER = PayloadWrapper.builder() - .id("2be04ba5-8f9b-4a1e-8100-d573c40312f8") - .prefix("prebid_") - .payload(Payload.of("json", "2be04ba5-8f9b-4a1e-8100-d573c40312f8", "")) - .expiry(1800L) - .isExternalId(true) - .build(); - - void verifyRepositoryError(CacheHandler handler) { - final Consumer consumer = t -> assertTrue(t instanceof RepositoryException); - verifyResultTest(consumer, handler); - } - - void verifyJacksonError(CacheHandler handler) { - final Consumer consumer = t -> assertTrue(t instanceof RequestParsingException); - verifyResultTest(consumer, handler); - } - - private void verifyResultTest(Consumer consumer, CacheHandler handler) { - Mono error = - handler.validateErrorResult(Mono.error(new Exception("jackson error"))); - - error.doOnError(consumer) - .subscribe(consumer); - StepVerifier.create(error) - .expectSubscription() - .expectError() - .verify(); - } -} diff --git a/src/test/java/org/prebid/cache/handlers/ErrorHandlerTest.java b/src/test/java/org/prebid/cache/handlers/ErrorHandlerTest.java new file mode 100644 index 0000000..426d3d9 --- /dev/null +++ b/src/test/java/org/prebid/cache/handlers/ErrorHandlerTest.java @@ -0,0 +1,80 @@ +package org.prebid.cache.handlers; + +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.prebid.cache.builders.PrebidServerResponseBuilder; +import org.prebid.cache.metrics.MetricsRecorder; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +public class ErrorHandlerTest { + + @Mock(strictness = Mock.Strictness.LENIENT) + private PrebidServerResponseBuilder builder; + + @Mock(strictness = Mock.Strictness.LENIENT) + private MetricsRecorder metricsRecorder; + + private ErrorHandler target; + + @BeforeEach + public void setUp() { + target = new ErrorHandler(metricsRecorder, builder); + } + + @Test + public void createResourceNotFoundShouldReturnExpectedError() { + // when + final Mono result = ErrorHandler.createResourceNotFound("uuid"); + + // then + StepVerifier.create(result).verifyErrorMessage("Resource Not Found: uuid uuid"); + } + + @Test + public void createInvalidParametersShouldReturnExpectedError() { + // when + final Mono result = ErrorHandler.createInvalidParameters(); + + // then + StepVerifier.create(result).verifyErrorMessage("Invalid Parameter(s): uuid not found."); + } + + @Test + public void createNoElementsFoundShouldReturnExpectedError() { + // when + final Mono result = ErrorHandler.createNoElementsFound(); + + // then + StepVerifier.create(result).verifyErrorMessage("No Elements Found."); + } + + @Test + public void invalidRequestShouldReturnExpectedError() { + // given + final Counter counter = mock(Counter.class); + given(metricsRecorder.getInvalidRequestMeter()).willReturn(counter); + given(builder.error(any(), any())).willReturn(Mono.error(new Throwable("error"))); + + final ServerRequest request = mock(ServerRequest.class); + + // when + final Mono result = target.invalidRequest(request); + + // then + StepVerifier.create(result).verifyErrorMessage("error"); + verify(counter).increment(); + } +} diff --git a/src/test/java/org/prebid/cache/handlers/GetCacheHandlerTest.java b/src/test/java/org/prebid/cache/handlers/GetCacheHandlerTest.java new file mode 100644 index 0000000..efa1d35 --- /dev/null +++ b/src/test/java/org/prebid/cache/handlers/GetCacheHandlerTest.java @@ -0,0 +1,316 @@ +package org.prebid.cache.handlers; + +import com.github.tomakehurst.wiremock.WireMockServer; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.prebid.cache.builders.PrebidServerResponseBuilder; +import org.prebid.cache.config.CircuitBreakerPropertyConfiguration; +import org.prebid.cache.exceptions.ResourceNotFoundException; +import org.prebid.cache.handlers.cache.CacheHandler; +import org.prebid.cache.handlers.cache.GetCacheHandler; +import org.prebid.cache.metrics.MetricsRecorder; +import org.prebid.cache.metrics.MetricsRecorderTest; +import org.prebid.cache.model.Payload; +import org.prebid.cache.model.PayloadWrapper; +import org.prebid.cache.repository.CacheConfig; +import org.prebid.cache.repository.ReactiveRepository; +import org.prebid.cache.routers.ApiConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.mock.web.reactive.function.server.MockServerRequest; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.function.Consumer; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.verify; + +@ExtendWith(SpringExtension.class) +@ContextConfiguration(classes = {CircuitBreakerPropertyConfiguration.class, MetricsRecorderTest.class}) +@EnableConfigurationProperties +@SpringBootTest +public class GetCacheHandlerTest { + + @MockBean + private CacheConfig config; + + @MockBean + private ApiConfig apiConfig; + + @MockBean + private CacheHandler cacheHandler; + + @MockBean + private ReactiveRepository repository; + + @Autowired + private CircuitBreaker webClientCircuitBreaker; + + @MockBean + private PrebidServerResponseBuilder builder; + + @SpyBean + private MetricsRecorder metricsRecorder; + + private GetCacheHandler target; + + private WireMockServer wireMockServer; + + @Mock + private ServerResponse response; + + @BeforeEach + public void setup() { + given(config.getPrefix()).willReturn("prefix:"); + given(config.getHostParamProtocol()).willReturn("http"); + given(config.getTimeoutMs()).willReturn(Integer.MAX_VALUE); + + given(apiConfig.getCachePath()).willReturn("cache"); + + given(cacheHandler.validateErrorResult(eq(ServiceType.FETCH), any())) + .willAnswer(invocation -> invocation.getArgument(1)); + given(cacheHandler.finalizeResult(any(), any(), any(), eq("read"))) + .willAnswer(invocation -> invocation.getArgument(0)); + + given(builder.createResponseMono(any(), any(), ArgumentMatchers.any())) + .willReturn(Mono.just(response)); + + target = new GetCacheHandler( + config, + apiConfig, + cacheHandler, + repository, + webClientCircuitBreaker, + builder, + metricsRecorder); + + wireMockServer = new WireMockServer(8080); + wireMockServer.start(); + } + + @AfterEach + public void teardown() { + wireMockServer.stop(); + } + + @Test + public void fetchShouldReturnErrorOnMissingUuid() { + // given + given(cacheHandler.finalizeResult(any(), any(), any(), anyString())) + .willReturn(Mono.error(new Throwable("error"))); + + final ServerRequest request = MockServerRequest.builder().build(); + + // when + final Mono result = target.fetch(request); + + // then + StepVerifier.create(result) + .expectErrorMessage("error") + .verify(); + verify(cacheHandler).timerContext(eq(ServiceType.FETCH), eq("read")); + } + + @Test + public void fetchShouldReturnResourceNotFound() { + // given + given(repository.findById(eq("prefix:uuid"))).willReturn(Mono.empty()); + + final ServerRequest request = MockServerRequest.builder() + .queryParam("uuid", "uuid") + .build(); + + // when + final Mono result = target.fetch(request); + + // then + StepVerifier.create(result) + .expectError(ResourceNotFoundException.class) + .verify(); + verify(cacheHandler).timerContext(eq(ServiceType.FETCH), eq("read")); + verify(cacheHandler).validateErrorResult(eq(ServiceType.FETCH), any()); + verify(cacheHandler).finalizeResult(any(), any(), any(), eq("read")); + } + + @Test + public void fetchShouldProcessJsonPayload() { + // given + final PayloadWrapper payloadWrapper = PayloadWrapper.builder() + .payload(Payload.of("json", null, null)) + .build(); + given(repository.findById(eq("prefix:uuid"))).willReturn(Mono.just(payloadWrapper)); + + final ServerRequest request = MockServerRequest.builder() + .queryParam("uuid", "uuid") + .build(); + + // when + final Mono result = target.fetch(request); + + // then + StepVerifier.create(result) + .assertNext(emptyConsumer()) + .expectComplete() + .verify(); + verify(cacheHandler).timerContext(eq(ServiceType.FETCH), eq("read")); + verify(cacheHandler).validateErrorResult(eq(ServiceType.FETCH), any()); + verify(metricsRecorder).markMeterForTag(eq("read"), eq(MetricsRecorder.MeasurementTag.JSON)); + verify(builder).createResponseMono(any(), eq(MediaType.APPLICATION_JSON_UTF8), same(payloadWrapper)); + verify(cacheHandler).finalizeResult(any(), any(), any(), eq("read")); + } + + @Test + public void fetchShouldProcessXmlPayload() { + // given + final PayloadWrapper payloadWrapper = PayloadWrapper.builder() + .payload(Payload.of("xml", null, null)) + .build(); + given(repository.findById(eq("prefix:uuid"))).willReturn(Mono.just(payloadWrapper)); + + final ServerRequest request = MockServerRequest.builder() + .queryParam("uuid", "uuid") + .build(); + + // when + final Mono result = target.fetch(request); + + // then + StepVerifier.create(result) + .assertNext(emptyConsumer()) + .expectComplete() + .verify(); + verify(cacheHandler).timerContext(eq(ServiceType.FETCH), eq("read")); + verify(cacheHandler).validateErrorResult(eq(ServiceType.FETCH), any()); + verify(metricsRecorder).markMeterForTag(eq("read"), eq(MetricsRecorder.MeasurementTag.XML)); + verify(builder).createResponseMono(any(), eq(MediaType.APPLICATION_XML), same(payloadWrapper)); + verify(cacheHandler).finalizeResult(any(), any(), any(), eq("read")); + } + + @Test + public void fetchShouldProcessUnsupportedPayload() { + // given + final PayloadWrapper payloadWrapper = PayloadWrapper.builder() + .payload(Payload.of("unsupported", null, null)) + .build(); + given(repository.findById(eq("prefix:uuid"))).willReturn(Mono.just(payloadWrapper)); + + final ServerRequest request = MockServerRequest.builder() + .queryParam("uuid", "uuid") + .build(); + + // when + final Mono result = target.fetch(request); + + // then + StepVerifier.create(result) + .expectErrorMessage("Unsupported Media Type.") + .verify(); + verify(cacheHandler).timerContext(eq(ServiceType.FETCH), eq("read")); + verify(cacheHandler).validateErrorResult(eq(ServiceType.FETCH), any()); + verify(cacheHandler).finalizeResult(any(), any(), any(), eq("read")); + } + + @Test + public void fetchShouldProcessProxyRequest() { + // given + given(config.getAllowedProxyHost()).willReturn("http://localhost:8080/cache"); + wireMockServer.stubFor(get(urlPathEqualTo("/cache")) + .willReturn(aResponse() + .withHeader(HttpHeaders.CONTENT_TYPE, "application/json;charset=utf-8") + .withBody("{\"uuid\":\"uuid\"}"))); + + final ServerRequest request = MockServerRequest.builder() + .queryParam("uuid", "uuid") + .queryParam("ch", "localhost:8080") + .build(); + + // when + final Mono result = target.fetch(request); + + // then + StepVerifier.create(result) + .assertNext(emptyConsumer()) + .expectComplete() + .verify(); + verify(cacheHandler).timerContext(eq(ServiceType.FETCH), eq("read")); + verify(metricsRecorder).getProxySuccess(); + verify(cacheHandler).finalizeResult(any(), any(), any(), eq("read")); + } + + @Test + public void fetchShouldProcessProxyRequestWithWrongStatusCode() { + // given + given(config.getAllowedProxyHost()).willReturn("http://localhost:8080/cache"); + wireMockServer.stubFor(get(urlPathEqualTo("/cache")) + .willReturn(aResponse() + .withStatus(201) + .withHeader(HttpHeaders.CONTENT_TYPE, "application/json;charset=utf-8") + .withBody("{\"uuid\":\"uuid\"}"))); + + final ServerRequest request = MockServerRequest.builder() + .queryParam("uuid", "uuid") + .queryParam("ch", "localhost:8080") + .build(); + + // when + final Mono result = target.fetch(request); + + // then + StepVerifier.create(result) + .assertNext(emptyConsumer()) + .expectComplete() + .verify(); + verify(cacheHandler).timerContext(eq(ServiceType.FETCH), eq("read")); + verify(metricsRecorder).getProxyFailure(); + verify(cacheHandler).finalizeResult(any(), any(), any(), eq("read")); + } + + @Test + public void fetchShouldProcessFailedProxyRequest() { + // given + given(config.getAllowedProxyHost()).willReturn("http://unreachable/cache"); + + final ServerRequest request = MockServerRequest.builder() + .queryParam("uuid", "uuid") + .queryParam("ch", "unreachable") + .build(); + + // when + final Mono result = target.fetch(request); + + // then + StepVerifier.create(result) + .expectError() + .verify(); + verify(cacheHandler).timerContext(eq(ServiceType.FETCH), eq("read")); + verify(metricsRecorder).getProxyFailure(); + verify(cacheHandler).finalizeResult(any(), any(), any(), eq("read")); + } + + private static Consumer emptyConsumer() { + return t -> { + }; + } +} diff --git a/src/test/java/org/prebid/cache/handlers/GetCacheHandlerTests.java b/src/test/java/org/prebid/cache/handlers/GetCacheHandlerTests.java deleted file mode 100644 index 62a0837..0000000 --- a/src/test/java/org/prebid/cache/handlers/GetCacheHandlerTests.java +++ /dev/null @@ -1,217 +0,0 @@ -package org.prebid.cache.handlers; - -import com.github.tomakehurst.wiremock.WireMockServer; -import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.prebid.cache.builders.PrebidServerResponseBuilder; -import org.prebid.cache.config.CircuitBreakerPropertyConfiguration; -import org.prebid.cache.handlers.cache.GetCacheHandler; -import org.prebid.cache.metrics.MetricsRecorder; -import org.prebid.cache.metrics.MetricsRecorderTest; -import org.prebid.cache.model.PayloadWrapper; -import org.prebid.cache.repository.CacheConfig; -import org.prebid.cache.repository.ReactiveRepository; -import org.prebid.cache.routers.ApiConfig; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.MediaType; -import org.springframework.mock.web.reactive.function.server.MockServerRequest; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit.jupiter.SpringExtension; -import org.springframework.web.reactive.function.server.ServerResponse; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Signal; -import reactor.test.StepVerifier; - -import java.util.function.Consumer; - -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; -import static com.github.tomakehurst.wiremock.client.WireMock.equalToIgnoreCase; -import static com.github.tomakehurst.wiremock.client.WireMock.get; -import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; -import static com.github.tomakehurst.wiremock.client.WireMock.verify; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.BDDMockito.given; -import static org.springframework.http.HttpHeaders.CONTENT_TYPE; - -@ExtendWith(SpringExtension.class) -@ContextConfiguration(classes = { - GetCacheHandler.class, - PrebidServerResponseBuilder.class, - CacheConfig.class, - CacheConfig.class, - MetricsRecorderTest.class, - MetricsRecorder.class, - ApiConfig.class, - CircuitBreakerPropertyConfiguration.class -}) -@EnableConfigurationProperties -@SpringBootTest -class GetCacheHandlerTests extends CacheHandlerTests { - - @Autowired - CircuitBreaker webClientCircuitBreaker; - - @Autowired - CacheConfig cacheConfig; - - @Autowired - ApiConfig apiConfig; - - @Autowired - MetricsRecorder metricsRecorder; - - @Autowired - PrebidServerResponseBuilder responseBuilder; - - @MockBean - ReactiveRepository repository; - - @Value("${sampling.rate:2.0}") - Double samplingRate; - - GetCacheHandler handler; - - WireMockServer serverMock; - - @BeforeEach - public void setup() { - handler = new GetCacheHandler( - repository, - cacheConfig, - apiConfig, - metricsRecorder, - responseBuilder, - webClientCircuitBreaker, - samplingRate); - serverMock = new WireMockServer(8080); - serverMock.start(); - } - - @AfterEach - public void teardown() { - serverMock.stop(); - } - - @Test - void testVerifyError() { - verifyJacksonError(handler); - verifyRepositoryError(handler); - } - - private static Consumer assertNotFoundStatusCode() { - return response -> assertEquals(response.statusCode().value(), 404); - } - - @Test - void testVerifyFetch() { - given(repository.findById("prebid_a8db2208-d085-444c-9721-c1161d7f09ce")).willReturn(Mono.just(PAYLOAD_WRAPPER)); - - final var requestMono = MockServerRequest.builder() - .method(HttpMethod.GET) - .queryParam("uuid", "a8db2208-d085-444c-9721-c1161d7f09ce") - .build(); - - final var responseMono = handler.fetch(requestMono); - - responseMono.doOnEach(assertSignalStatusCode(200)).subscribe(); - StepVerifier.create(responseMono) - .expectSubscription() - .expectNextMatches(t -> true) - .expectComplete() - .verify(); - } - - @Test - void testVerifyFetchWithCacheHostParam() { - serverMock.stubFor(get(urlPathEqualTo("/cache")) - .willReturn(aResponse().withHeader(HttpHeaders.CONTENT_TYPE, "application/json;charset=utf-8") - .withBody("{\"uuid\":\"2be04ba5-8f9b-4a1e-8100-d573c40312f8\"}"))); - - final var requestMono = MockServerRequest.builder() - .method(HttpMethod.GET) - .header(CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE) - .queryParam("uuid", "a8db2208-d085-444c-9721-c1161d7f09ce") - .queryParam("ch", "localhost:8080") - .build(); - - final var responseMono = handler.fetch(requestMono); - responseMono.doOnEach(assertSignalStatusCode(200)).subscribe(); - - StepVerifier.create(responseMono) - .expectSubscription() - .expectNextMatches(t -> true) - .expectComplete() - .verify(); - - verify(getRequestedFor(urlPathEqualTo("/cache")) - .withQueryParam("uuid", equalTo("a8db2208-d085-444c-9721-c1161d7f09ce")) - .withHeader(HttpHeaders.CONTENT_TYPE, equalToIgnoreCase(MediaType.APPLICATION_JSON_UTF8_VALUE)) - ); - } - - @Test - void testVerifyFailForNotFoundResourceWithCacheHostParam() { - final var requestMono = MockServerRequest.builder() - .method(HttpMethod.GET) - .queryParam("uuid", "a8db2208-d085-444c-9721-c1161d7f09ce") - .queryParam("ch", "localhost:8080") - .build(); - - final var responseMono = handler.fetch(requestMono); - - responseMono.doOnEach(assertSignalStatusCode(404)).subscribe(); - StepVerifier.create(responseMono) - .consumeNextWith(assertNotFoundStatusCode()) - .expectComplete() - .verify(); - } - - @Test - void testVerifyFetchReturnsBadRequestWhenResponseStatusIsNotOk() { - - serverMock.stubFor(get(urlPathEqualTo("/cache")) - .willReturn(aResponse().withHeader(HttpHeaders.CONTENT_TYPE, "application/json;charset=utf-8") - .withStatus(201) - .withBody("{\"uuid\":\"2be04ba5-8f9b-4a1e-8100-d573c40312f8\"}"))); - - final var requestMono = MockServerRequest.builder() - .method(HttpMethod.GET) - .header(CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE) - .queryParam("uuid", "a8db2208-d085-444c-9721-c1161d7f09ce") - .queryParam("ch", "localhost:8080") - .build(); - - final var responseMono = handler.fetch(requestMono); - - responseMono.doOnEach(assertSignalStatusCode(400)).subscribe(); - StepVerifier.create(responseMono) - .expectSubscription() - .expectNextMatches(t -> true) - .expectComplete() - .verify(); - - verify(getRequestedFor(urlPathEqualTo("/cache")) - .withQueryParam("uuid", equalTo("a8db2208-d085-444c-9721-c1161d7f09ce")) - .withHeader(HttpHeaders.CONTENT_TYPE, equalToIgnoreCase(MediaType.APPLICATION_JSON_UTF8_VALUE)) - ); - } - - private static Consumer> assertSignalStatusCode(int statusCode) { - return signal -> { - assertTrue(signal.isOnComplete()); - assertEquals(signal.get().statusCode().value(), statusCode); - }; - } -} diff --git a/src/test/java/org/prebid/cache/handlers/PostCacheHandlerTest.java b/src/test/java/org/prebid/cache/handlers/PostCacheHandlerTest.java new file mode 100644 index 0000000..5c22e5b --- /dev/null +++ b/src/test/java/org/prebid/cache/handlers/PostCacheHandlerTest.java @@ -0,0 +1,241 @@ +package org.prebid.cache.handlers; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.tomakehurst.wiremock.WireMockServer; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.prebid.cache.builders.PrebidServerResponseBuilder; +import org.prebid.cache.config.CircuitBreakerPropertyConfiguration; +import org.prebid.cache.config.ObjectMapperConfig; +import org.prebid.cache.handlers.cache.CacheHandler; +import org.prebid.cache.handlers.cache.PostCacheHandler; +import org.prebid.cache.metrics.MetricsRecorder; +import org.prebid.cache.metrics.MetricsRecorderTest; +import org.prebid.cache.model.PayloadTransfer; +import org.prebid.cache.model.PayloadWrapper; +import org.prebid.cache.model.RequestObject; +import org.prebid.cache.model.ResponseObject; +import org.prebid.cache.repository.CacheConfig; +import org.prebid.cache.repository.ReactiveRepository; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.mock.web.reactive.function.server.MockServerRequest; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.function.Consumer; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static java.util.Collections.singletonList; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.verify; +import static org.prebid.cache.util.AwaitilityUtil.awaitAndVerify; + +@ExtendWith(SpringExtension.class) +@ContextConfiguration(classes = { + CircuitBreakerPropertyConfiguration.class, + ObjectMapperConfig.class, + MetricsRecorderTest.class +}) +@EnableConfigurationProperties +@SpringBootTest +public class PostCacheHandlerTest { + + @MockBean + private CacheConfig config; + + @MockBean + private CacheHandler cacheHandler; + + @MockBean + private ReactiveRepository repository; + + @Autowired + private CircuitBreaker webClientCircuitBreaker; + + @MockBean + private PrebidServerResponseBuilder builder; + + @SpyBean + private MetricsRecorder metricsRecorder; + + @SpyBean + private ObjectMapper mapper; + + private PostCacheHandler target; + + private WireMockServer wireMockServer; + + @Mock + private ServerResponse response; + + @BeforeEach + public void setup() { + given(config.getHostParamProtocol()).willReturn("http"); + given(config.getSecondaryUris()).willReturn(singletonList("localhost:8080")); + given(config.getSecondaryCachePath()).willReturn("/cache"); + + given(cacheHandler.validateErrorResult(eq(ServiceType.SAVE), any())) + .willAnswer(invocation -> invocation.getArgument(1)); + given(cacheHandler.finalizeResult(any(), any(), any(), eq("write"))) + .willAnswer(invocation -> invocation.getArgument(0)); + + given(repository.save(any())) + .willAnswer(invocation -> Mono.just(invocation.getArgument(0))); + + given(builder.createResponseMono(any(), any(), ArgumentMatchers.any())) + .willReturn(Mono.just(response)); + + target = new PostCacheHandler( + config, + cacheHandler, + repository, + webClientCircuitBreaker, + builder, + metricsRecorder, + mapper); + + wireMockServer = new WireMockServer(8080); + wireMockServer.start(); + } + + @AfterEach + public void teardown() { + wireMockServer.stop(); + } + + @Test + public void saveShouldUseObjectMapperForPlainText() throws JsonProcessingException { + // given + final ServerRequest request = MockServerRequest.builder() + .header("Content-Type", "text/plain") + .queryParam("secondaryCache", "yes") + .body(Mono.just("null")); + + // when + final Mono result = target.save(request); + + // then + StepVerifier.create(result) + .expectErrorMessage("No Elements Found.") + .verify(); + verify(cacheHandler).timerContext(eq(ServiceType.SAVE), eq("write")); + verify(mapper).readValue(anyString(), eq(RequestObject.class)); + verify(cacheHandler).validateErrorResult(eq(ServiceType.SAVE), any()); + verify(cacheHandler).finalizeResult(any(), any(), any(), eq("write")); + } + + @Test + public void saveShouldReturnErrorWhenExternalUuidsDisabled() { + // given + final ServerRequest request = MockServerRequest.builder() + .queryParam("secondaryCache", "yes") + .body(Mono.just(RequestObject.of(singletonList(PayloadTransfer.builder().key("uuid").build())))); + + // when + final Mono result = target.save(request); + + // then + StepVerifier.create(result) + .expectErrorMessage("Prebid cache host forbids specifying UUID in request.") + .verify(); + verify(cacheHandler).timerContext(eq(ServiceType.SAVE), eq("write")); + verify(cacheHandler).validateErrorResult(eq(ServiceType.SAVE), any()); + verify(cacheHandler).finalizeResult(any(), any(), any(), eq("write")); + } + + @Test + public void saveShouldReturnErrorOnInvalidUuid() { + // given + given(config.isAllowExternalUUID()).willReturn(true); + + final ServerRequest request = MockServerRequest.builder() + .queryParam("secondaryCache", "yes") + .body(Mono.just(RequestObject.of(singletonList(PayloadTransfer.builder().key("u/uid").build())))); + + // when + final Mono result = target.save(request); + + // then + StepVerifier.create(result) + .expectErrorMessage("Invalid UUID: [u/uid].") + .verify(); + verify(cacheHandler).timerContext(eq(ServiceType.SAVE), eq("write")); + verify(cacheHandler).validateErrorResult(eq(ServiceType.SAVE), any()); + verify(cacheHandler).finalizeResult(any(), any(), any(), eq("write")); + } + + @Test + public void saveShouldReturnExpectedResult() { + // given + final ServerRequest request = MockServerRequest.builder() + .queryParam("secondaryCache", "yes") + .body(Mono.just(RequestObject.of(singletonList(PayloadTransfer.builder().build())))); + + // when + final Mono result = target.save(request); + + // then + StepVerifier.create(result) + .assertNext(emptyConsumer()) + .expectComplete() + .verify(); + verify(cacheHandler).timerContext(eq(ServiceType.SAVE), eq("write")); + verify(repository).save(any()); + verify(cacheHandler).validateErrorResult(eq(ServiceType.SAVE), any()); + verify(cacheHandler).finalizeResult(any(), any(), any(), eq("write")); + } + + @Test + public void saveShouldSendRequestToSecondaryCacheHosts() { + // given + wireMockServer.stubFor(post(urlPathEqualTo("/cache")) + .withQueryParam("secondaryCache", equalTo("yes")) + .willReturn(aResponse().withStatus(200))); + + final ServerRequest request = MockServerRequest.builder() + .body(Mono.just(RequestObject.of(singletonList(PayloadTransfer.builder().build())))); + + // when + final Mono result = target.save(request); + + // then + StepVerifier.create(result) + .assertNext(emptyConsumer()) + .expectComplete() + .verify(); + verify(cacheHandler).timerContext(eq(ServiceType.SAVE), eq("write")); + verify(repository).save(any()); + verify(cacheHandler).validateErrorResult(eq(ServiceType.SAVE), any()); + verify(cacheHandler).finalizeResult(any(), any(), any(), eq("write")); + + awaitAndVerify( + postRequestedFor(urlPathEqualTo("/cache")).withQueryParam("secondaryCache", equalTo("yes")), + 5000); + } + + private static Consumer emptyConsumer() { + return t -> { + }; + } +} diff --git a/src/test/java/org/prebid/cache/handlers/PostCacheHandlerTests.java b/src/test/java/org/prebid/cache/handlers/PostCacheHandlerTests.java deleted file mode 100644 index d251c11..0000000 --- a/src/test/java/org/prebid/cache/handlers/PostCacheHandlerTests.java +++ /dev/null @@ -1,239 +0,0 @@ -package org.prebid.cache.handlers; - -import com.github.tomakehurst.wiremock.WireMockServer; -import com.github.tomakehurst.wiremock.matching.RequestPatternBuilder; -import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.prebid.cache.builders.PrebidServerResponseBuilder; -import org.prebid.cache.config.CircuitBreakerPropertyConfiguration; -import org.prebid.cache.exceptions.DuplicateKeyException; -import org.prebid.cache.handlers.cache.PostCacheHandler; -import org.prebid.cache.metrics.MetricsRecorder; -import org.prebid.cache.metrics.MetricsRecorderTest; -import org.prebid.cache.model.PayloadWrapper; -import org.prebid.cache.model.RequestObject; -import org.prebid.cache.repository.CacheConfig; -import org.prebid.cache.repository.ReactiveRepository; -import org.prebid.cache.routers.ApiConfig; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.MediaType; -import org.springframework.mock.web.reactive.function.server.MockServerRequest; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit.jupiter.SpringExtension; -import org.springframework.web.reactive.function.server.ServerResponse; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -import java.util.Collections; -import java.util.Date; -import java.util.function.Consumer; -import java.util.function.Supplier; - -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; -import static com.github.tomakehurst.wiremock.client.WireMock.equalToIgnoreCase; -import static com.github.tomakehurst.wiremock.client.WireMock.post; -import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.BDDMockito.given; -import static org.prebid.cache.util.AwaitilityUtil.awaitAndVerify; -import static org.springframework.http.HttpHeaders.CONTENT_TYPE; - -@ExtendWith(SpringExtension.class) -@ContextConfiguration(classes = { - PostCacheHandler.class, - PrebidServerResponseBuilder.class, - CacheConfig.class, - MetricsRecorderTest.class, - MetricsRecorder.class, - ApiConfig.class, - CircuitBreakerPropertyConfiguration.class -}) -@EnableConfigurationProperties -@SpringBootTest -class PostCacheHandlerTests extends CacheHandlerTests { - - @Autowired - MetricsRecorder metricsRecorder; - - @Autowired - PrebidServerResponseBuilder builder; - - @Autowired - CacheConfig cacheConfig; - - @Autowired - CircuitBreaker webClientCircuitBreaker; - - @MockBean - Supplier currentDateProvider; - - @MockBean - ReactiveRepository repository; - - @Value("${sampling.rate:2.0}") - Double samplingRate; - - @Test - void testVerifyError() { - PostCacheHandler handler = new PostCacheHandler( - repository, - cacheConfig, - metricsRecorder, - builder, - webClientCircuitBreaker, - samplingRate); - verifyJacksonError(handler); - verifyRepositoryError(handler); - } - - WireMockServer serverMock; - - @BeforeEach - public void setup() { - serverMock = new WireMockServer(8080); - serverMock.start(); - } - - @AfterEach - public void teardown() { - serverMock.stop(); - } - - @Test - void testVerifySave() { - given(currentDateProvider.get()).willReturn(new Date(100)); - given(repository.save(PAYLOAD_WRAPPER)).willReturn(Mono.just(PAYLOAD_WRAPPER)); - - final PostCacheHandler handler = new PostCacheHandler(repository, cacheConfig, metricsRecorder, builder, - webClientCircuitBreaker, samplingRate); - - final Mono request = Mono.just(RequestObject.of(Collections.singletonList(PAYLOAD_TRANSFER))); - final MockServerRequest requestMono = MockServerRequest.builder() - .method(HttpMethod.POST) - .header(CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE) - .body(request); - - final Mono responseMono = handler.save(requestMono); - - final Consumer consumer = - serverResponse -> assertEquals(200, serverResponse.statusCode().value()); - - StepVerifier.create(responseMono) - .consumeNextWith(consumer) - .expectComplete() - .verify(); - } - - @Test - void testSecondaryCacheSuccess() { - given(currentDateProvider.get()).willReturn(new Date(100)); - given(repository.save(PAYLOAD_WRAPPER)).willReturn(Mono.just(PAYLOAD_WRAPPER)); - - serverMock.stubFor(post(urlPathEqualTo("/cache")) - .willReturn(aResponse().withBody("{\"responses\":[{\"uuid\":\"2be04ba5-8f9b-4a1e-8100-d573c40312f8\"}]}"))); - - final PostCacheHandler handler = new PostCacheHandler(repository, cacheConfig, metricsRecorder, builder, - webClientCircuitBreaker, samplingRate); - - final Mono request = Mono.just(RequestObject.of(Collections.singletonList(PAYLOAD_TRANSFER))); - final MockServerRequest requestMono = MockServerRequest.builder() - .method(HttpMethod.POST) - .header(CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE) - .body(request); - - final Mono responseMono = handler.save(requestMono); - - final Consumer consumer = - serverResponse -> assertEquals(200, serverResponse.statusCode().value()); - - StepVerifier.create(responseMono) - .consumeNextWith(consumer) - .expectComplete() - .verify(); - - final RequestPatternBuilder requestPatternBuilder = postRequestedFor(urlPathEqualTo("/cache")) - .withQueryParam("secondaryCache", equalTo("yes")) - .withHeader(HttpHeaders.CONTENT_TYPE, equalToIgnoreCase("application/json")); - - awaitAndVerify(requestPatternBuilder, 5000); - } - - @Test - void testExternalUUIDInvalid() { - //given - final var cacheConfigLocal = new CacheConfig(cacheConfig.getPrefix(), cacheConfig.getExpirySec(), - cacheConfig.getTimeoutMs(), - cacheConfig.getMinExpiry(), cacheConfig.getMaxExpiry(), - false, Collections.emptyList(), cacheConfig.getSecondaryCachePath(), 100, 100, "example.com", "http"); - final var handler = new PostCacheHandler(repository, cacheConfigLocal, metricsRecorder, builder, - webClientCircuitBreaker, samplingRate); - - final Mono request = Mono.just(RequestObject.of(Collections.singletonList(PAYLOAD_TRANSFER))); - final MockServerRequest requestMono = MockServerRequest.builder() - .method(HttpMethod.POST) - .header(CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE) - .body(request); - - final Mono responseMono = handler.save(requestMono); - - final Consumer consumer = - serverResponse -> assertEquals(400, serverResponse.statusCode().value()); - - StepVerifier.create(responseMono) - .consumeNextWith(consumer) - .expectComplete() - .verify(); - } - - @Test - void testUUIDDuplication() { - given(currentDateProvider.get()).willReturn(new Date(100)); - given(repository.save(PAYLOAD_WRAPPER)) - .willReturn(Mono.just(PAYLOAD_WRAPPER)) - .willReturn(Mono.error(new DuplicateKeyException(""))); - - final CacheConfig cacheConfigLocal = new CacheConfig(cacheConfig.getPrefix(), cacheConfig.getExpirySec(), - cacheConfig.getTimeoutMs(), - 5, cacheConfig.getMaxExpiry(), cacheConfig.isAllowExternalUUID(), - Collections.emptyList(), cacheConfig.getSecondaryCachePath(), 100, 100, "example.com", "http"); - final PostCacheHandler handler = new PostCacheHandler(repository, cacheConfigLocal, metricsRecorder, builder, - webClientCircuitBreaker, samplingRate); - - final Mono request = Mono.just(RequestObject.of(Collections.singletonList(PAYLOAD_TRANSFER))); - final MockServerRequest requestMono = MockServerRequest.builder() - .method(HttpMethod.POST) - .header(CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE) - .body(request); - - final Mono responseMono = handler.save(requestMono); - - final Consumer consumer = - serverResponse -> assertEquals(200, serverResponse.statusCode().value()); - - StepVerifier.create(responseMono) - .consumeNextWith(consumer) - .expectComplete() - .verify(); - - final Mono responseMonoSecond = handler.save(requestMono); - - final Consumer consumerSecond = serverResponse -> - assertEquals(400, serverResponse.statusCode().value()); - - StepVerifier.create(responseMonoSecond) - .consumeNextWith(consumerSecond) - .expectComplete() - .verify(); - } -}