Skip to content

Commit

Permalink
feat: use the internal partitioner to choose the partition to send ms…
Browse files Browse the repository at this point in the history
…g to

Signed-off-by: Ning Yu <ningyu@automq.com>
  • Loading branch information
Chillax-0v0 authored and superhx committed Dec 25, 2024
1 parent b47ffab commit 5979da1
Showing 1 changed file with 2 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ private Producer createProducer(Topic topic, ProducersConfig config, ProducerCal
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
properties.put(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG, true);

KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(properties);
return new Producer(kafkaProducer, topic, callback);
Expand Down Expand Up @@ -265,8 +266,6 @@ private static class Producer implements AutoCloseable {
private final Topic topic;
private final ProducerCallback callback;

private int partitionIndex = 0;

public Producer(KafkaProducer<String, byte[]> producer, Topic topic, ProducerCallback callback) {
this.producer = producer;
this.topic = topic;
Expand All @@ -278,11 +277,7 @@ public Producer(KafkaProducer<String, byte[]> producer, Topic topic, ProducerCal
* NOT thread-safe.
*/
public CompletableFuture<Void> sendAsync(byte[] payload) {
return sendAsync(nextKey(), payload, nextPartition());
}

private int nextPartition() {
return partitionIndex++ % topic.partitions;
return sendAsync(nextKey(), payload, null);
}

private String nextKey() {
Expand Down

0 comments on commit 5979da1

Please sign in to comment.