diff --git a/large-message-core/src/test/java/com/bakdata/kafka/AmazonS3LargeMessageClientRoundtripTest.java b/large-message-core/src/test/java/com/bakdata/kafka/AmazonS3LargeMessageClientRoundtripTest.java index 005f6d2..5176544 100644 --- a/large-message-core/src/test/java/com/bakdata/kafka/AmazonS3LargeMessageClientRoundtripTest.java +++ b/large-message-core/src/test/java/com/bakdata/kafka/AmazonS3LargeMessageClientRoundtripTest.java @@ -27,20 +27,32 @@ import static org.assertj.core.api.Assertions.assertThat; import com.google.common.collect.ImmutableMap; +import java.util.Collection; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Stream; import lombok.Builder; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; import lombok.Value; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.awssdk.core.internal.http.loader.DefaultSdkHttpClientBuilder; +import software.amazon.awssdk.http.ExecutableHttpRequest; +import software.amazon.awssdk.http.HttpExecuteRequest; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.SdkHttpMethod; +import software.amazon.awssdk.http.SdkHttpRequest; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.utils.AttributeMap; class AmazonS3LargeMessageClientRoundtripTest extends AmazonS3IntegrationTest { @@ -94,6 +106,42 @@ void shouldRoundtrip(final RoundtripArgument argument) { } } + @Test + void shouldUseConfiguredSdkHttpClientBuilder() { + final String bucket = "bucket"; + final String basePath = "s3://" + bucket + "/base/"; + final Map properties = ImmutableMap.builder() + .put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, 0) + .put(AbstractLargeMessageConfig.BASE_PATH_CONFIG, basePath) + .put(AbstractLargeMessageConfig.S3_SDK_HTTP_CLIENT_BUILDER_CONFIG, RecordingHttpClientBuilder.class) + .build(); + final S3Client s3 = this.getS3Client(); + s3.createBucket(CreateBucketRequest.builder().bucket(bucket).build()); + final Map fullProperties = this.createStorerProperties(properties); + final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(fullProperties); + try (final LargeMessageStoringClient storer = config.getStorer(); + final LargeMessageRetrievingClient retriever = config.getRetriever()) { + + final Headers headers = new RecordHeaders(); + final byte[] obj = serialize("foo"); + final boolean isKey = false; + final byte[] data = storer.storeBytes(TOPIC, obj, isKey, headers); + + final byte[] result = retriever.retrieveBytes(data, headers, isKey); + assertThat(result).isEqualTo(obj); + assertThat(RecordingHttpClient.REQUESTS) + .hasSize(2) + .anySatisfy(request -> { + assertThat(request.method()).isEqualTo(SdkHttpMethod.PUT); + assertThat(request.encodedPath()).startsWith("/" + bucket + "/base/" + TOPIC + "/values/"); + }) + .anySatisfy(request -> { + assertThat(request.method()).isEqualTo(SdkHttpMethod.GET); + assertThat(request.encodedPath()).startsWith("/" + bucket + "/base/" + TOPIC + "/values/"); + }); + } + } + private Map createStorerProperties(final Map properties) { return ImmutableMap.builder() .putAll(properties) @@ -120,4 +168,36 @@ static class RoundtripArgument { boolean isPathStyleAccess; String compressionType; } + + @RequiredArgsConstructor + public static class RecordingHttpClientBuilder> + implements SdkHttpClient.Builder { + + @Override + public SdkHttpClient buildWithDefaults(final AttributeMap attributeMap) { + return new RecordingHttpClient(new DefaultSdkHttpClientBuilder().buildWithDefaults(attributeMap)); + } + } + + @RequiredArgsConstructor + private static class RecordingHttpClient implements SdkHttpClient { + private static final Collection REQUESTS = new ConcurrentLinkedQueue<>(); + private final @NonNull SdkHttpClient wrapped; + + @Override + public ExecutableHttpRequest prepareRequest(final HttpExecuteRequest httpExecuteRequest) { + REQUESTS.add(httpExecuteRequest.httpRequest()); + return this.wrapped.prepareRequest(httpExecuteRequest); + } + + @Override + public String clientName() { + return "MockSdkHttpClient"; + } + + @Override + public void close() { + this.wrapped.close(); + } + } } diff --git a/large-message-core/src/test/java/com/bakdata/kafka/LargeMessageRetrievingClientS3IntegrationTest.java b/large-message-core/src/test/java/com/bakdata/kafka/LargeMessageRetrievingClientS3IntegrationTest.java index 9ce77c1..2bedc0f 100644 --- a/large-message-core/src/test/java/com/bakdata/kafka/LargeMessageRetrievingClientS3IntegrationTest.java +++ b/large-message-core/src/test/java/com/bakdata/kafka/LargeMessageRetrievingClientS3IntegrationTest.java @@ -26,26 +26,16 @@ import static com.bakdata.kafka.LargeMessageRetrievingClientTest.serializeUri; import static org.assertj.core.api.Assertions.assertThat; -import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_SYNC_HTTP_CLIENT_BUILDER; -import static software.amazon.awssdk.core.client.config.SdkClientOption.SYNC_HTTP_CLIENT; -import com.google.common.collect.ImmutableMap; -import java.lang.reflect.Field; import java.util.Map; -import java.util.function.Supplier; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.core.client.config.SdkClientConfiguration; import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.http.ExecutableHttpRequest; -import software.amazon.awssdk.http.HttpExecuteRequest; -import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.utils.AttributeMap; class LargeMessageRetrievingClientS3IntegrationTest extends AmazonS3IntegrationTest { @@ -69,31 +59,6 @@ void shouldReadBackedText() { } } - @Test - void shouldUseConfiguredSdkHttpClientBuilder() { - final String bucket = "bucket"; - final String basePath = "s3://" + bucket + "/base/"; - final Map properties = ImmutableMap.builder() - .put(AbstractLargeMessageConfig.S3_REGION_CONFIG, "us-east-1") - .put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, 0) - .put(AbstractLargeMessageConfig.BASE_PATH_CONFIG, basePath) - .put(AbstractLargeMessageConfig.S3_SDK_HTTP_CLIENT_BUILDER_CONFIG, MockSdkHttpClientBuilder.class.getName()) - .build(); - AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties); - LargeMessageRetrievingClient retriever = config.getRetriever(); - // Get private field clientFactories - Map> clientFactories = getPrivateField(retriever, "clientFactories", Map.class); - BlobStorageClient blobStorageClient = clientFactories.get("s3").get(); - // Get private field s3Client - S3Client s3Client = getPrivateField(blobStorageClient, "s3", S3Client.class); - // Get private field clientConfiguration - SdkClientConfiguration clientConfiguration = getPrivateField(s3Client, "clientConfiguration", SdkClientConfiguration.class); - // Get private field attributes - AttributeMap attributes = getPrivateField(clientConfiguration, "attributes", AttributeMap.class); - assertThat(attributes.get(SYNC_HTTP_CLIENT)).isExactlyInstanceOf(MockSdkHttpClient.class); - assertThat(attributes.get(CONFIGURED_SYNC_HTTP_CLIENT_BUILDER)).isExactlyInstanceOf(MockSdkHttpClientBuilder.class); - } - private LargeMessageRetrievingClient createRetriever() { final Map properties = this.getLargeMessageConfig(); final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties); @@ -106,37 +71,4 @@ private void store(final String bucket, final String key, final String s) { .key(key) .build(), RequestBody.fromString(s)); } - - private static T getPrivateField(Object object, String fieldName, Class fieldType) { - try { - Field field = object.getClass().getDeclaredField(fieldName); - field.setAccessible(true); - return fieldType.cast(field.get(object)); - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException(e); - } - } - - public static class MockSdkHttpClientBuilder implements SdkHttpClient.Builder { - @Override - public SdkHttpClient buildWithDefaults(AttributeMap attributeMap) { - return new MockSdkHttpClient(); - } - } - - private static class MockSdkHttpClient implements SdkHttpClient { - @Override - public ExecutableHttpRequest prepareRequest(HttpExecuteRequest httpExecuteRequest) { - return null; - } - - public String clientName() { - return "MockSdkHttpClient"; - } - - @Override - public void close() { - - } - } }