- Add the Kafka dependencies to your `pom.xml` as below. Fetch the latest version at mvnrepository:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
- This project also makes use of slf4j for logging. Fetch the latest version at mvnrepository; a sketch of the dependency follows.
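This is a sketch of what that dependency might look like; `slf4j-simple` and the version shown are assumptions, so check mvnrepository for the artifact and version you actually need:
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version> <!-- illustrative; fetch the latest -->
</dependency>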
- Writing a basic producer in Java. See `ProducerDemo.java` for further details.
String bootstrapServers = "localhost:9092";
// create Producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
// create a producer record
ProducerRecord<String, String> record = new ProducerRecord<String, String>("first_topic", "hello world");
// send the data - asynchronous
producer.send(record);
// flush data
producer.flush();
// flush and close producer
producer.close();
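`KafkaProducer` implements `AutoCloseable`, so a try-with-resources block is an idiomatic alternative; this is a minimal sketch, not how `ProducerDemo.java` is written:
try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties)) {
    // send is asynchronous; close() flushes any pending records on exit
    producer.send(new ProducerRecord<String, String>("first_topic", "hello world"));
}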
- Send data with a callback function. See `ProducerWithCallback.java` for further details.
for (int i=0; i<10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("first_topic", "hello world " + Integer.toString(i));
// send the data - asynchronous
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
// record was successfully sent
logger.info("Received new metadata. \n" +
"Topic: " + recordMetadata.topic() + "\n" +
"Partition: " + recordMetadata.partition() + "\n" +
"Offset: " + recordMetadata.offset() + "\n" +
"Timestamp: " + recordMetadata.timestamp());
} else {
logger.error("Error while producing", e);
}
}
});
}
- A producer with key-value pairs. By providing a non-null key we guarantee that records with the same key go to the same partition; the default partitioner hashes the key, so this only holds while the topic's partition count stays the same. See `ProducerDemoKeys.java` for further details.
for (int i=0; i<10; i++) {
String topic = "first_topic";
String value = "hello world " + Integer.toString(i);
String key = "id_" + Integer.toString(i);
logger.info("Key: " + key); // log the key
// By providing a key we guarantee that the same key goes to the same partition
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
// send the data - asynchronous
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
// record was successfully sent
logger.info("Received new metadata. \n" +
"Topic: " + recordMetadata.topic() + "\n" +
"Partition: " + recordMetadata.partition() + "\n" +
"Offset: " + recordMetadata.offset() + "\n" +
"Timestamp: " + recordMetadata.timestamp());
} else {
logger.error("Error while producing", e);
}
}
}).get(); // bad practice: blocking on get() makes the send synchronous
}
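Note that `Future.get()` throws checked exceptions, so any method calling `producer.send(record, callback).get()` must declare or handle `ExecutionException` and `InterruptedException`; as a sketch:
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // ... create the producer and loop as above; .get() can then be called directly
}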
- A simple Kafka consumer. See `ConsumerDemo.java` for further details.
Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());
String bootStrapServers = "localhost:9092";
String groupId = "my-fourth-application";
String topic = "first_topic";
// create consumer configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest/latest/none
// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// subscribe consumer to our topic(s)
consumer.subscribe(Collections.singleton(topic));
// consumer.subscribe(Arrays.asList("first_topic", "second_topic")) to subscribe to multiple topics
// poll for new data
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
logger.info("Key: " + record.key() + ", Value: " + record.value());
logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
- Assign a group to a Kafka consumer by setting the `group.id` config:
String bootStrapServers = "localhost:9092";
String groupId = "my-fourth-application";
String topic = "first_topic";
// create consumer configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
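The snippet above stops at the group id; the rest of the setup mirrors the simple consumer (a sketch):
// create the consumer and subscribe, exactly as in the simple consumer above
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singleton(topic));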
- If you're not familiar with threads, this may be a little off-putting. In general you want a threaded solution in production. See `ConsumerDemoWithThread.java` for further details.
public static void main(String[] args) {
new ConsumerDemoWithThread().run();
}
private ConsumerDemoWithThread() {
}
private void run() {
Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());
String bootStrapServers = "localhost:9092";
String groupId = "my-sixth-application";
String topic = "first_topic";
// latch for dealing with concurrency
CountDownLatch latch = new CountDownLatch(1);
// create the consumer runnable
logger.info("Creating the consumer thread");
Runnable myConsumerRunnable = new ConsumerRunnable(latch,
bootStrapServers,
groupId,
topic);
// Start the thread
Thread myThread = new Thread(myConsumerRunnable);
myThread.start();
// add a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread( () -> {
logger.info("Caught shutdown hook");
((ConsumerRunnable) myConsumerRunnable).shutdown();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("Application has exited");
}));
try {
latch.await();
} catch (InterruptedException e) {
logger.error("Application got interrupted", e);
} finally {
logger.info("Application is closing");
}
}
public class ConsumerRunnable implements Runnable {
private CountDownLatch latch;
private KafkaConsumer<String, String> consumer;
private Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class.getName());
public ConsumerRunnable(CountDownLatch latch,
String bootStrapServers,
String groupId,
String topic) {
this.latch = latch;
// create consumer configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest/latest/none
// create consumer
consumer = new KafkaConsumer<String, String>(properties); // assign the field; re-declaring a local variable here would leave the field null
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
logger.info("Key: " + record.key() + ", Value: " + record.value());
logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
} catch (WakeupException e) {
logger.info("Received shutdown signal!");
} finally {
consumer.close();
// tell our main code we're done with the consumer
latch.countDown();
}
}
public void shutdown() {
consumer.wakeup(); // special method to interrupt consumer.poll()
}
}
- Use assign() and seek() carefully. They bypass consumer groups entirely (no `group.id` is needed) and are generally used to replay messages or fetch a specific message.
// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// assign and seek are used to replay data (or fetch a specific message)
// assign
TopicPartition partitionToReadFrom = new TopicPartition(topic, 0);
long offsetToReadFrom = 15L;
consumer.assign(Arrays.asList(partitionToReadFrom));
// seek
consumer.seek(partitionToReadFrom, offsetToReadFrom);
int numberOfMessagesToRead = 5;
boolean keepOnReading = true;
int numberOfMessagesReadSoFar = 0;
// poll for new data
while (keepOnReading) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
numberOfMessagesReadSoFar += 1;
logger.info("Key: " + record.key() + ", Value: " + record.value());
logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
if (numberOfMessagesReadSoFar >= numberOfMessagesToRead) {
keepOnReading = false;
break;
}
}
}
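The `properties` used by this demo are not shown above; a minimal sketch, noting that no `group.id` is required because `assign()` does not use consumer groups:
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// no group.id here: manual partition assignment bypasses consumer groups
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");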
- Make the producer idempotent by setting the property below; this prevents duplicates from being introduced by network retries:
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
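With idempotence enabled, the client enforces the related settings below; pinning them explicitly is optional but self-documenting (a sketch, values are the ones idempotence implies):
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // must be <= 5 when idempotence is enabled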
- Enable compression by setting `compression.type`. Experiment with different codecs (gzip, snappy, lz4, zstd)!
// Enable compression, your network will thank you
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // experiment with different methods
- By default, produced messages are sent as soon as they are created. Set `linger.ms` and `batch.size` to control this flow.
// Enable batch sizing
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); // wait up to 20ms before sending so messages can be batched
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); // 32 KB batch size
- See `TwitterProducer.java` for further details:
logger.info("Setup");
// set up the blocking queue: be sure to size it properly based on the expected TPS of your stream
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);
// create a twitter client
Client client = createTwitterClient(msgQueue);
// Attempts to establish a connection.
client.connect();
// create a kafka producer
KafkaProducer<String, String> producer = createKafkaProducer();
// loop to send tweets to kafka
// on a different thread, or multiple different threads....
while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
client.stop();
}
if (msg != null) {
logger.info(msg);
producer.send(new ProducerRecord<>("twitter_tweets", null, msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Something bad happened", e);
}
}
});
}
}
logger.info("End of application");