
Commit

Merge pull request #6 from lyft/e2e-metric-from-fork
Recreate PR #4 and a little bit more refactoring.
afalko authored Dec 2, 2019
2 parents 9e45546 + 25856ce commit c19ea84
Showing 13 changed files with 633 additions and 313 deletions.
64 changes: 52 additions & 12 deletions pom.xml
@@ -16,16 +16,6 @@
<artifactId>kafka_partition_availability_benchmark</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
@@ -46,15 +36,65 @@
<artifactId>vertx-web</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.1-jre</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-statsd</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<version>1.7.28</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<version>1.7.28</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.28</version>
</dependency>

<!-- Test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.5.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.5.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.charithe</groupId>
<artifactId>kafka-junit</artifactId>
<version>4.1.6</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
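The dependency changes above swap the Prometheus simpleclient for Micrometer and pull in the StatsD registry. As a point of reference, here is a minimal, hypothetical sketch of wiring a StatsD-backed registry into Micrometer's global registry; the class name and the reliance on StatsdConfig defaults are illustrative assumptions, not part of this commit.

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.statsd.StatsdConfig;
import io.micrometer.statsd.StatsdMeterRegistry;

public class MetricsBootstrap {
    public static void main(String[] args) {
        // Default StatsD settings; a real deployment would point this at its own agent.
        StatsdConfig config = StatsdConfig.DEFAULT;
        // Attach globally so static Metrics.gauge(...) calls (as in ConsumeTopic below) are published.
        Metrics.addRegistry(new StatsdMeterRegistry(config, Clock.SYSTEM));
    }
}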
Binary file added src/.DS_Store
Binary file not shown.
310 changes: 310 additions & 0 deletions src/main/java/com/salesforce/BenchmarkApp.java

Large diffs are not rendered by default.
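Since the BenchmarkApp diff is not rendered here, the sketch below shows one plausible way the Timer instances passed into ConsumeTopic and CreateTopic (see below) could be built with Micrometer; the metric namespace, percentile settings, and helper name are assumptions, not taken from this commit.

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;

public class TimerSetup {
    // Hypothetical helper; the real BenchmarkApp may name and tag these differently.
    static Timer benchmarkTimer(String metricsNamespace, String name) {
        return Timer.builder(metricsNamespace + "." + name)
                .publishPercentiles(0.5, 0.99) // assumed percentile summary
                .register(Metrics.globalRegistry);
    }

    public static void main(String[] args) {
        Timer consumerReceiveTimeNanos = benchmarkTimer("kafka_availability", "consumerReceiveTimeNanos");
        Timer consumerCommitTimeNanos = benchmarkTimer("kafka_availability", "consumerCommitTimeNanos");
        Timer topicCreateTimeNanos = benchmarkTimer("kafka_availability", "topicCreateTimeNanos");
        System.out.println(consumerReceiveTimeNanos.getId());
        System.out.println(consumerCommitTimeNanos.getId());
        System.out.println(topicCreateTimeNanos.getId());
    }
}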

127 changes: 65 additions & 62 deletions src/main/java/com/salesforce/ConsumeTopic.java
@@ -7,119 +7,122 @@

package com.salesforce;

import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import io.micrometer.core.instrument.*;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

class ConsumeTopic implements Callable<Exception> {
private static final Logger log = LoggerFactory.getLogger(ConsumeTopic.class);
private static final String AWAITING_CONSUME_METRIC_NAME = "threadsAwaitingConsume";
private static final String AWAITING_COMMIT_METRIC_NAME = "threadsAwaitingCommit";

private static final Histogram consumerReceiveTimeSecs = Histogram
.build("consumerReceiveTimeSecs", "Time taken to do consumer.poll")
.register();
private static final Histogram consumerCommitTimeSecs = Histogram
.build("consumerCommitTimeSecs", "Time it takes to commit new offset")
.register();
private static final Gauge threadsAwaitingConsume = Gauge.build("threadsAwaitingConsume",
"Number of threads that are that are waiting for message batch to be consumed").register();
private static final Gauge threadsAwaitingCommit = Gauge.build("threadsAwaitingCommit",
"Number of threads that are that are waiting for message batch to be committed").register();
private static final Logger log = LoggerFactory.getLogger(ConsumeTopic.class);

private final int topicId;
private final String key;
private final int readWriteInterval;
private final AdminClient kafkaAdminClient;
private final Map<String, Object> kafkaConsumerConfig;
private final short replicationFactor;
private final boolean keepProducing;
private final Timer consumerReceiveTimeNanos;
private final Timer consumerCommitTimeNanos;
private final String metricsNamespace;
private final String clusterName;
private final int readWriteInterval;

/**
* @param topicId Each topic gets a numeric id
* @param key Prefix for topics created by this tool
* @param readWriteInterval How long should we wait before polls for consuming new messages
* @param kafkaAdminClient
* @param kafkaConsumerConfig
* @param keepProducing Whether we are continuously producing messages rather than just producing once
* @param topicId Each topic gets a numeric id.
* @param key Prefix for topics created by this tool.
* @param kafkaAdminClient Kafka admin client we are using.
* @param kafkaConsumerConfig Map that contains consumer configuration.
* @param replicationFactor Replication factor of the topic to be created.
* @param consumerCommitTimeNanos Time it takes for the consumer to commit its offset.
* @param consumerReceiveTimeNanos Time it takes for the consumer to receive the message.
* @param metricsNamespace The namespace to use when submitting metrics.
* @param clusterName Name of the cluster we are monitoring.
* @param readWriteInterval How long to wait between polls for new messages.
*/
public ConsumeTopic(int topicId, String key, int readWriteInterval, AdminClient kafkaAdminClient,
Map<String, Object> kafkaConsumerConfig, short replicationFactor, boolean keepProducing) {
public ConsumeTopic(int topicId, String key, AdminClient kafkaAdminClient,
Map<String, Object> kafkaConsumerConfig, short replicationFactor,
Timer consumerReceiveTimeNanos, Timer consumerCommitTimeNanos,
String metricsNamespace, String clusterName, int readWriteInterval) {
this.topicId = topicId;
this.key = key;
this.readWriteInterval = readWriteInterval;
this.kafkaAdminClient = kafkaAdminClient;
this.kafkaConsumerConfig = Collections.unmodifiableMap(kafkaConsumerConfig);
this.replicationFactor = replicationFactor;
this.keepProducing = keepProducing;
this.consumerReceiveTimeNanos = consumerReceiveTimeNanos;
this.consumerCommitTimeNanos = consumerCommitTimeNanos;
this.metricsNamespace = metricsNamespace;
this.clusterName = clusterName;
this.readWriteInterval = readWriteInterval;
}

@Override
public Exception call() {
String topicName = TopicName.createTopicName(key, topicId);
try {
TopicVerifier.checkTopic(kafkaAdminClient, topicName, replicationFactor);
TopicVerifier.checkTopic(kafkaAdminClient, topicName, replicationFactor,
clusterName, metricsNamespace, false);

Map<String, Object> consumerConfigForTopic = new HashMap<>(kafkaConsumerConfig);
consumerConfigForTopic.put(ConsumerConfig.GROUP_ID_CONFIG, topicName);
KafkaConsumer<Integer, Integer> consumer = new KafkaConsumer<>(consumerConfigForTopic);
KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfigForTopic);
TopicPartition topicPartition = new TopicPartition(topicName, 0);
consumer.assign(Collections.singleton(topicPartition));
consumer.seekToBeginning(Collections.singleton(topicPartition));

threadsAwaitingConsume.inc();
gaugeMetric(AWAITING_CONSUME_METRIC_NAME, 1);
while (true) {
ConsumerRecords<Integer, Integer> messages;
Histogram.Timer consumerReceiveTimer = consumerReceiveTimeSecs.startTimer();
try {
messages = consumer.poll(0);
} finally {
consumerReceiveTimer.observeDuration();
}
ConsumerRecords<Integer, byte[]> messages = consumer.poll(Duration.ofMillis(100));
if (messages.count() == 0) {
if (keepProducing) {
threadsAwaitingConsume.dec();
Thread.sleep(readWriteInterval);
threadsAwaitingConsume.inc();
continue;
}
log.debug("Ran out of messages to process for topic {}; starting from beginning", topicName);
consumer.seekToBeginning(Collections.singleton(topicPartition));
threadsAwaitingCommit.inc();
consumerCommitTimeSecs.time(consumer::commitSync);
threadsAwaitingCommit.dec();
threadsAwaitingConsume.dec();
Thread.sleep(readWriteInterval);
threadsAwaitingConsume.inc();
log.debug("No messages detected on {}", topicName);
continue;
}
gaugeMetric(AWAITING_CONSUME_METRIC_NAME, -1);

threadsAwaitingConsume.dec();
threadsAwaitingCommit.inc();
consumerCommitTimeSecs.time(consumer::commitSync);
threadsAwaitingCommit.dec();
AtomicLong lastOffset = new AtomicLong();
log.debug("Consuming {} records", messages.records(topicPartition).size());
messages.records(topicPartition).forEach(consumerRecord -> {
consumerReceiveTimeNanos.record(Duration.ofMillis(System.currentTimeMillis() - consumerRecord.timestamp()));
lastOffset.set(consumerRecord.offset());
});

ConsumerRecord<Integer, Integer> lastMessage =
messages.records(topicPartition).get(messages.count() - 1);
gaugeMetric(AWAITING_COMMIT_METRIC_NAME, 1);
consumerCommitTimeNanos.record(() ->
consumer.commitSync(Collections.singletonMap(topicPartition,
new OffsetAndMetadata(lastOffset.get() + 1))));
gaugeMetric(AWAITING_COMMIT_METRIC_NAME, -1);

consumer.seek(topicPartition, lastOffset.get() + 1);

log.debug("Last consumed message {}:{}, consumed {} messages, topic: {}",
lastMessage.key(), lastMessage.value(), messages.count(), topicName);
ConsumerRecord<Integer, byte[]> lastMessage =
messages.records(topicPartition).get(messages.count() - 1);
String lastValue = new String(lastMessage.value());
String truncatedValue = lastValue.length() <= 15 ? lastValue : lastValue.substring(0, 15);
log.debug("Last consumed message {} -> {}..., consumed {} messages, topic: {}",
lastMessage.key(), truncatedValue, messages.count(), topicName);
Thread.sleep(readWriteInterval);
threadsAwaitingConsume.inc();
gaugeMetric(AWAITING_CONSUME_METRIC_NAME, 1);
}
} catch (Exception e) {
log.error("Failed consume", e);
return new Exception("Failed consume on topicName " + topicId, e);
}
}

private void gaugeMetric(String metricName, final int value) {
Metrics.gauge(metricsNamespace,
Tags.of(CustomOrderedTag.of("cluster", clusterName, 1),
CustomOrderedTag.of("metric", metricName, 2)),
value);
}
}
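To show how the refactored constructor above is meant to be called, here is a hedged, self-contained sketch of submitting a ConsumeTopic task; the broker address, replication factor, namespace, cluster name, and read/write interval are illustrative values only (the real wiring lives in BenchmarkApp.java, whose diff is not rendered above).

package com.salesforce;

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumeTopicExample {
    public static void main(String[] args) {
        Map<String, Object> adminConfig = new HashMap<>();
        adminConfig.put("bootstrap.servers", "localhost:9092");
        AdminClient adminClient = AdminClient.create(adminConfig);

        // Deserializers match the KafkaConsumer<Integer, byte[]> used in ConsumeTopic.
        Map<String, Object> consumerConfig = new HashMap<>(adminConfig);
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        Timer receiveTimer = Timer.builder("consumerReceiveTimeNanos").register(Metrics.globalRegistry);
        Timer commitTimer = Timer.builder("consumerCommitTimeNanos").register(Metrics.globalRegistry);

        // Argument order follows the constructor introduced in this diff.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(new ConsumeTopic(1, "benchmark", adminClient, consumerConfig, (short) 3,
                receiveTimer, commitTimer, "kafka_availability", "test-cluster", 1000));
    }
}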
21 changes: 13 additions & 8 deletions src/main/java/com/salesforce/CreateTopic.java
@@ -7,7 +7,8 @@

package com.salesforce;

import io.prometheus.client.Histogram;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
@@ -20,20 +21,23 @@
public class CreateTopic implements Callable<Exception> {
private static final Logger log = LoggerFactory.getLogger(CreateTopic.class);

private static final Histogram topicCreateTimeSecs = Histogram
.build("topicCreateTimeSecs", "Topic create time in ms")
.register();

private final int topicId;
private final String key;
private final AdminClient kafkaAdminClient;
private final short replicationFactor;
private final Timer topicCreateTimeNanos;
private final String clusterName;
private final String metricNamespace;

public CreateTopic(int topicId, String key, AdminClient kafkaAdminClient, short replicationFactor) {
public CreateTopic(int topicId, String key, AdminClient kafkaAdminClient, short replicationFactor,
String clusterName, String metricNamespace, Timer topicCreateTimeNanos) {
this.topicId = topicId;
this.key = key;
this.kafkaAdminClient = kafkaAdminClient;
this.replicationFactor = replicationFactor;
this.topicCreateTimeNanos = topicCreateTimeNanos;
this.clusterName = clusterName;
this.metricNamespace = metricNamespace;
}

@Override
@@ -45,9 +49,10 @@ public Exception call() throws Exception {
kafkaAdminClient.createTopics(topic);

// Wait for topic to be created and for leader election to happen
topicCreateTimeSecs.time(() -> {
topicCreateTimeNanos.record(() -> {
try {
TopicVerifier.checkTopic(kafkaAdminClient, topicName, replicationFactor);
TopicVerifier.checkTopic(kafkaAdminClient, topicName, replicationFactor,
clusterName, metricNamespace, false);
} catch (InterruptedException e) {
log.error("Unable to record topic creation", e);
}
43 changes: 43 additions & 0 deletions src/main/java/com/salesforce/CustomOrderedTag.java
@@ -0,0 +1,43 @@
package com.salesforce;

import io.micrometer.core.instrument.Tag;

/**
* This class implements the io.micrometer.core.instrument.Tag interface
* and provides a hard-coded ordering. This is useful because the Micrometer
* implementation otherwise orders tags by key, which leads to ugly metric naming.
*/
public class CustomOrderedTag implements Tag {
private final String key;
private final String value;
private final int relativeOrder;

private CustomOrderedTag(final String key, final String value, final int relativeOrder) {
this.key = key;
this.value = value;
this.relativeOrder = relativeOrder;
}
static Tag of(String key, String value, int relativeOrder) {
return new CustomOrderedTag(key, value, relativeOrder);
}

@Override
public String getKey() {
return key;
}

@Override
public String getValue() {
return value;
}

@Override
public int compareTo(Tag o) {
if (o instanceof CustomOrderedTag) {
return Integer.compare(this.relativeOrder, ((CustomOrderedTag) o).relativeOrder);
} else {
return getKey().compareTo(o.getKey());
}
}
}
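A short, hypothetical usage sketch: tags are declared with explicit relative positions, mirroring the gaugeMetric helper in ConsumeTopic above, so that (per the class javadoc) the cluster tag stays ahead of the metric tag in the emitted name. The values below are illustrative only.

package com.salesforce;

import io.micrometer.core.instrument.Tags;

public class CustomOrderedTagExample {
    public static void main(String[] args) {
        // Hypothetical values; relativeOrder 1 is intended to place "cluster" before "metric".
        Tags tags = Tags.of(
                CustomOrderedTag.of("cluster", "test-cluster", 1),
                CustomOrderedTag.of("metric", "threadsAwaitingConsume", 2));
        tags.forEach(tag -> System.out.println(tag.getKey() + "=" + tag.getValue()));
    }
}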