Upgrade to Kafka 3.8 (#54)
philipp94831 authored Jan 9, 2025
1 parent 9c4f15d commit 85f183e
Showing 4 changed files with 20 additions and 16 deletions.
gradle.properties (4 changes: 2 additions & 2 deletions)
@@ -2,8 +2,8 @@ version=2.8.1-SNAPSHOT
org.gradle.caching=true
org.gradle.parallel=true
org.gradle.jvmargs=-Xmx2048m
-kafkaVersion=3.6.1
-confluentVersion=7.6.0
+kafkaVersion=3.8.1
+confluentVersion=7.8.0
junitVersion=5.11.4
log4jVersion=2.24.3
assertJVersion=3.27.2
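Note: Confluent Platform 7.8 is built on Apache Kafka 3.8, so the kafkaVersion and confluentVersion bumps move in lockstep here, just as 7.6 paired with 3.6 before.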
LargeMessageConverterIntegrationTest.java (full path not shown in this view)
@@ -27,9 +27,9 @@
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_WARN;
import static org.assertj.core.api.Assertions.assertThat;

+import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,6 +48,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

@@ -58,15 +59,15 @@ class LargeMessageConverterIntegrationTest extends AmazonS3IntegrationTest {
private static final String EXTRACT_RECORD_KEY = "key1";
private static final String DOWNLOAD_RECORD_KEY = "key2";
private EmbeddedConnectCluster kafkaCluster;
-private Path outputFile;
+@TempDir
+private File outputDir;

private static String asValueConfig(final String key) {
return ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + key;
}

@BeforeEach
-void setUp() throws IOException {
-this.outputFile = Files.createTempFile("test", "temp");
+void setUp() {
final S3Client s3 = this.getS3Client();
s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build());
this.kafkaCluster = new EmbeddedConnectCluster.Builder()
@@ -80,15 +81,15 @@ void setUp() throws IOException {
}

@AfterEach
-void tearDown() throws IOException {
+void tearDown() {
this.kafkaCluster.stop();
-Files.deleteIfExists(this.outputFile);
}

@Test
void shouldProcessRecordsCorrectly() throws InterruptedException, IOException {
this.kafkaCluster.kafka().createTopic(TOPIC);
-this.kafkaCluster.configureConnector("test", this.config());
+final File file = new File(this.outputDir, "out");
+this.kafkaCluster.configureConnector("test", this.config(file));
try (final Producer<String, String> producer = this.createProducer(this.createProducerProperties(true))) {
producer.send(new ProducerRecord<>(TOPIC, DOWNLOAD_RECORD_KEY, "toS3"));
}
@@ -98,7 +99,7 @@ void shouldProcessRecordsCorrectly() throws InterruptedException, IOException {

// makes sure that both records are processed
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
-final List<String> output = Files.readAllLines(this.outputFile);
+final List<String> output = Files.readAllLines(file.toPath());
assertThat(output).containsExactly("toS3", "local");
}

@@ -117,11 +118,11 @@ private Map<String, String> createS3BackedProperties() {
return properties;
}

-private Map<String, String> config() {
+private Map<String, String> config(final File file) {
final Map<String, String> properties = new HashMap<>();
properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName());
properties.put(SinkConnector.TOPICS_CONFIG, TOPIC);
-properties.put(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString());
+properties.put(FileStreamSinkConnector.FILE_CONFIG, file.getAbsolutePath());
properties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, LargeMessageConverter.class.getName());
properties.put(asValueConfig(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG),
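The test now lets JUnit 5 manage the temporary output location: a @TempDir-annotated field receives a fresh directory before each test, and JUnit deletes it recursively afterwards, which removes the manual Files.createTempFile/Files.deleteIfExists bookkeeping (and the checked IOException) from setUp and tearDown. A minimal standalone sketch of the pattern; the class and file names here are illustrative, not from this repository:

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.File;
import java.nio.file.Files;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class TempDirSketchTest {
    // JUnit 5 injects a fresh temporary directory before each test and
    // deletes it recursively afterwards, so no explicit cleanup is needed.
    // Private @TempDir fields work on recent JUnit versions, as the
    // diff above (JUnit 5.11.4) demonstrates.
    @TempDir
    private File outputDir;

    @Test
    void shouldWriteIntoManagedTempDir() throws Exception {
        final File out = new File(this.outputDir, "out");
        Files.write(out.toPath(), List.of("hello"));
        assertEquals(List.of("hello"), Files.readAllLines(out.toPath()));
    }
}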
(third changed file; file name not shown in this view)
@@ -1,7 +1,7 @@
/*
* MIT License
*
-* Copyright (c) 2024 bakdata
+* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -30,6 +30,7 @@
import java.nio.ByteBuffer;
import lombok.Getter;
import lombok.NonNull;
+import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;
@@ -148,7 +149,8 @@ static CompressionType forName(final String name) {
private static byte[] compress(final org.apache.kafka.common.record.CompressionType compressionType,
final byte[] bytes) {
final ByteBufferOutputStream outStream = new ByteBufferOutputStream(bytes.length);
-try (final OutputStream stream = compressionType.wrapForOutput(outStream, RecordBatch.MAGIC_VALUE_V2)) {
+final Compression compression = Compression.of(compressionType).build();
+try (final OutputStream stream = compression.wrapForOutput(outStream, RecordBatch.MAGIC_VALUE_V2)) {
stream.write(bytes);
stream.flush();
} catch (final IOException e) {
@@ -160,7 +162,8 @@ private static byte[] compress(final org.apache.kafka.common.record.CompressionType

private static byte[] decompress(final org.apache.kafka.common.record.CompressionType compressionType,
final byte[] bytes) {
-try (final InputStream stream = compressionType.wrapForInput(ByteBuffer.wrap(bytes), RecordBatch.MAGIC_VALUE_V2,
+final Compression compression = Compression.of(compressionType).build();
+try (final InputStream stream = compression.wrapForInput(ByteBuffer.wrap(bytes), RecordBatch.MAGIC_VALUE_V2,
BUFFER_SUPPLIER)) {
return stream.readAllBytes();
} catch (final IOException e) {
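Kafka 3.8 moved the stream-wrapping helpers off org.apache.kafka.common.record.CompressionType and onto the new org.apache.kafka.common.compress.Compression class, obtained via the Compression.of(type).build() builder, which is exactly what the two hunks above adapt to. A self-contained round-trip sketch against that API: the class name and the buffer extraction at the end are illustrative (the repository's own return statement is elided in this view), and BufferSupplier.NO_CACHING stands in for the BUFFER_SUPPLIER constant referenced in the diff:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

final class CompressionRoundTrip {

    static byte[] compress(final CompressionType type, final byte[] bytes) throws IOException {
        final ByteBufferOutputStream outStream = new ByteBufferOutputStream(bytes.length);
        // Kafka 3.8: build a Compression from the CompressionType instead of
        // calling CompressionType#wrapForOutput directly
        final Compression compression = Compression.of(type).build();
        try (final OutputStream stream = compression.wrapForOutput(outStream, RecordBatch.MAGIC_VALUE_V2)) {
            stream.write(bytes);
        }
        // copy the written region of the backing buffer into a byte[]
        final ByteBuffer buffer = outStream.buffer();
        buffer.flip();
        final byte[] compressed = new byte[buffer.remaining()];
        buffer.get(compressed);
        return compressed;
    }

    static byte[] decompress(final CompressionType type, final byte[] bytes) throws IOException {
        final Compression compression = Compression.of(type).build();
        try (final InputStream stream = compression.wrapForInput(
                ByteBuffer.wrap(bytes), RecordBatch.MAGIC_VALUE_V2, BufferSupplier.NO_CACHING)) {
            return stream.readAllBytes();
        }
    }

    public static void main(final String[] args) throws IOException {
        final byte[] payload = "hello kafka 3.8".getBytes(StandardCharsets.UTF_8);
        final byte[] roundTripped = decompress(CompressionType.GZIP, compress(CompressionType.GZIP, payload));
        // prints: hello kafka 3.8
        System.out.println(new String(roundTripped, StandardCharsets.UTF_8));
    }
}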
large-message-serde/build.gradle.kts (2 changes: 1 addition & 1 deletion)
@@ -39,7 +39,7 @@ dependencies {
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "fluent-kafka-streams-tests-junit5",
version = "2.14.0"
version = "2.16.0"
)
val log4jVersion: String by project
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
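The fluent-kafka-streams-tests bump from 2.14.0 to 2.16.0 presumably picks up a release built against Kafka 3.8; the exact compatibility mapping is not shown in this diff.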
