From b80bafc3bf2f6ab92251a76c49b31d7b557d8d87 Mon Sep 17 00:00:00 2001
From: Philipp Schirmer
Date: Wed, 8 Jan 2025 19:28:28 +0100
Subject: [PATCH] Test converter using embedded connect (#53)

---
 large-message-connect/build.gradle.kts        | 13 ++-
 .../LargeMessageConverterIntegrationTest.java | 83 ++++++++++---------
 2 files changed, 53 insertions(+), 43 deletions(-)

diff --git a/large-message-connect/build.gradle.kts b/large-message-connect/build.gradle.kts
index dc31820..41f3153 100644
--- a/large-message-connect/build.gradle.kts
+++ b/large-message-connect/build.gradle.kts
@@ -42,9 +42,16 @@ dependencies {
 
     val log4jVersion: String by project
     testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
-    testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = "3.6.0") {
-        exclude(group = "org.slf4j", module = "slf4j-log4j12")
-    }
     testImplementation(group = "org.apache.kafka", name = "connect-file", version = kafkaVersion)
     testImplementation(testFixtures(project(":large-message-core")))
+    testImplementation(group = "org.apache.kafka", name = "connect-runtime", version = kafkaVersion)
+    testImplementation(
+        group = "org.apache.kafka",
+        name = "connect-runtime",
+        version = kafkaVersion,
+        classifier = "test"
+    )
+    testImplementation(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion, classifier = "test")
+    testImplementation(group = "org.apache.kafka", name = "kafka_2.13", version = kafkaVersion)
+    testImplementation(group = "org.apache.kafka", name = "kafka_2.13", version = kafkaVersion, classifier = "test")
 }

diff --git a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
index f97cd33..b95afaf 100644
--- a/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
+++ b/large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2024 bakdata
+ * Copyright (c) 2025 bakdata
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -24,28 +24,27 @@
 
 package com.bakdata.kafka;
 
-import static net.mguenther.kafka.junit.Wait.delay;
+import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_WARN;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import net.mguenther.kafka.junit.EmbeddedConnectConfig;
-import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
-import net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig;
-import net.mguenther.kafka.junit.KeyValue;
-import net.mguenther.kafka.junit.SendKeyValues;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serdes.StringSerde;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.connect.file.FileStreamSinkConnector;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -58,15 +57,25 @@ class LargeMessageConverterIntegrationTest extends AmazonS3IntegrationTest {
     private static final String TOPIC = "input";
     private static final String EXTRACT_RECORD_KEY = "key1";
     private static final String DOWNLOAD_RECORD_KEY = "key2";
-    private EmbeddedKafkaCluster kafkaCluster;
+    private EmbeddedConnectCluster kafkaCluster;
     private Path outputFile;
 
+    private static String asValueConfig(final String key) {
+        return ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + key;
+    }
+
     @BeforeEach
     void setUp() throws IOException {
         this.outputFile = Files.createTempFile("test", "temp");
         final S3Client s3 = this.getS3Client();
         s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build());
-        this.kafkaCluster = this.createCluster();
+        this.kafkaCluster = new EmbeddedConnectCluster.Builder()
+                .name("test-cluster")
+                .workerProps(new HashMap<>(Map.of( // map needs to be mutable
+                        // FIXME make compatible with service discovery
+                        WorkerConfig.PLUGIN_DISCOVERY_CONFIG, HYBRID_WARN.toString()
+                )))
+                .build();
         this.kafkaCluster.start();
     }
 
@@ -78,60 +87,54 @@ void tearDown() throws IOException {
 
     @Test
     void shouldProcessRecordsCorrectly() throws InterruptedException, IOException {
-        this.kafkaCluster
-                .send(SendKeyValues.to(TOPIC, Collections.singletonList(new KeyValue<>(DOWNLOAD_RECORD_KEY, "toS3")))
-                        .withAll(this.createProducerProperties(true)).build());
-
-        this.kafkaCluster.send(SendKeyValues
-                .to(TOPIC, Collections.singletonList(new KeyValue<>(EXTRACT_RECORD_KEY, "local")))
-                .withAll(this.createProducerProperties(false)).build());
+        this.kafkaCluster.kafka().createTopic(TOPIC);
+        this.kafkaCluster.configureConnector("test", this.config());
+        try (final Producer<String, String> producer = this.createProducer(this.createProducerProperties(true))) {
+            producer.send(new ProducerRecord<>(TOPIC, DOWNLOAD_RECORD_KEY, "toS3"));
+        }
+        try (final Producer<String, String> producer = this.createProducer(this.createProducerProperties(false))) {
+            producer.send(new ProducerRecord<>(TOPIC, EXTRACT_RECORD_KEY, "local"));
+        }
         // makes sure that both records are processed
-        delay(2, TimeUnit.SECONDS);
+        Thread.sleep(TimeUnit.SECONDS.toMillis(2));
         final List<String> output = Files.readAllLines(this.outputFile);
         assertThat(output).containsExactly("toS3", "local");
     }
 
-    private EmbeddedKafkaCluster createCluster() {
-        return EmbeddedKafkaCluster.provisionWith(EmbeddedKafkaClusterConfig
-                .newClusterConfig()
-                .configure(
-                        EmbeddedConnectConfig
-                                .kafkaConnect()
-                                .deployConnector(this.config())
-                                .build())
-                .build());
+    @SuppressWarnings("unchecked") // Producer always uses byte[] although serializer is customizable
+    private Producer<String, String> createProducer(final Map<String, Object> properties) {
+        return (Producer) this.kafkaCluster.kafka()
+                .createProducer(properties);
     }
 
-    private Properties createS3BackedProperties() {
-        final Properties properties = new Properties();
-        properties.putAll(this.getLargeMessageConfig());
+    private Map<String, String> createS3BackedProperties() {
+        final Map<String, String> properties = new HashMap<>(this.getLargeMessageConfig());
         properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, StringSerde.class.getName());
         properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName());
         properties.put(
                 AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/%s", BUCKET_NAME, S3_KEY_NAME));
-        properties.setProperty(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         return properties;
     }
 
-    private Properties config() {
-        final Properties properties = new Properties();
-        properties.put(ConnectorConfig.NAME_CONFIG, "test");
-        properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "FileStreamSink");
+    private Map<String, String> config() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName());
         properties.put(SinkConnector.TOPICS_CONFIG, TOPIC);
         properties.put(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString());
         properties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, LargeMessageConverter.class.getName());
+        properties.put(asValueConfig(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG),
+                StringConverter.class.getName());
         this.createS3BackedProperties().forEach(
-                (key, value) -> properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + key, value));
+                (key, value) -> properties.put(asValueConfig(key), value));
         return properties;
     }
 
-    private Properties createProducerProperties(final boolean shouldBack) {
-        final Properties properties = new Properties();
+    private Map<String, Object> createProducerProperties(final boolean shouldBack) {
+        final Map<String, Object> properties = new HashMap<>();
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LargeMessageSerializer.class);
-        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList());
         properties.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG,
                 Integer.toString(shouldBack ? 0 : Integer.MAX_VALUE));
         properties.putAll(this.createS3BackedProperties());
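
Note: the embedded Connect lifecycle this patch migrates to boils down to the minimal sketch below. It is not part of the patch; the cluster, topic, connector, and file names are illustrative, and it uses only APIs that appear in the diff itself (EmbeddedConnectCluster.Builder, start, kafka().createTopic, configureConnector, stop), which ship with the connect-runtime test artifact added in build.gradle.kts.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.file.FileStreamSinkConnector;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;

class EmbeddedConnectLifecycleSketch {
    public static void main(final String[] args) {
        // Build a cluster; worker property overrides go into a mutable map
        final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
                .name("sketch-cluster")
                .workerProps(new HashMap<>())
                .build();
        connect.start(); // boots an embedded broker plus Connect worker(s)

        // Topics are created through the embedded Kafka cluster, not the worker
        connect.kafka().createTopic("sketch-topic");

        // Deploy a connector: the name is passed separately, not as a config entry
        final Map<String, String> config = new HashMap<>();
        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName());
        config.put(SinkConnector.TOPICS_CONFIG, "sketch-topic");
        config.put(FileStreamSinkConnector.FILE_CONFIG, "/tmp/sketch-output.txt");
        connect.configureConnector("sketch-connector", config);

        // ... produce records via connect.kafka().createProducer(...) and assert on the output file ...

        connect.stop(); // tears down the worker(s) and the embedded broker
    }
}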