From 262229a33587d0e9dd1fe8f9d81f24f8df84aeb8 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 14 Jan 2025 07:28:21 +0100 Subject: [PATCH] Update --- .../com/bakdata/kafka/LargeMessageConverterConfig.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/large-message-connect/src/main/java/com/bakdata/kafka/LargeMessageConverterConfig.java b/large-message-connect/src/main/java/com/bakdata/kafka/LargeMessageConverterConfig.java index 3613310..8e6cd87 100644 --- a/large-message-connect/src/main/java/com/bakdata/kafka/LargeMessageConverterConfig.java +++ b/large-message-connect/src/main/java/com/bakdata/kafka/LargeMessageConverterConfig.java @@ -25,9 +25,11 @@ package com.bakdata.kafka; import java.util.Map; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.converters.ByteArrayConverter; import org.apache.kafka.connect.storage.Converter; @@ -76,7 +78,12 @@ private static ConfigDef configDef() { } Converter getConverter() { - return this.getConfiguredInstance(CONVERTER_CLASS_CONFIG, Converter.class); + final Class converterClass = this.getClass(CONVERTER_CLASS_CONFIG); + final Object converter = Utils.newInstance(converterClass); + if (!(converter instanceof Converter)) { + throw new KafkaException(converterClass.getName() + " is not an instance of " + Converter.class.getName()); + } + return (Converter) converter; } }