diff --git a/Cargo.toml b/Cargo.toml index 7883e1f..1fc420d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tiny_kafka" -version = "1.0.3" +version = "1.0.4" authors = ["Christos Ploutarchou "] edition = "2021" description = "A tiny Kafka client library with producer and consumer functionalities." diff --git a/Readme.md b/Readme.md index a1048dc..4672126 100644 --- a/Readme.md +++ b/Readme.md @@ -59,18 +59,83 @@ The Kafka consumer provides functionality to consume messages from a Kafka topic ### Usage -To initialize the consumer: +#### To initialize the consumer: ```rust let consumer = KafkaConsumer::new("localhost:9092", "my_group", "my_topic"); ``` -To consume messages: +#### To consume messages: ```rust if let Some(msg) = consumer.poll().await { println!("Received: {} -> {}", msg.key, msg.value); } +``` +#### Full Main Function Example with Tokio and Async + +Below is a detailed example of how to utilize both the Kafka producer and consumer within an asynchronous context, leveraging Tokio. + +```rust +use log::info; +use std::sync::Arc; +use tiny_kafka::consumer::KafkaConsumer; +use tiny_kafka::producer::{KafkaProducer, Message}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + + let rt = tokio::runtime::Runtime::new().unwrap(); + // Assuming kafka_bootstrap_servers is of type String + let brokers = Arc::new("localhost:9092".to_string()); + let topics = Arc::new(vec!["test".to_string()]); + + // Consumer task + let brokers_for_task1 = brokers.clone(); + let topics_for_task1 = topics.clone(); + let task1 = async move { + let consumer = KafkaConsumer::new( + brokers_for_task1.as_str(), + "kafka-to-elastic", + topics_for_task1.get(0).unwrap(), + ); + loop { + if let Some(msg) = consumer.poll().await { + info!( + "Consumed message with key: {} and value: {}", + msg.key, msg.value + ); + } + } + }; + rt.spawn(task1); + + // Producer task + let brokers_for_task2 = brokers.clone(); + let topics_for_task2 = topics.clone(); + let task2 = async move { + let producer = KafkaProducer::new(brokers_for_task2.as_str(), Option::None); + + for i in 0..100 { + let key = format!("test_key_{}", i); + let value = format!("test_value_{}", i); + let message = Message::new(&key, &value); + + producer + .send_message(topics_for_task2.get(0).unwrap(), message) + .await; + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + }; + rt.spawn(task2); + + // Wait for a ctrl-c signal + tokio::signal::ctrl_c().await?; + println!("ctrl-c received!"); + + Ok(()) +} + ``` ### Custom Configurations