update Readme.md: add main function example using tokio and async
cploutarchou committed Oct 21, 2023
1 parent b029211 commit 2c84820
Showing 2 changed files with 68 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "tiny_kafka"
version = "1.0.3"
version = "1.0.4"
authors = ["Christos Ploutarchou <cploutarchou@gmail.com>"]
edition = "2021"
description = "A tiny Kafka client library with producer and consumer functionalities."
69 changes: 67 additions & 2 deletions Readme.md
@@ -59,18 +59,83 @@ The Kafka consumer provides functionality to consume messages from a Kafka topic

### Usage

-To initialize the consumer:
+#### To initialize the consumer:

```rust
let consumer = KafkaConsumer::new("localhost:9092", "my_group", "my_topic");
```

-To consume messages:
+#### To consume messages:

```rust
if let Some(msg) = consumer.poll().await {
println!("Received: {} -> {}", msg.key, msg.value);
}
```
#### Full Main Function Example with Tokio and Async

Below is a detailed example of how to utilize both the Kafka producer and consumer within an asynchronous context, leveraging Tokio.

```rust
use log::info;
use std::sync::Arc;
use tiny_kafka::consumer::KafkaConsumer;
use tiny_kafka::producer::{KafkaProducer, Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Kafka broker address and topic list, shared between the two tasks.
    let brokers = Arc::new("localhost:9092".to_string());
    let topics = Arc::new(vec!["test".to_string()]);

    // Consumer task: poll the topic in a loop and log each message received.
    let brokers_for_task1 = brokers.clone();
    let topics_for_task1 = topics.clone();
    let task1 = async move {
        let consumer = KafkaConsumer::new(
            brokers_for_task1.as_str(),
            "kafka-to-elastic",
            topics_for_task1.get(0).unwrap(),
        );
        loop {
            if let Some(msg) = consumer.poll().await {
                info!(
                    "Consumed message with key: {} and value: {}",
                    msg.key, msg.value
                );
            }
        }
    };
    tokio::spawn(task1);

    // Producer task: send 100 test messages, one every 100 ms.
    let brokers_for_task2 = brokers.clone();
    let topics_for_task2 = topics.clone();
    let task2 = async move {
        let producer = KafkaProducer::new(brokers_for_task2.as_str(), None);

        for i in 0..100 {
            let key = format!("test_key_{}", i);
            let value = format!("test_value_{}", i);
            let message = Message::new(&key, &value);

            producer
                .send_message(topics_for_task2.get(0).unwrap(), message)
                .await;
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
    };
    tokio::spawn(task2);

    // Keep running until a ctrl-c signal arrives.
    tokio::signal::ctrl_c().await?;
    println!("ctrl-c received!");

    Ok(())
}

```
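
To build this example in your own project, the manifest needs roughly the dependencies below (a sketch, assuming the `tiny_kafka` version published by this commit and Tokio's `full` feature set; trim the features as needed):

```toml
[dependencies]
tiny_kafka = "1.0.4"
tokio = { version = "1", features = ["full"] }
log = "0.4"
```

Note that the `info!` lines only appear if a logger implementation is installed, for example by adding `env_logger` and calling `env_logger::init()` at the start of `main`.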

### Custom Configurations
