Test converter using embedded connect (bakdata#53)
philipp94831 authored and yidian1997 committed Jan 15, 2025
1 parent 1dc9607 commit b80bafc
Showing 2 changed files with 53 additions and 43 deletions.
13 changes: 10 additions & 3 deletions large-message-connect/build.gradle.kts
@@ -42,9 +42,16 @@ dependencies {

     val log4jVersion: String by project
     testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
-    testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = "3.6.0") {
-        exclude(group = "org.slf4j", module = "slf4j-log4j12")
-    }
     testImplementation(group = "org.apache.kafka", name = "connect-file", version = kafkaVersion)
     testImplementation(testFixtures(project(":large-message-core")))
+    testImplementation(group = "org.apache.kafka", name = "connect-runtime", version = kafkaVersion)
+    testImplementation(
+        group = "org.apache.kafka",
+        name = "connect-runtime",
+        version = kafkaVersion,
+        classifier = "test"
+    )
+    testImplementation(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion, classifier = "test")
+    testImplementation(group = "org.apache.kafka", name = "kafka_2.13", version = kafkaVersion)
+    testImplementation(group = "org.apache.kafka", name = "kafka_2.13", version = kafkaVersion, classifier = "test")
 }
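
Note on the dependency changes above: the third-party kafka-junit harness (net.mguenther.kafka) is dropped in favor of the embedded Connect test utilities that ship with Kafka itself; those utilities live in the test-classifier artifacts of connect-runtime, kafka-clients, and kafka_2.13, which is why all of them are added. A minimal sketch of the lifecycle these artifacts enable, assuming Kafka 3.6+ on the test classpath (the cluster name and topic are illustrative; the worker property mirrors the diff below):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;

class EmbeddedConnectLifecycleSketch {
    public static void main(final String[] args) {
        // Starts an in-process Kafka broker plus a Connect worker.
        final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
                .name("sketch-cluster")
                // the builder mutates the worker properties, so pass a mutable map
                .workerProps(new HashMap<>(Map.of(
                        WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN.toString())))
                .build();
        connect.start();
        try {
            connect.kafka().createTopic("demo"); // topic on the embedded broker
            // connectors are deployed by name: connect.configureConnector("demo-sink", configMap);
        } finally {
            connect.stop();
        }
    }
}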
83 changes: 43 additions & 40 deletions large-message-connect/src/test/java/com/bakdata/kafka/LargeMessageConverterIntegrationTest.java
@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2024 bakdata
+ * Copyright (c) 2025 bakdata
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -24,28 +24,27 @@

 package com.bakdata.kafka;
 
-import static net.mguenther.kafka.junit.Wait.delay;
+import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_WARN;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import net.mguenther.kafka.junit.EmbeddedConnectConfig;
-import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
-import net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig;
-import net.mguenther.kafka.junit.KeyValue;
-import net.mguenther.kafka.junit.SendKeyValues;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serdes.StringSerde;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.connect.file.FileStreamSinkConnector;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -58,15 +57,25 @@ class LargeMessageConverterIntegrationTest extends AmazonS3IntegrationTest {
     private static final String TOPIC = "input";
     private static final String EXTRACT_RECORD_KEY = "key1";
     private static final String DOWNLOAD_RECORD_KEY = "key2";
-    private EmbeddedKafkaCluster kafkaCluster;
+    private EmbeddedConnectCluster kafkaCluster;
     private Path outputFile;
 
+    private static String asValueConfig(final String key) {
+        return ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + key;
+    }
+
     @BeforeEach
     void setUp() throws IOException {
         this.outputFile = Files.createTempFile("test", "temp");
         final S3Client s3 = this.getS3Client();
         s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build());
-        this.kafkaCluster = this.createCluster();
+        this.kafkaCluster = new EmbeddedConnectCluster.Builder()
+                .name("test-cluster")
+                .workerProps(new HashMap<>(Map.of( // map needs to be mutable
+                        // FIXME make compatible with service discovery
+                        WorkerConfig.PLUGIN_DISCOVERY_CONFIG, HYBRID_WARN.toString()
+                )))
+                .build();
         this.kafkaCluster.start();
     }

@@ -78,60 +87,54 @@ void tearDown() throws IOException {

     @Test
     void shouldProcessRecordsCorrectly() throws InterruptedException, IOException {
-        this.kafkaCluster
-                .send(SendKeyValues.to(TOPIC, Collections.singletonList(new KeyValue<>(DOWNLOAD_RECORD_KEY, "toS3")))
-                        .withAll(this.createProducerProperties(true)).build());
-
-        this.kafkaCluster.send(SendKeyValues
-                .to(TOPIC, Collections.singletonList(new KeyValue<>(EXTRACT_RECORD_KEY, "local")))
-                .withAll(this.createProducerProperties(false)).build());
+        this.kafkaCluster.kafka().createTopic(TOPIC);
+        this.kafkaCluster.configureConnector("test", this.config());
+        try (final Producer<String, String> producer = this.createProducer(this.createProducerProperties(true))) {
+            producer.send(new ProducerRecord<>(TOPIC, DOWNLOAD_RECORD_KEY, "toS3"));
+        }
+        try (final Producer<String, String> producer = this.createProducer(this.createProducerProperties(false))) {
+            producer.send(new ProducerRecord<>(TOPIC, EXTRACT_RECORD_KEY, "local"));
+        }
 
         // makes sure that both records are processed
-        delay(2, TimeUnit.SECONDS);
+        Thread.sleep(TimeUnit.SECONDS.toMillis(2));
         final List<String> output = Files.readAllLines(this.outputFile);
         assertThat(output).containsExactly("toS3", "local");
     }

-    private EmbeddedKafkaCluster createCluster() {
-        return EmbeddedKafkaCluster.provisionWith(EmbeddedKafkaClusterConfig
-                .newClusterConfig()
-                .configure(
-                        EmbeddedConnectConfig
-                                .kafkaConnect()
-                                .deployConnector(this.config())
-                                .build())
-                .build());
+    @SuppressWarnings("unchecked") // Producer always uses byte[] although serializer is customizable
+    private <K, V> Producer<K, V> createProducer(final Map<String, Object> properties) {
+        return (Producer<K, V>) this.kafkaCluster.kafka()
+                .createProducer(properties);
     }

-    private Properties createS3BackedProperties() {
-        final Properties properties = new Properties();
-        properties.putAll(this.getLargeMessageConfig());
+    private Map<String, String> createS3BackedProperties() {
+        final Map<String, String> properties = new HashMap<>(this.getLargeMessageConfig());
         properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, StringSerde.class.getName());
         properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName());
         properties.put(
                 AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/%s", BUCKET_NAME, S3_KEY_NAME));
-        properties.setProperty(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         return properties;
     }

-    private Properties config() {
-        final Properties properties = new Properties();
-        properties.put(ConnectorConfig.NAME_CONFIG, "test");
-        properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "FileStreamSink");
+    private Map<String, String> config() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName());
         properties.put(SinkConnector.TOPICS_CONFIG, TOPIC);
         properties.put(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString());
         properties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, LargeMessageConverter.class.getName());
+        properties.put(asValueConfig(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG),
+                StringConverter.class.getName());
         this.createS3BackedProperties().forEach(
-                (key, value) -> properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + key, value));
+                (key, value) -> properties.put(asValueConfig(key), value));
         return properties;
     }

-    private Properties createProducerProperties(final boolean shouldBack) {
-        final Properties properties = new Properties();
+    private Map<String, Object> createProducerProperties(final boolean shouldBack) {
+        final Map<String, Object> properties = new HashMap<>();
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LargeMessageSerializer.class);
-        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList());
         properties.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG,
                 Integer.toString(shouldBack ? 0 : Integer.MAX_VALUE));
         properties.putAll(this.createS3BackedProperties());
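
For context on the new asValueConfig helper: Kafka Connect forwards every connector property prefixed with value.converter. to the value converter's own configure() call, which is how the S3-backed settings reach LargeMessageConverter at runtime. A hypothetical rendering of the flat key-value map that config() produces for configureConnector (the bucket path and the large-message key names are illustrative, not taken from the diff):

import java.util.Map;

class ConverterPrefixSketch {
    public static void main(final String[] args) {
        // Connect strips the "value.converter." prefix and hands the remainder
        // to the converter; unprefixed keys configure the connector itself.
        final Map<String, String> connectorConfig = Map.of(
                "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
                "topics", "input",
                "value.converter", "com.bakdata.kafka.LargeMessageConverter",
                // hypothetical keys; the real names come from LargeMessageConverterConfig
                // and AbstractLargeMessageConfig
                "value.converter.converter", "org.apache.kafka.connect.storage.StringConverter",
                "value.converter.large.message.base.path", "s3://bucket/base");
        connectorConfig.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}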