Test converter using embedded connect #53

Merged · 4 commits · Jan 8, 2025
large-message-connect/build.gradle.kts (13 changes: 10 additions & 3 deletions)

@@ -42,9 +42,16 @@ dependencies {
 
     val log4jVersion: String by project
     testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
-    testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = "3.6.0") {
-        exclude(group = "org.slf4j", module = "slf4j-log4j12")
-    }
     testImplementation(group = "org.apache.kafka", name = "connect-file", version = kafkaVersion)
     testImplementation(testFixtures(project(":large-message-core")))
+    testImplementation(group = "org.apache.kafka", name = "connect-runtime", version = kafkaVersion)
+    testImplementation(
+        group = "org.apache.kafka",
+        name = "connect-runtime",
+        version = kafkaVersion,
+        classifier = "test"
+    )
+    testImplementation(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion, classifier = "test")
+    testImplementation(group = "org.apache.kafka", name = "kafka_2.13", version = kafkaVersion)
+    testImplementation(group = "org.apache.kafka", name = "kafka_2.13", version = kafkaVersion, classifier = "test")
 }
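
Note on the dependency changes: kafka-junit is dropped in favor of Kafka's own test harness, which ships as the classifier = "test" artifacts above; the connect-runtime test jar in particular provides org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster, used in the rewritten test below. A minimal, self-contained sketch of that class as used in this PR (any builder options beyond these are assumptions):

    // Sketch: embedded Connect cluster from the connect-runtime test jar.
    import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;

    class EmbeddedConnectSketch {
        public static void main(final String[] args) {
            final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
                    .name("demo-cluster")
                    .build();
            connect.start();                      // boots embedded Kafka plus Connect workers
            connect.kafka().createTopic("demo");  // handle to the embedded Kafka cluster
            connect.stop();
        }
    }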
large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java

@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2024 bakdata
+ * Copyright (c) 2025 bakdata
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -24,28 +24,27 @@
 
 package com.bakdata.kafka;
 
-import static net.mguenther.kafka.junit.Wait.delay;
+import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_WARN;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import net.mguenther.kafka.junit.EmbeddedConnectConfig;
-import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
-import net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig;
-import net.mguenther.kafka.junit.KeyValue;
-import net.mguenther.kafka.junit.SendKeyValues;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serdes.StringSerde;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.connect.file.FileStreamSinkConnector;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -58,15 +57,25 @@ class LargeMessageConverterIntegrationTest extends AmazonS3IntegrationTest {
     private static final String TOPIC = "input";
     private static final String EXTRACT_RECORD_KEY = "key1";
     private static final String DOWNLOAD_RECORD_KEY = "key2";
-    private EmbeddedKafkaCluster kafkaCluster;
+    private EmbeddedConnectCluster kafkaCluster;
     private Path outputFile;
 
+    private static String asValueConfig(final String key) {
+        return ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + key;
+    }
+
     @BeforeEach
     void setUp() throws IOException {
         this.outputFile = Files.createTempFile("test", "temp");
         final S3Client s3 = this.getS3Client();
         s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build());
-        this.kafkaCluster = this.createCluster();
+        this.kafkaCluster = new EmbeddedConnectCluster.Builder()
+                .name("test-cluster")
+                .workerProps(new HashMap<>(Map.of( // map needs to be mutable
+                        // FIXME make compatible with service discovery
+                        WorkerConfig.PLUGIN_DISCOVERY_CONFIG, HYBRID_WARN.toString()
+                )))
+                .build();
         this.kafkaCluster.start();
     }
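
Background on the FIXME above: since KIP-898, Connect workers can discover plugins through ServiceLoader manifests; the plugin.discovery value HYBRID_WARN keeps classpath scanning enabled and merely warns about plugins that lack a manifest. Making the converter compatible with service discovery would presumably mean shipping a manifest resource along these lines (path and contents are an assumption, not part of this PR):

    # src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
    com.bakdata.kafka.LargeMessageConverter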

@@ -78,60 +87,54 @@ void tearDown() throws IOException {

     @Test
     void shouldProcessRecordsCorrectly() throws InterruptedException, IOException {
-        this.kafkaCluster
-                .send(SendKeyValues.to(TOPIC, Collections.singletonList(new KeyValue<>(DOWNLOAD_RECORD_KEY, "toS3")))
-                        .withAll(this.createProducerProperties(true)).build());
-
-        this.kafkaCluster.send(SendKeyValues
-                .to(TOPIC, Collections.singletonList(new KeyValue<>(EXTRACT_RECORD_KEY, "local")))
-                .withAll(this.createProducerProperties(false)).build());
+        this.kafkaCluster.kafka().createTopic(TOPIC);
+        this.kafkaCluster.configureConnector("test", this.config());
+        try (final Producer<String, String> producer = this.createProducer(this.createProducerProperties(true))) {
+            producer.send(new ProducerRecord<>(TOPIC, DOWNLOAD_RECORD_KEY, "toS3"));
+        }
+        try (final Producer<String, String> producer = this.createProducer(this.createProducerProperties(false))) {
+            producer.send(new ProducerRecord<>(TOPIC, EXTRACT_RECORD_KEY, "local"));
+        }
 
         // makes sure that both records are processed
-        delay(2, TimeUnit.SECONDS);
+        Thread.sleep(TimeUnit.SECONDS.toMillis(2));
         final List<String> output = Files.readAllLines(this.outputFile);
         assertThat(output).containsExactly("toS3", "local");
     }
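
Why two producers: createProducerProperties(true) sets the large-message maximum byte size to 0, forcing the first record's value through S3, while createProducerProperties(false) uses Integer.MAX_VALUE so the second record stays inline; the connector therefore exercises both the download and the extract path of the converter. A hypothetical sketch of the size check this relies on (the method name is illustrative, not the serde's actual API):

    // Hypothetical sketch: values larger than maxByteSize are offloaded to S3
    // and replaced by a reference; smaller values stay inline in the record.
    static boolean shouldBackToS3(final byte[] serialized, final int maxByteSize) {
        return serialized.length > maxByteSize; // 0 -> always back, MAX_VALUE -> never
    }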

-    private EmbeddedKafkaCluster createCluster() {
-        return EmbeddedKafkaCluster.provisionWith(EmbeddedKafkaClusterConfig
-                .newClusterConfig()
-                .configure(
-                        EmbeddedConnectConfig
-                                .kafkaConnect()
-                                .deployConnector(this.config())
-                                .build())
-                .build());
+    @SuppressWarnings("unchecked") // Producer always uses byte[] although serializer is customizable
+    private <K, V> Producer<K, V> createProducer(final Map<String, Object> properties) {
+        return (Producer<K, V>) this.kafkaCluster.kafka()
+                .createProducer(properties);
     }

-    private Properties createS3BackedProperties() {
-        final Properties properties = new Properties();
-        properties.putAll(this.getLargeMessageConfig());
+    private Map<String, String> createS3BackedProperties() {
+        final Map<String, String> properties = new HashMap<>(this.getLargeMessageConfig());
         properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, StringSerde.class.getName());
         properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName());
         properties.put(
                 AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/%s", BUCKET_NAME, S3_KEY_NAME));
-        properties.setProperty(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         return properties;
     }

-    private Properties config() {
-        final Properties properties = new Properties();
-        properties.put(ConnectorConfig.NAME_CONFIG, "test");
-        properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "FileStreamSink");
+    private Map<String, String> config() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName());
         properties.put(SinkConnector.TOPICS_CONFIG, TOPIC);
         properties.put(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString());
         properties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, LargeMessageConverter.class.getName());
+        properties.put(asValueConfig(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG),
+                StringConverter.class.getName());
         this.createS3BackedProperties().forEach(
-                (key, value) -> properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + key, value));
+                (key, value) -> properties.put(asValueConfig(key), value));
         return properties;
     }

-    private Properties createProducerProperties(final boolean shouldBack) {
-        final Properties properties = new Properties();
+    private Map<String, Object> createProducerProperties(final boolean shouldBack) {
+        final Map<String, Object> properties = new HashMap<>();
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LargeMessageSerializer.class);
-        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList());
         properties.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG,
                 Integer.toString(shouldBack ? 0 : Integer.MAX_VALUE));
         properties.putAll(this.createS3BackedProperties());
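
For readers unfamiliar with Connect's config routing, which the new asValueConfig helper leans on: ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG is the literal key "value.converter", and the worker forwards any property prefixed with "value.converter." to the value converter with the prefix stripped. A self-contained illustration (the "converter" key stands in for LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG, whose exact string is an assumption here):

    import java.util.HashMap;
    import java.util.Map;

    class ValueConverterPrefixSketch {
        // mirrors the asValueConfig helper introduced in this PR
        static String asValueConfig(final String key) {
            return "value.converter." + key;
        }

        public static void main(final String[] args) {
            final Map<String, String> connectorProps = new HashMap<>();
            // becomes "value.converter.converter"; the worker hands it to the
            // value converter (LargeMessageConverter) as plain "converter"
            connectorProps.put(asValueConfig("converter"),
                    "org.apache.kafka.connect.storage.StringConverter");
            System.out.println(connectorProps);
        }
    }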