specify retry topic per topic (#12)
msegelvik authored and nilsmagnus committed Sep 25, 2019
1 parent 2ea9b00 commit f843009
Showing 9 changed files with 101 additions and 43 deletions.
17 changes: 16 additions & 1 deletion README.md
@@ -56,7 +56,7 @@ From maven central:

// build a consumer-pool
ReliableKafkaConsumerPool<String,String> pool = new ReliablePoolBuilder<>(eventClientFactory)
.topics(List.of(TOPIC, TOPIC2)) // list of topics to subscribe to
.topics(List.of(TOPIC, TOPIC2)) // list of topics to subscribe to, will generate retry topics
.poolCount(5) // optional: number of threads consuming from kafka
.processingFunction(record -> process(record)) // required : a function that processes records
.retryPeriodMillis(24 * 60 * 60 * 1000) // optional: how long a message should be retried before given up on
@@ -69,6 +69,21 @@ From maven central:
// optional: check if all consumer-threads are alive, recommended to check these every minute or so
pool.monitor.monitor()
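
The example calls a process(record) method supplied by the application. A minimal sketch of such a function follows; the method name and body are illustrative, only the ConsumerRecord -> Boolean contract comes from processingFunction, and returning false makes the pool republish the record on the retry topic:

// Illustrative sketch: handleEvent is a hypothetical application method
private boolean process(ConsumerRecord<String, String> record) {
    try {
        handleEvent(record.key(), record.value()); // application-specific handling
        return true;  // processed successfully
    } catch (Exception e) {
        return false; // failed: the retry handler republishes the record on the retry topic
    }
}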

## Explicitly set retry topics

// build a consumer-pool
ReliableKafkaConsumerPool<String,String> pool = new ReliablePoolBuilder<>(eventClientFactory)
.topicsRetryTopics(Collections.singletonMap(TOPIC, RETRY_TOPIC)) //explicitly set retry topic.
.poolCount(5) // optional: number of threads consuming from kafka
.processingFunction(record -> process(record)) // required : a function that processes records
.retryPeriodMillis(24 * 60 * 60 * 1000) // optional: how long a message should be retried before given up on
.retryThrottleMillis(5_000) // optional: specify how fast a message should be retried
.build();
pool.monitor.start(); // start the pool

// optional: check if all consumer-threads are alive, recommended to check these every minute or so
pool.monitor.monitor()
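
When topics(...) is used, each topic gets a generated retry topic named retry-<groupId>-<topic>; a topic whose name already starts with "retry" keeps its name. The sketch below is a hypothetical illustration (the topic name "events" and group id "my-group" are made up) of two equivalent ways to configure the same pool:

// Hypothetical illustration: assuming eventClientFactory.groupId() returns "my-group",
// these two builders consume the same topics and use the same retry topic.
ReliableKafkaConsumerPool<String, String> generated = new ReliablePoolBuilder<>(eventClientFactory)
    .topics(List.of("events"))  // derived retry topic: "retry-my-group-events"
    .processingFunction(record -> process(record))
    .build();

ReliableKafkaConsumerPool<String, String> explicit = new ReliablePoolBuilder<>(eventClientFactory)
    .topicsRetryTopics(Collections.singletonMap("events", "retry-my-group-events"))
    .processingFunction(record -> process(record))
    .build();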

# Detailed description

src/main/java/no/finn/retriableconsumer/ReliableKafkaConsumerPool.java
@@ -1,7 +1,9 @@
package no.finn.retriableconsumer;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -28,7 +30,6 @@ public class ReliableKafkaConsumerPool<K, V> implements Closeable {
/**
* @param consumerPoolCount - number of kafka-consumers
* @param factory - factory that creates consumer and producer
* @param topics - topic to listen to
* @param processingFunction - function that will process messages
* @param pollFunction - function to poll messages
* @param retryDurationInMillis - how long retry-producer should retry the message before giving up
@@ -38,15 +39,15 @@ public class ReliableKafkaConsumerPool<K, V> implements Closeable {
public ReliableKafkaConsumerPool(
int consumerPoolCount,
KafkaClientFactory<K, V> factory,
List<String> topics,
Map<String, String> topicsRetryTopics,
Function<ConsumerRecord<K, V>, Boolean> processingFunction,
Function<Consumer<K, V>, ConsumerRecords<K, V>> pollFunction,
long retryThrottleMillis,
long retryDurationInMillis
) {

// queue for safe communication between consumers and retry-producer
RetryHandler<K, V> retryHandler = new RetryHandler<>(factory::producer, retryThrottleMillis, factory.groupId());
RetryHandler<K, V> retryHandler = new RetryHandler<>(factory::producer, retryThrottleMillis, topicsRetryTopics);

// consumers
List<Restartable> consumers =
@@ -55,7 +56,7 @@ public ReliableKafkaConsumerPool(
i ->
new RestartableKafkaConsumer<>(
factory::consumer,
topics,
new ArrayList<>(topicsRetryTopics.keySet()),
processingFunction,
pollFunction,
retryHandler,
@@ -66,7 +67,7 @@ public ReliableKafkaConsumerPool(
consumers.add(
new RestartableKafkaConsumer<>(
factory::consumer,
RetryHandler.retryTopicNames(topics, factory.groupId()),
new ArrayList<>(topicsRetryTopics.values()),
processingFunction,
pollFunction,
retryHandler,
32 changes: 22 additions & 10 deletions src/main/java/no/finn/retriableconsumer/ReliablePoolBuilder.java
@@ -1,16 +1,18 @@
package no.finn.retriableconsumer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.function.Function;

import no.finn.retriableconsumer.version.ExposeVersion;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ReliablePoolBuilder<K, V> {
static {
// register version to prometheus
@@ -20,10 +22,10 @@ public class ReliablePoolBuilder<K, V> {
private Function<Consumer<K, V>, ConsumerRecords<K, V>> pollFunction = consumer -> consumer.poll(Duration.of(250, ChronoUnit.MILLIS));
private Integer poolCount = 3;
private final KafkaClientFactory<K, V> factory;
private List<String> topics;
private Function<ConsumerRecord<K, V>, Boolean> processingFunction;
private Long retryThrottleMillis = 5000L;
private Long retryPeriodMillis = 24 * 60 * 60 * 1000L; // 1 day by default
private Map<String, String> topicsRetryTopics;

public ReliablePoolBuilder(KafkaClientFactory<K, V> factory) {
this.factory = factory;
@@ -40,7 +42,12 @@ public ReliablePoolBuilder<K, V> poolCount(int poolCount) {
}

public ReliablePoolBuilder<K, V> topics(List<String> topics) {
this.topics = topics;
this.topicsRetryTopics = topics.stream().collect(Collectors.toMap(topic -> topic, topic -> retryTopicName(topic, factory.groupId())));
return this;
}

public ReliablePoolBuilder<K, V> topicsRetryTopics(Map<String, String> topicsRetryTopics) {
this.topicsRetryTopics = topicsRetryTopics;
return this;
}

@@ -75,18 +82,23 @@ public ReliablePoolBuilder<K, V> retryPeriodMillis(long retryPeriodMillis) {
public ReliableKafkaConsumerPool<K, V> build() {
verifyNotNull("pollFunction", pollFunction);
verifyNotNull("poolCount", poolCount);
verifyNotNull("topics", topics);
verifyNotNull("topicsRetryTopics", topicsRetryTopics);
verifyNotNull("processingFunction", processingFunction);
verifyNotNull("retryThrottleMillis", retryThrottleMillis);
verifyNotNull("retryPeriodMillis", retryPeriodMillis);
verifyNotNull("factory", factory);

return new ReliableKafkaConsumerPool<>(poolCount, factory, topics, processingFunction, pollFunction, retryThrottleMillis, retryPeriodMillis);
return new ReliableKafkaConsumerPool<>(poolCount, factory, topicsRetryTopics, processingFunction, pollFunction, retryThrottleMillis, retryPeriodMillis);
}

private void verifyNotNull(String fieldName, Object field) {
if (field == null) {
throw new IllegalStateException(fieldName + " is not set, cannot build an instance of" + ReliableKafkaConsumerPool.class.getCanonicalName());
}
}

static String retryTopicName(String topic, String groupId) {
if (StringUtils.startsWith(topic, "retry")) return topic;
return String.format("%s-%s-%s", "retry", groupId, topic);
}
}
20 changes: 5 additions & 15 deletions src/main/java/no/finn/retriableconsumer/RetryHandler.java
@@ -1,11 +1,11 @@
package no.finn.retriableconsumer;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
@@ -23,18 +23,18 @@ public class RetryHandler<K, V> implements Consumer<ConsumerRecord<K, V>> {

private final Supplier<Producer<K, V>> factory;
private final long retryThrottleMillis;
private final String groupId;
private final Map<String, String> topicsRetryTopics;

RetryHandler(Supplier<Producer<K, V>> factory, long retryThrottleMillis, String groupId) {
RetryHandler(Supplier<Producer<K, V>> factory, long retryThrottleMillis, Map<String, String> topicsRetryTopics) {
this.factory = factory;
this.retryThrottleMillis = retryThrottleMillis;
this.groupId = groupId;
this.topicsRetryTopics = topicsRetryTopics;
}


@Override
public void accept(ConsumerRecord<K, V> record) {
String retryTopic = retryTopicName(record.topic(), groupId);
String retryTopic = topicsRetryTopics.get(record.topic());
log.info("Putting message with key [{}] on retry-topic [{}].", record.key(), retryTopic);
factory.get().send(createRetryRecord(record, retryTopic, System.currentTimeMillis()));
try {
@@ -44,10 +44,6 @@ public void accept(ConsumerRecord<K, V> record) {
}
}

public static List<String> retryTopicNames(List<String> topics, String groupId) {
return topics.stream().map(topic -> retryTopicName(topic, groupId)).collect(Collectors.toList());
}

ProducerRecord<K, V> createRetryRecord(ConsumerRecord<K, V> oldRecord, String retryTopic, long nowInMillis) {
ProducerRecord<K, V> newRecord = new ProducerRecord<>(retryTopic, oldRecord.key(), oldRecord.value());

@@ -80,10 +76,4 @@ static Header processCounterHeader(ProducerRecord<?, ?> producerRecord) {

return new RecordHeader(HEADER_KEY_REPROCESS_COUNTER, String.valueOf(reprocessCount + 1).getBytes());
}

static String retryTopicName(String topic, String groupId) {
if (StringUtils.startsWith(topic, "retry")) return topic;
return String.format("%s-%s-%s", "retry", groupId, topic);
}

}
17 changes: 16 additions & 1 deletion src/test/java/no/finn/retriableconsumer/PoolBuilderTest.java
@@ -2,10 +2,25 @@

import org.junit.Test;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

public class PoolBuilderTest {

@Test(expected = IllegalStateException.class)
public void test_some(){
public void test_some() {
new ReliablePoolBuilder<>(null).build();
}

@Test
public void topic_name() {
String retryTopic = ReliablePoolBuilder.retryTopicName("sometopic", "mygroup");
assertThat(retryTopic).isEqualToIgnoringCase("retry-mygroup-sometopic");
}

@Test
public void dont_prefix_if_already_prefixed() {
String retryTopic = ReliablePoolBuilder.retryTopicName("retry-sometopic", "mygroup");
assertThat(retryTopic).isEqualToIgnoringCase("retry-sometopic");
}

}
@@ -3,6 +3,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

@@ -134,7 +135,11 @@ public void put_on_fail_queue_if_process_returns_false() {


Producer<String, String> mockProducer = mock(Producer.class);
RetryHandler<String, String> failer = new RetryHandler<>(() -> mockProducer, 100, "testgroupid");
Map<String, String> topicsRetryTopic = new HashMap<String, String>() {{
put("topic", "retry-topic");
put("topic1", "retry-topic1");
}};
RetryHandler<String, String> failer = new RetryHandler<>(() -> mockProducer, 100, topicsRetryTopic);


RestartableKafkaConsumer<String, String> consumer =
src/test/java/no/finn/retriableconsumer/RestartableRetryProducerTest.java
@@ -9,7 +9,9 @@
import org.mockito.ArgumentCaptor;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -20,7 +22,7 @@

public class RestartableRetryProducerTest {

@Test
/* @Test
public void topic_name() {
String retryTopic = RetryHandler.retryTopicName("sometopic", "mygroup");
@@ -34,12 +36,17 @@ public void dont_prefix_if_already_prefixed() {
String retryTopic = RetryHandler.retryTopicName("retry-sometopic", "mygroup");
assertThat(retryTopic).isEqualToIgnoringCase("retry-sometopic");
}
}*/

@Test
public void create_producer_record_with_correct_headers_for_first_time_failing_Record() {
Map<String, String> topicsRetryTopics = new HashMap<String, String>() {{
put("topic", "retry-topic");
}};

RetryHandler<String, String> producer = new RetryHandler<>( ()->null, 500, topicsRetryTopics);


RetryHandler<String, String> producer = new RetryHandler<>( ()->null, 500, "");

ConsumerRecord<String, String> oldRecord = new ConsumerRecord<>("topic", 0, 0, "key", "value");
ProducerRecord retryRecord = producer.createRetryRecord(oldRecord, "retry-topic", 1000);
@@ -56,8 +63,11 @@ public void create_producer_record_with_correct_headers_for_first_time_failing_R

@Test
public void conserve_timestamp_header_increse_counter_header() {
Map<String, String> topicsRetryTopics = new HashMap<String, String>() {{
put("topic", "retry-topic");
}};

RetryHandler<String, String> producer = new RetryHandler<>( ()->null, 500, "");
RetryHandler<String, String> producer = new RetryHandler<>( ()->null, 500, topicsRetryTopics);

ConsumerRecord<String, String> oldRecord = new ConsumerRecord<>("topic", 0, 0, "key", "value");
oldRecord.headers().add(new RecordHeader(RetryHandler.HEADER_KEY_REPROCESS_COUNTER, "41".getBytes()));
@@ -79,8 +89,11 @@ public void conserve_timestamp_header_increse_counter_header() {

@Test
public void parse_produced_counter_headers() {
Map<String, String> topicsRetryTopics = new HashMap<String, String>() {{
put("topic", "retry-topic");
}};

RetryHandler<String, String> producer = new RetryHandler<>( ()->null, 500, "");
RetryHandler<String, String> producer = new RetryHandler<>( ()->null, 500, topicsRetryTopics);

ConsumerRecord<String, String> oldRecord = new ConsumerRecord<>("topic", 0, 0, "key", "value");
ProducerRecord retryRecord = producer.createRetryRecord(oldRecord, "retry-topic", 1000);
@@ -97,8 +110,11 @@ public void parse_produced_counter_headers() {

@Test
public void parse_produced_timestamp_header() {
Map<String, String> topicsRetryTopics = new HashMap<String, String>() {{
put("topic", "retry-topic");
}};

RetryHandler<String, String> producer = new RetryHandler<>( ()->null, 500, "");
RetryHandler<String, String> producer = new RetryHandler<>( ()->null, 500, topicsRetryTopics);

ConsumerRecord<String, String> oldRecord = new ConsumerRecord<>("topic", 0, 0, "key", "value");
ProducerRecord retryRecord = producer.createRetryRecord(oldRecord, "retry-topic", 1000);
@@ -125,7 +141,11 @@ public void send_to_kafka_when_record_available_on_queue() throws Exception {

ConsumerRecord<String, String> failedRecord = new ConsumerRecord<>("foo", 0, 0, "bar", "baz");

RetryHandler<String, String> retryProducer =new RetryHandler<>(()->kafkaProducerMock,100, "groupid");
Map<String, String> topicsRetryTopics = new HashMap<String, String>() {{
put("foo", "retry-groupid-foo");
}};

RetryHandler<String, String> retryProducer =new RetryHandler<>(()->kafkaProducerMock,100, topicsRetryTopics);

retryProducer.accept(failedRecord);

4 changes: 2 additions & 2 deletions src/test/kotlin/KafkaIntegrationTest.kt
@@ -41,7 +41,7 @@ class KafkaIntegrationTest {
}
val pollFunction = Function<Consumer<String, String>, ConsumerRecords<String, String>> { it.poll(1) }

val pool = ReliableKafkaConsumerPool(3, factory, listOf("foo"), process, pollFunction, 10, 10_000)
val pool = ReliableKafkaConsumerPool(3, factory, mapOf("foo" to "retry-foo"), process, pollFunction, 10, 10_000)

pool.monitor.start()

@@ -80,7 +80,7 @@ class KafkaIntegrationTest {
}
val poll = Function<Consumer<String, String>, ConsumerRecords<String, String>> { it.poll(1) }

val pool = ReliableKafkaConsumerPool(3, factory, listOf("foo"), process, poll, 10, 10_000_000)
val pool = ReliableKafkaConsumerPool(3, factory, mapOf("foo" to "retry-foo"), process, poll, 10, 10_000_000)

pool.monitor.start()

src/test/kotlin/ReliableKafkaConsumerPoolTest.kt
@@ -23,7 +23,7 @@ class ReliableKafkaConsumerPoolTest {
val processingFunction = Function<ConsumerRecord<String, String>, Boolean> { TODO("not in use") }
val pollFunction = Function<Consumer<String, String>, ConsumerRecords<String, String>> { TODO("not in use") }

val pool = ReliableKafkaConsumerPool(1, factory, Collections.singletonList("topic"), processingFunction, pollFunction,100, 100_000)
val pool = ReliableKafkaConsumerPool(1, factory, mapOf("topic" to "retry-topic"), processingFunction, pollFunction,100, 100_000)

assertEquals(2, pool.size())
}
