Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
guysegal committed Nov 19, 2023
1 parent 553ba63 commit c7b5c37
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
7 changes: 3 additions & 4 deletions src/main/scala/config/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,9 @@ object KafkaConfig {
("bootstrap.servers", broker),
("linger.ms", lingerTime),
("max.block.ms", maxBlockMS),
//todo: add condition here
("partitioner.class", if (useGroupPartitioner) "partitioners.GroupPartitioner" else "org.apache.kafka.clients.producer.internals.DefaultPartitioner")
) ++ batchSize.map(size => Seq(("batch.size", size))).getOrElse(Seq())

) ++
batchSize.map(size => Seq(("batch.size", size))).getOrElse(Seq()) ++
(if (useGroupPartitioner) Seq(("partitioner.class", "partitioners.GroupPartitioner")) else Seq.empty)

KafkaConfig(readinessTopic, producerConfig)
}
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/partitioners/GroupPartitioner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package partitioners

import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster
import ciris.{ConfigDecoder, ConfigValue, Effect, env}

class GroupPartitioner extends Partitioner {
private val partitionsPerGroup = 5
Expand Down

0 comments on commit c7b5c37

Please sign in to comment.