Use Apache Kafka ConfigDef (#55)
philipp94831 authored Jan 14, 2025
1 parent 71a37d7 commit f58bf63
Showing 11 changed files with 30 additions and 33 deletions.
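This commit swaps the configuration framework wholesale: every `io.confluent.common.config` import becomes its `org.apache.kafka.common.config` counterpart, the Confluent Maven repository and `common-config` dependency are dropped, and class-typed options are resolved through a new `getInstance` helper instead of Confluent's `getConfiguredInstance`. As a rough sketch of the target API (the class and key below are invented for illustration and do not appear in the diff), a config class written against the Apache Kafka `ConfigDef` looks like this:

```java
// Minimal sketch using only org.apache.kafka.common.config; ExampleConfig and
// "example.base.path" are illustrative names, not part of this repository.
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class ExampleConfig extends AbstractConfig {
    public static final String BASE_PATH_CONFIG = "example.base.path";

    private static final ConfigDef CONFIG = new ConfigDef()
            .define(BASE_PATH_CONFIG, Type.STRING, "", Importance.HIGH, "Base path for stored payloads");

    public ExampleConfig(final Map<?, ?> originals) {
        super(CONFIG, originals);
    }

    public String getBasePath() {
        return this.getString(BASE_PATH_CONFIG);
    }
}
```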
4 changes: 1 addition & 3 deletions README.md
@@ -29,8 +29,6 @@ implementation group: 'com.bakdata.kafka', name: 'large-message-serde', version:

For other build tools or versions, refer to the [latest version in MvnRepository](https://mvnrepository.com/artifact/com.bakdata.kafka/large-message-serde/latest).

-Make sure to also add [Confluent Maven Repository](http://packages.confluent.io/maven/) to your build file.

#### Usage

You can use it from your Kafka Streams application like any other Serde
@@ -229,7 +227,7 @@ We also provide a method for cleaning up all files on the blob storage associate
```java
final Map<String, Object> properties = ...;
final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties);
-final LargeMessageStoringClient storer = config.getStorer()
+final LargeMessageStoringClient storer = config.getStorer();
storer.deleteAllFiles("topic");
```

1 change: 0 additions & 1 deletion build.gradle.kts
@@ -15,7 +15,6 @@ allprojects {

repositories {
mavenCentral()
-maven(url = "https://packages.confluent.io/maven/")
}
}

1 change: 0 additions & 1 deletion gradle.properties
@@ -3,7 +3,6 @@ org.gradle.caching=true
org.gradle.parallel=true
org.gradle.jvmargs=-Xmx2048m
kafkaVersion=3.8.1
-confluentVersion=7.8.0
junitVersion=5.11.4
log4jVersion=2.24.3
assertJVersion=3.27.2
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
-* Copyright (c) 2022 bakdata
+* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -24,10 +24,10 @@

package com.bakdata.kafka;

-import io.confluent.common.config.ConfigDef;
-import io.confluent.common.config.ConfigDef.Importance;
-import io.confluent.common.config.ConfigDef.Type;
import java.util.Map;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.connect.converters.ByteArrayConverter;
import org.apache.kafka.connect.storage.Converter;

@@ -76,7 +76,7 @@ private static ConfigDef configDef() {
}

Converter getConverter() {
-return this.getConfiguredInstance(CONVERTER_CLASS_CONFIG, Converter.class);
+return this.getInstance(CONVERTER_CLASS_CONFIG, Converter.class);
}

}
3 changes: 0 additions & 3 deletions large-message-core/build.gradle.kts
@@ -30,9 +30,6 @@ dependencies {
val kafkaVersion: String by project
api(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion)

-val confluentVersion: String by project
-api(group = "io.confluent", name = "common-config", version = confluentVersion)

implementation(group = "org.slf4j", name = "slf4j-api", version = "2.0.16")
val awsVersion = "2.29.4"
api(group = "software.amazon.awssdk", name = "s3", version = awsVersion)
Original file line number Diff line number Diff line change
@@ -31,11 +31,6 @@
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import io.confluent.common.config.AbstractConfig;
-import io.confluent.common.config.ConfigDef;
-import io.confluent.common.config.ConfigDef.Importance;
-import io.confluent.common.config.ConfigDef.Type;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -46,7 +41,13 @@
import java.util.Optional;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -257,6 +258,15 @@ public LargeMessageStoringClient getStorer() {
.build();
}

+protected <T> T getInstance(final String key, final Class<T> targetClass) {
+final Class<?> configuredClass = this.getClass(key);
+final Object o = Utils.newInstance(configuredClass);
+if (!targetClass.isInstance(o)) {
+throw new KafkaException(configuredClass.getName() + " is not an instance of " + targetClass.getName());
+}
+return targetClass.cast(o);
+}

private BlobStorageClient getClient() {
return this.getBasePath()
.map(BlobStorageURI::getScheme)
@@ -383,7 +393,7 @@ private BlobStorageClient createGoogleStorageClient() {

private GoogleCredentials getGoogleCredentials() {
try (final FileInputStream credentialsStream = new FileInputStream(this.getString(GOOGLE_CLOUD_KEY_PATH))) {
-final List<String> scopes = Lists.newArrayList(GOOGLE_CLOUD_OAUTH_SCOPE);
+final List<String> scopes = List.of(GOOGLE_CLOUD_OAUTH_SCOPE);
return GoogleCredentials.fromStream(credentialsStream).createScoped(scopes);
} catch (final IOException ioException) {
throw new UncheckedIOException(
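The `getInstance` helper added to `AbstractLargeMessageConfig` above instantiates whatever class is configured under a key via `Utils.newInstance` and throws a `KafkaException` when the result is not assignment-compatible with the requested type. A hypothetical subclass wiring it up might look like the sketch below; the key name and default are invented, and the visibility of `baseConfigDef()`, the `(ConfigDef, Map)` constructor, and `getInstance` is assumed from how they are used elsewhere in this diff:

```java
// Hypothetical example in the same package as AbstractLargeMessageConfig;
// "example.converter.class" is an invented key, not one of the library's options.
package com.bakdata.kafka;

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.connect.converters.ByteArrayConverter;
import org.apache.kafka.connect.storage.Converter;

class ExampleConverterConfig extends AbstractLargeMessageConfig {
    static final String CONVERTER_CLASS_CONFIG = "example.converter.class";

    ExampleConverterConfig(final Map<String, ?> originals) {
        // baseConfigDef() contributes the shared blob storage options; define() adds the class-typed key
        super(baseConfigDef()
                        .define(CONVERTER_CLASS_CONFIG, Type.CLASS, ByteArrayConverter.class, Importance.MEDIUM,
                                "Converter used for the wrapped payload"),
                originals);
    }

    Converter getConverter() {
        // getInstance reflectively creates the configured class and fails with a
        // KafkaException if it does not implement Converter
        return this.getInstance(CONVERTER_CLASS_CONFIG, Converter.class);
    }
}
```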
Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.ImmutableMap;
-import io.confluent.common.config.ConfigDef;
import java.util.Map;
import java.util.stream.Stream;
import lombok.Builder;
@@ -110,8 +109,7 @@ private LargeMessageStoringClient createStorer(final Map<String, Object> basePro

private LargeMessageRetrievingClient createRetriever() {
final Map<String, String> properties = this.getLargeMessageConfig();
-final ConfigDef configDef = AbstractLargeMessageConfig.baseConfigDef();
-final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(configDef, properties);
+final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties);
return config.getRetriever();
}

Original file line number Diff line number Diff line change
@@ -29,7 +29,6 @@
import com.azure.core.util.BinaryData;
import com.azure.storage.blob.BlobContainerClient;
import com.google.common.collect.ImmutableMap;
-import io.confluent.common.config.ConfigDef;
import java.util.Map;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
@@ -75,8 +74,7 @@ private Map<String, Object> createProperties() {

private LargeMessageRetrievingClient createRetriever() {
final Map<String, Object> properties = this.createProperties();
-final ConfigDef configDef = AbstractLargeMessageConfig.baseConfigDef();
-final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(configDef, properties);
+final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties);
return config.getRetriever();
}

Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@
import static com.bakdata.kafka.LargeMessageRetrievingClientTest.serializeUri;
import static org.assertj.core.api.Assertions.assertThat;

-import io.confluent.common.config.ConfigDef;
import java.util.Map;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
@@ -62,8 +61,7 @@ void shouldReadBackedText() {

private LargeMessageRetrievingClient createRetriever() {
final Map<String, String> properties = this.getLargeMessageConfig();
-final ConfigDef configDef = AbstractLargeMessageConfig.baseConfigDef();
-final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(configDef, properties);
+final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties);
return config.getRetriever();
}

2 changes: 1 addition & 1 deletion large-message-serde/build.gradle.kts
@@ -39,7 +39,7 @@ dependencies {
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "fluent-kafka-streams-tests-junit5",
version = "2.16.0"
version = "3.0.0"
)
val log4jVersion: String by project
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
-* Copyright (c) 2020 bakdata
+* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -24,10 +24,10 @@

package com.bakdata.kafka;

-import io.confluent.common.config.ConfigDef;
-import io.confluent.common.config.ConfigDef.Importance;
-import io.confluent.common.config.ConfigDef.Type;
import java.util.Map;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;

