From 1171f236fd1dc6f4bc635b4368bceec96be8d753 Mon Sep 17 00:00:00 2001
From: Philipp Schirmer
Date: Wed, 8 Jan 2025 18:32:39 +0100
Subject: [PATCH 1/4] Test converter using embedded connect

---
 large-message-connect/build.gradle.kts        | 13 ++-
 .../LargeMessageConverterIntegrationTest.java | 89 ++++++++++---------
 2 files changed, 56 insertions(+), 46 deletions(-)

diff --git a/large-message-connect/build.gradle.kts b/large-message-connect/build.gradle.kts
index dc31820..41f3153 100644
--- a/large-message-connect/build.gradle.kts
+++ b/large-message-connect/build.gradle.kts
@@ -42,9 +42,16 @@ dependencies {
     val log4jVersion: String by project
     testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
-    testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = "3.6.0") {
-        exclude(group = "org.slf4j", module = "slf4j-log4j12")
-    }
     testImplementation(group = "org.apache.kafka", name = "connect-file", version = kafkaVersion)
     testImplementation(testFixtures(project(":large-message-core")))
+    testImplementation(group = "org.apache.kafka", name = "connect-runtime", version = kafkaVersion)
+    testImplementation(
+        group = "org.apache.kafka",
+        name = "connect-runtime",
+        version = kafkaVersion,
+        classifier = "test"
+    )
+    testImplementation(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion, classifier = "test")
+    testImplementation(group = "org.apache.kafka", name = "kafka_2.13", version = kafkaVersion)
+    testImplementation(group = "org.apache.kafka", name = "kafka_2.13", version = kafkaVersion, classifier = "test")
 }
 
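Note on the dependency changes above: EmbeddedConnectCluster ships in the test-classified artifact of connect-runtime, which in turn relies on the regular and test-classified kafka_2.13 jars and the kafka-clients test jar for its embedded broker; that is why all five artifacts replace the single kafka-junit dependency. A minimal sketch of what these dependencies enable (the lifecycle calls are the ones used in the test below; stop() is assumed for teardown, and all other builder options are left at their defaults):

    // Spin up an in-process Connect worker backed by an embedded Kafka broker.
    EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
            .name("test-cluster")
            .build();
    connect.start();                      // boots broker and worker
    connect.kafka().createTopic("input"); // handle to the embedded broker
    connect.stop();                       // tears everything down again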
diff --git a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
index f97cd33..1ddcc66 100644
--- a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
+++ b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2024 bakdata
+ * Copyright (c) 2025 bakdata
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -24,28 +24,26 @@
 package com.bakdata.kafka;
 
-import static net.mguenther.kafka.junit.Wait.delay;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import net.mguenther.kafka.junit.EmbeddedConnectConfig;
-import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
-import net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig;
-import net.mguenther.kafka.junit.KeyValue;
-import net.mguenther.kafka.junit.SendKeyValues;
-import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.connect.file.FileStreamSinkConnector;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -58,15 +56,23 @@ class LargeMessageConverterIntegrationTest extends AmazonS3IntegrationTest {
     private static final String TOPIC = "input";
     private static final String EXTRACT_RECORD_KEY = "key1";
     private static final String DOWNLOAD_RECORD_KEY = "key2";
-    private EmbeddedKafkaCluster kafkaCluster;
+    private EmbeddedConnectCluster kafkaCluster;
     private Path outputFile;
 
+    private static String asValueConfig(final String key) {
+        return ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + key;
+    }
+
     @BeforeEach
     void setUp() throws IOException {
         this.outputFile = Files.createTempFile("test", "temp");
         final S3Client s3 = this.getS3Client();
         s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build());
-        this.kafkaCluster = this.createCluster();
+        this.kafkaCluster = new EmbeddedConnectCluster.Builder()
+                .name("test-cluster")
+                .workerProps(new HashMap<>(Map.of("plugin.discovery",
+                        "hybrid_warn"))) // map needs to be mutable // FIXME make compatible with service discovery
+                .build();
         this.kafkaCluster.start();
     }
 
@@ -78,63 +84,60 @@ void tearDown() throws IOException {
 
     @Test
     void shouldProcessRecordsCorrectly() throws InterruptedException, IOException {
-        this.kafkaCluster
-                .send(SendKeyValues.to(TOPIC, Collections.singletonList(new KeyValue<>(DOWNLOAD_RECORD_KEY, "toS3")))
-                        .withAll(this.createProducerProperties(true)).build());
-
-        this.kafkaCluster.send(SendKeyValues
-                .to(TOPIC, Collections.singletonList(new KeyValue<>(EXTRACT_RECORD_KEY, "local")))
-                .withAll(this.createProducerProperties(false)).build());
+        this.kafkaCluster.kafka().createTopic(TOPIC);
+        this.kafkaCluster.configureConnector("test", this.config());
+        try (final Producer<byte[], byte[]> producer = this.kafkaCluster.kafka()
+                .createProducer(Collections.emptyMap())) {
+            producer.send(this.createRecord(DOWNLOAD_RECORD_KEY, "toS3", true));
+            producer.send(this.createRecord(EXTRACT_RECORD_KEY, "local", false));
+        }
 
         // makes sure that both records are processed
-        delay(2, TimeUnit.SECONDS);
+        Thread.sleep(TimeUnit.SECONDS.toMillis(2));
         final List<String> output = Files.readAllLines(this.outputFile);
         assertThat(output).containsExactly("toS3", "local");
     }
 
-    private EmbeddedKafkaCluster createCluster() {
-        return EmbeddedKafkaCluster.provisionWith(EmbeddedKafkaClusterConfig
-                .newClusterConfig()
-                .configure(
-                        EmbeddedConnectConfig
-                                .kafkaConnect()
-                                .deployConnector(this.config())
-                                .build())
-                .build());
+    private ProducerRecord<byte[], byte[]> createRecord(final String key, final String value,
+            final boolean shouldBack) {
+        try (final Serializer<String> keySerializer = new StringSerializer();
+                final Serializer<String> valueSerializer = this.createSerializer(shouldBack)) {
+            final byte[] keyBytes = keySerializer.serialize(TOPIC, key);
+            final byte[] valueBytes = valueSerializer.serialize(TOPIC, value);
+            return new ProducerRecord<>(TOPIC, keyBytes, valueBytes);
+        }
    }
 
-    private Properties createS3BackedProperties() {
-        final Properties properties = new Properties();
-        properties.putAll(this.getLargeMessageConfig());
+    private Map<String, String> createS3BackedProperties() {
+        final Map<String, String> properties = new HashMap<>(this.getLargeMessageConfig());
         properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, StringSerde.class.getName());
         properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName());
         properties.put(
                 AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/%s", BUCKET_NAME, S3_KEY_NAME));
-        properties.setProperty(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         return properties;
     }
 
-    private Properties config() {
-        final Properties properties = new Properties();
-        properties.put(ConnectorConfig.NAME_CONFIG, "test");
-        properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "FileStreamSink");
+    private Map<String, String> config() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName());
         properties.put(SinkConnector.TOPICS_CONFIG, TOPIC);
         properties.put(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString());
         properties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, LargeMessageConverter.class.getName());
+        properties.put(asValueConfig(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG),
+                StringConverter.class.getName());
         this.createS3BackedProperties().forEach(
-                (key, value) -> properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + key, value));
+                (key, value) -> properties.put(asValueConfig(key), value));
         return properties;
     }
 
-    private Properties createProducerProperties(final boolean shouldBack) {
-        final Properties properties = new Properties();
-        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LargeMessageSerializer.class);
-        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList());
+    private Serializer<String> createSerializer(final boolean shouldBack) {
+        final Map<String, Object> properties = new HashMap<>();
         properties.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, Integer.toString(shouldBack ?
                 0 : Integer.MAX_VALUE));
         properties.putAll(this.createS3BackedProperties());
-        return properties;
+        final Serializer<String> serializer = new LargeMessageSerializer<>();
+        serializer.configure(properties, false);
+        return serializer;
     }
 }
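For context on the asValueConfig helper introduced in this patch: Connect forwards every connector property that starts with "value.converter." (ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG plus a dot) to the value converter with the prefix stripped, roughly what AbstractConfig#originalsWithPrefix does internally. That is why all large-message settings are registered under this prefix. A self-contained sketch of the mechanism; the "some.large.message.setting" key is purely illustrative and not a real config of the library:

    import java.util.HashMap;
    import java.util.Map;

    final class ConverterPrefixDemo {
        /** Mimics how Connect derives converter configs from prefixed connector configs. */
        static Map<String, String> stripPrefix(final Map<String, String> props, final String prefix) {
            final Map<String, String> stripped = new HashMap<>();
            props.forEach((key, value) -> {
                if (key.startsWith(prefix)) {
                    stripped.put(key.substring(prefix.length()), value);
                }
            });
            return stripped;
        }

        public static void main(final String[] args) {
            final Map<String, String> connectorProps = Map.of(
                    "value.converter", "com.bakdata.kafka.LargeMessageConverter",
                    "value.converter.some.large.message.setting", "42");
            // prints {some.large.message.setting=42}; the un-prefixed key is not passed on
            System.out.println(stripPrefix(connectorProps, "value.converter."));
        }
    }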
From 196846f27c24d8dee44810a4e3958de977b5ae63 Mon Sep 17 00:00:00 2001
From: Philipp Schirmer
Date: Wed, 8 Jan 2025 18:37:41 +0100
Subject: [PATCH 2/4] Test converter using embedded connect

---
 .../LargeMessageConverterIntegrationTest.java | 32 ++++++++-----------
 1 file changed, 14 insertions(+), 18 deletions(-)

diff --git a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
index 1ddcc66..211a34e 100644
--- a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
+++ b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
@@ -29,15 +29,14 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serdes.StringSerde;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.connect.file.FileStreamSinkConnector;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -86,10 +85,11 @@ void tearDown() throws IOException {
     void shouldProcessRecordsCorrectly() throws InterruptedException, IOException {
         this.kafkaCluster.kafka().createTopic(TOPIC);
         this.kafkaCluster.configureConnector("test", this.config());
-        try (final Producer<byte[], byte[]> producer = this.kafkaCluster.kafka()
-                .createProducer(Collections.emptyMap())) {
-            producer.send(this.createRecord(DOWNLOAD_RECORD_KEY, "toS3", true));
-            producer.send(this.createRecord(EXTRACT_RECORD_KEY, "local", false));
+        try (final Producer<String, String> producer = this.createProducer(this.createProducerProperties(true))) {
+            producer.send(new ProducerRecord<>(TOPIC, DOWNLOAD_RECORD_KEY, "toS3"));
+        }
+        try (final Producer<String, String> producer = this.createProducer(this.createProducerProperties(false))) {
+            producer.send(new ProducerRecord<>(TOPIC, EXTRACT_RECORD_KEY, "local"));
         }
 
         // makes sure that both records are processed
@@ -98,14 +98,10 @@ void shouldProcessRecordsCorrectly() throws InterruptedException, IOException {
         assertThat(output).containsExactly("toS3", "local");
     }
 
-    private ProducerRecord<byte[], byte[]> createRecord(final String key, final String value,
-            final boolean shouldBack) {
-        try (final Serializer<String> keySerializer = new StringSerializer();
-                final Serializer<String> valueSerializer = this.createSerializer(shouldBack)) {
-            final byte[] keyBytes = keySerializer.serialize(TOPIC, key);
-            final byte[] valueBytes = valueSerializer.serialize(TOPIC, value);
-            return new ProducerRecord<>(TOPIC, keyBytes, valueBytes);
-        }
+    @SuppressWarnings("unchecked") // Producer always uses byte[] although serializer is customizable
+    private Producer<String, String> createProducer(final Map<String, Object> properties) {
+        return (Producer) this.kafkaCluster.kafka()
+                .createProducer(properties);
     }
 
     private Map<String, String> createS3BackedProperties() {
@@ -131,13 +127,13 @@ private Map<String, String> config() {
         return properties;
     }
 
-    private Serializer<String> createSerializer(final boolean shouldBack) {
+    private Map<String, Object> createProducerProperties(final boolean shouldBack) {
         final Map<String, Object> properties = new HashMap<>();
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LargeMessageSerializer.class);
         properties.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, Integer.toString(shouldBack ?
                 0 : Integer.MAX_VALUE));
         properties.putAll(this.createS3BackedProperties());
-        final Serializer<String> serializer = new LargeMessageSerializer<>();
-        serializer.configure(properties, false);
-        return serializer;
+        return properties;
     }
 }
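This second patch replaces the hand-rolled byte-level serialization from patch 1 with plain producer configuration: LargeMessageSerializer is registered as the value serializer and decides, based on AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, whether a value is sent inline or backed on S3 (the test sets it to 0 to force backing, or Integer.MAX_VALUE to keep values inline). Outside the embedded cluster, an equivalent standalone producer would look roughly like this sketch; the bootstrap address is a placeholder and the S3/serde settings built in createS3BackedProperties() are elided:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    final class LargeMessageProducerSketch {
        public static void main(final String[] args) {
            final Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LargeMessageSerializer.class);
            // 0 backs every value on S3; Integer.MAX_VALUE would keep every value inline
            props.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, "0");
            // ...plus the base.path and serde settings from createS3BackedProperties()
            try (final Producer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("input", "key", "toS3"));
            }
        }
    }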
From 13e66b1cd42b010f259b18f888db3238899e4aa1 Mon Sep 17 00:00:00 2001
From: Philipp Schirmer
Date: Wed, 8 Jan 2025 18:55:14 +0100
Subject: [PATCH 3/4] Test converter using embedded connect

---
 .../bakdata/kafka/LargeMessageConverterIntegrationTest.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
index 211a34e..b4a15a2 100644
--- a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
+++ b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
@@ -40,6 +40,7 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.connect.file.FileStreamSinkConnector;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
@@ -69,7 +70,7 @@ void setUp() throws IOException {
         s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build());
         this.kafkaCluster = new EmbeddedConnectCluster.Builder()
                 .name("test-cluster")
-                .workerProps(new HashMap<>(Map.of("plugin.discovery",
+                .workerProps(new HashMap<>(Map.of(WorkerConfig.PLUGIN_DISCOVERY_CONFIG,
                         "hybrid_warn"))) // map needs to be mutable // FIXME make compatible with service discovery
                 .build();
         this.kafkaCluster.start();
From 520ddc263235272cfcfd71210f942e56af94855b Mon Sep 17 00:00:00 2001
From: Philipp Schirmer
Date: Wed, 8 Jan 2025 18:57:05 +0100
Subject: [PATCH 4/4] Test converter using embedded connect

---
 .../kafka/LargeMessageConverterIntegrationTest.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
index b4a15a2..b95afaf 100644
--- a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
+++ b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
@@ -24,6 +24,7 @@
 package com.bakdata.kafka;
 
+import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_WARN;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
@@ -70,8 +71,10 @@ void setUp() throws IOException {
         s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build());
         this.kafkaCluster = new EmbeddedConnectCluster.Builder()
                 .name("test-cluster")
-                .workerProps(new HashMap<>(Map.of(WorkerConfig.PLUGIN_DISCOVERY_CONFIG,
-                        "hybrid_warn"))) // map needs to be mutable // FIXME make compatible with service discovery
+                .workerProps(new HashMap<>(Map.of( // map needs to be mutable
+                        // FIXME make compatible with service discovery
+                        WorkerConfig.PLUGIN_DISCOVERY_CONFIG, HYBRID_WARN.toString()
+                )))
                 .build();
         this.kafkaCluster.start();
     }
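Regarding the FIXME that survives all four patches: HYBRID_WARN keeps the worker on reflective classpath scanning and only warns about plugins that are invisible to ServiceLoader-based discovery (KIP-898). One plausible way to make the converter compatible with service discovery, so the worker could eventually run with PluginDiscoveryMode.SERVICE_LOAD, would be to register it in a ServiceLoader manifest. The resource path below follows the standard ServiceLoader contract for Connect converters; the registration itself is an assumption and not part of these patches:

    # src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
    com.bakdata.kafka.LargeMessageConverter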