From df4c558618fda34985112a9fb968d61ab631097c Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Mon, 13 Jan 2025 17:10:22 +0800 Subject: [PATCH] feat(tools/perf): support to limit the max poll rate of consumers (#2270) Signed-off-by: Ning Yu --- build.gradle | 1 + .../kafka/tools/automq/PerfCommand.java | 2 +- .../tools/automq/perf/ConsumerService.java | 25 ++++++++++++++++--- .../kafka/tools/automq/perf/PerfConfig.java | 13 ++++++++++ 4 files changed, 36 insertions(+), 5 deletions(-) diff --git a/build.gradle b/build.gradle index b211b08b17..eb503d1a96 100644 --- a/build.gradle +++ b/build.gradle @@ -2232,6 +2232,7 @@ project(':tools') { // AutoMQ inject start implementation project(':automq-shell') implementation libs.kafkaAvroSerializer + implementation libs.bucket4j // AutoMQ inject end implementation project(':storage') diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java index 300ebb6d79..a17c042b58 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java @@ -106,7 +106,7 @@ private void run() { LOGGER.info("Creating consumers..."); int consumers = consumerService.createConsumers(topics, config.consumersConfig()); - consumerService.start(this::messageReceived); + consumerService.start(this::messageReceived, config.maxConsumeRecordRate); LOGGER.info("Created {} consumers, took {} ms", consumers, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS)); LOGGER.info("Creating producers..."); diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java index 81627ee4f8..fe3ec456a4 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java @@ -52,6 +52,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import io.github.bucket4j.BlockingBucket; +import io.github.bucket4j.Bucket; + import static org.apache.kafka.tools.automq.perf.ProducerService.HEADER_KEY_CHARSET; import static org.apache.kafka.tools.automq.perf.ProducerService.HEADER_KEY_SEND_TIME_NANOS; @@ -91,10 +94,15 @@ public int createConsumers(List topics, ConsumersConfig config) { return count; } - public void start(ConsumerCallback callback) { + public void start(ConsumerCallback callback, int pollRate) { + BlockingBucket bucket = rateLimitBucket(pollRate); + ConsumerCallback callbackWithRateLimit = (tp, p, st) -> { + callback.messageReceived(tp, p, st); + bucket.consume(1); + }; CompletableFuture.allOf( groups.stream() - .map(group -> group.start(callback)) + .map(group -> group.start(callbackWithRateLimit)) .toArray(CompletableFuture[]::new) ).join(); } @@ -122,6 +130,15 @@ public int consumerCount() { .sum(); } + private BlockingBucket rateLimitBucket(int rateLimit) { + return Bucket.builder() + .addLimit(limit -> limit + .capacity(rateLimit / 10) + .refillGreedy(rateLimit, Duration.ofSeconds(1)) + ).build() + .asBlocking(); + } + @Override public void close() { admin.close(); @@ -137,7 +154,7 @@ public interface ConsumerCallback { * @param payload the received message payload * @param sendTimeNanos the time in nanoseconds when the message was sent */ - void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos); + void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos) throws InterruptedException; } public static class ConsumersConfig { @@ -316,7 +333,7 @@ private void pollRecords(KafkaConsumer consumer, ConsumerCallbac TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); callback.messageReceived(topicPartition, record.value(), sendTimeNanos); } - } catch (InterruptException e) { + } catch (InterruptException | InterruptedException e) { // ignore, as we are closing } catch (Exception e) { LOGGER.warn("exception occur while consuming message", e); diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java index a56f2af514..2e81c745aa 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadLocalRandom; import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; +import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.between; import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.nonNegativeInteger; import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.notLessThan; import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.positiveInteger; @@ -53,6 +54,7 @@ public class PerfConfig { public final int randomPoolSize; public final int sendRate; public final int sendRateDuringCatchup; + public final int maxConsumeRecordRate; public final int backlogDurationSeconds; public final int groupStartDelaySeconds; public final int warmupDurationMinutes; @@ -92,6 +94,7 @@ public PerfConfig(String[] args) { randomPoolSize = ns.getInt("randomPoolSize"); sendRate = ns.getInt("sendRate"); sendRateDuringCatchup = ns.getInt("sendRateDuringCatchup") == null ? sendRate : ns.getInt("sendRateDuringCatchup"); + maxConsumeRecordRate = ns.getInt("maxConsumeRecordRate"); backlogDurationSeconds = ns.getInt("backlogDurationSeconds"); groupStartDelaySeconds = ns.getInt("groupStartDelaySeconds"); warmupDurationMinutes = ns.getInt("warmupDurationMinutes"); @@ -209,6 +212,12 @@ public static ArgumentParser parser() { .dest("sendRateDuringCatchup") .metavar("SEND_RATE_DURING_CATCHUP") .help("The send rate in messages per second during catchup. If not set, the send rate will be used."); + parser.addArgument("-m", "--max-poll-rate") + .setDefault(1_000_000_000) + .type(between(0, 1_000_000_000)) + .dest("maxConsumeRecordRate") + .metavar("MAX_CONSUME_RECORD_RATE") + .help("The max rate of consuming records per second."); parser.addArgument("-b", "--backlog-duration") .setDefault(0) .type(notLessThan(300)) @@ -351,6 +360,10 @@ public static IntegerArgumentType positiveInteger() { public static IntegerArgumentType notLessThan(int min) { return new IntegerArgumentType(value -> value < min ? "expected an integer not less than " + min + ", but got " + value : null); } + + public static IntegerArgumentType between(int min, int max) { + return new IntegerArgumentType(value -> value < min || value > max ? "expected an integer between " + min + " and " + max + ", but got " + value : null); + } } @FunctionalInterface