Skip to content

Commit

Permalink
Kafka: Tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Nov 19, 2023
1 parent 63c0cac commit f0fe169
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions sea-streamer-kafka/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ use crate::{
cluster::cluster_uri, impl_into_string, stream_err, BaseOptionKey, KafkaConnectOptions,
KafkaErr, KafkaResult, DEFAULT_TIMEOUT,
};
pub use rdkafka::producer::FutureRecord;
use rdkafka::{
config::ClientConfig,
producer::{DeliveryFuture, Producer as ProducerTrait},
producer::{DeliveryFuture, FutureRecord as RawPayload, Producer as ProducerTrait},
};
pub use rdkafka::{consumer::ConsumerGroupMetadata, TopicPartitionList};
pub use rdkafka::{consumer::ConsumerGroupMetadata, producer::FutureRecord, TopicPartitionList};
use sea_streamer_runtime::spawn_blocking;
use sea_streamer_types::{
export::{async_trait, futures::FutureExt},
Expand Down Expand Up @@ -89,7 +88,7 @@ impl Producer for KafkaProducer {
fn send_to<S: Buffer>(&self, stream: &StreamKey, payload: S) -> KafkaResult<Self::SendFuture> {
let fut = self
.get()
.send_result(FutureRecord::<str, [u8]>::to(stream.name()).payload(payload.as_bytes()))
.send_result(RawPayload::<str, [u8]>::to(stream.name()).payload(payload.as_bytes()))
.map_err(|(err, _raw)| stream_err(err))?;

Ok(SendFuture {
Expand Down

0 comments on commit f0fe169

Please sign in to comment.