From c7b5c37793b7b595b67f70c93b128f4ae89b4243 Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Sun, 19 Nov 2023 09:58:26 +0200 Subject: [PATCH] . --- src/main/scala/config/KafkaConfig.scala | 7 +++---- src/main/scala/partitioners/GroupPartitioner.scala | 1 + 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/scala/config/KafkaConfig.scala b/src/main/scala/config/KafkaConfig.scala index 72cfff2..45c7027 100644 --- a/src/main/scala/config/KafkaConfig.scala +++ b/src/main/scala/config/KafkaConfig.scala @@ -78,10 +78,9 @@ object KafkaConfig { ("bootstrap.servers", broker), ("linger.ms", lingerTime), ("max.block.ms", maxBlockMS), - //todo: add condition here - ("partitioner.class", if (useGroupPartitioner) "partitioners.GroupPartitioner" else "org.apache.kafka.clients.producer.internals.DefaultPartitioner") - ) ++ batchSize.map(size => Seq(("batch.size", size))).getOrElse(Seq()) - + ) ++ + batchSize.map(size => Seq(("batch.size", size))).getOrElse(Seq()) ++ + (if (useGroupPartitioner) Seq(("partitioner.class", "partitioners.GroupPartitioner")) else Seq.empty) KafkaConfig(readinessTopic, producerConfig) } diff --git a/src/main/scala/partitioners/GroupPartitioner.scala b/src/main/scala/partitioners/GroupPartitioner.scala index 4e61626..584f0e4 100644 --- a/src/main/scala/partitioners/GroupPartitioner.scala +++ b/src/main/scala/partitioners/GroupPartitioner.scala @@ -2,6 +2,7 @@ package partitioners import org.apache.kafka.clients.producer.Partitioner import org.apache.kafka.common.Cluster +import ciris.{ConfigDecoder, ConfigValue, Effect, env} class GroupPartitioner extends Partitioner { private val partitionsPerGroup = 5