Skip to content

Commit

Permalink
Formatting & Fix warnings from CI
Browse files Browse the repository at this point in the history
  • Loading branch information
florissmit10 authored and rtyler committed Nov 1, 2023
1 parent 904faa2 commit 3eac101
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 39 deletions.
82 changes: 50 additions & 32 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ use crate::{
};
use delta_helpers::*;
use deltalake::checkpoints::CheckpointError;
use std::ops::Add;
use rdkafka::message::BorrowedMessage;
use std::ops::Add;

/// Type alias for Kafka partition
pub type DataTypePartition = i32;
Expand Down Expand Up @@ -172,7 +172,7 @@ pub enum IngestError {

/// Error returned when a message is received from Kafka that has already been processed.
#[error(
"Partition offset has already been processed - partition: {partition}, offset: {offset}"
"Partition offset has already been processed - partition: {partition}, offset: {offset}"
)]
AlreadyProcessedPartitionOffset {
/// The Kafka partition the message was received from
Expand Down Expand Up @@ -373,7 +373,8 @@ pub async fn start_ingest(
consumer.clone(),
opts,
ingest_metrics.clone(),
).await?;
)
.await?;

// Write seek_offsets if it's supplied and has not been written yet
ingest_processor.write_offsets_to_delta_if_any().await?;
Expand All @@ -398,8 +399,8 @@ pub async fn start_ingest(
rebalance_signal.clone(),
&mut partition_assignment,
&mut ingest_processor,
).await

)
.await
{
match e {
IngestError::RebalanceInterrupt => continue,
Expand Down Expand Up @@ -520,32 +521,43 @@ pub async fn start_ingest(
}
}

fn end_of_partition_reached(message: &BorrowedMessage,
offset_map: &HashMap<DataTypePartition, DataTypeOffset>) -> bool{
fn end_of_partition_reached(
message: &BorrowedMessage,
offset_map: &HashMap<DataTypePartition, DataTypeOffset>,
) -> bool {
let partition = message.partition() as DataTypePartition;
let offset = &(message.offset() as DataTypeOffset);
let max_offset = offset_map.get(&partition).unwrap();
max_offset == offset
}

fn unassign_partition(cancellation_token: Arc<CancellationToken>, consumer: Arc<StreamConsumer<KafkaContext>>, message: &BorrowedMessage) -> Result<(), IngestError> {
fn unassign_partition(
cancellation_token: Arc<CancellationToken>,
consumer: Arc<StreamConsumer<KafkaContext>>,
message: &BorrowedMessage,
) -> Result<(), IngestError> {
let mut tpl = TopicPartitionList::new();
let partition = message.partition();
tpl.add_partition(message.topic(), partition);

// Remove the partition of this message from the assigned partitions
let assignment = consumer.assignment()?
let assignment = consumer
.assignment()?
.elements()
.iter()
.filter(|tp| {
tp.topic() != message.topic() || (tp.topic() == message.topic() && tp.partition() != message.partition())
tp.topic() != message.topic()
|| (tp.topic() == message.topic() && tp.partition() != message.partition())
})
.map(|tp| ((tp.topic().to_string(), tp.partition()), tp.offset()))
.collect::<HashMap<_, _>>();

let new_assignment = TopicPartitionList::from_topic_map(&assignment)?;
consumer.assign(&new_assignment)?;
log::info!("Reached the end of partition {}, removing assignment", partition);
log::info!(
"Reached the end of partition {}, removing assignment",
partition
);

if new_assignment.count() == 0 {
log::info!("Reached the end of partition {}, terminating", partition);
Expand All @@ -555,9 +567,11 @@ fn unassign_partition(cancellation_token: Arc<CancellationToken>, consumer: Arc<
Ok(())
}

fn fetch_latest_offsets(topic: &String, consumer: &Arc<StreamConsumer<KafkaContext>>) -> Result<HashMap<DataTypePartition, DataTypeOffset>, IngestError> {
let metadata = &consumer
.fetch_metadata(Some(topic.as_ref()), Timeout::Never)?;
fn fetch_latest_offsets(
topic: &String,
consumer: &Arc<StreamConsumer<KafkaContext>>,
) -> Result<HashMap<DataTypePartition, DataTypeOffset>, IngestError> {
let metadata = &consumer.fetch_metadata(Some(topic.as_ref()), Timeout::Never)?;

let partition_meta = metadata
.topics()
Expand All @@ -567,7 +581,10 @@ fn fetch_latest_offsets(topic: &String, consumer: &Arc<StreamConsumer<KafkaConte
.first()
.unwrap()
.partitions();
let partitions = partition_meta.iter().map(|p| p.id() as DataTypePartition).collect::<Vec<_>>();
let partitions = partition_meta
.iter()
.map(|p| p.id() as DataTypePartition)
.collect::<Vec<_>>();
let result = get_high_watermark_map(topic.as_str(), consumer.clone(), partitions.into_iter())?;
Ok(result)
}
Expand Down Expand Up @@ -641,10 +658,10 @@ fn should_record_buffer_lag(last_buffer_lag_report: &Option<Instant>) -> bool {
match last_buffer_lag_report {
None => true,
Some(last_buffer_lag_report)
if last_buffer_lag_report.elapsed().as_secs() >= BUFFER_LAG_REPORT_SECONDS =>
{
true
}
if last_buffer_lag_report.elapsed().as_secs() >= BUFFER_LAG_REPORT_SECONDS =>
{
true
}
_ => false,
}
}
Expand Down Expand Up @@ -786,8 +803,8 @@ impl IngestProcessor {
/// Processes a single message received from Kafka.
/// This method deserializes, transforms and writes the message to buffers.
async fn process_message<M>(&mut self, message: M) -> Result<(), IngestError>
where
M: Message + Send + Sync,
where
M: Message + Send + Sync,
{
let partition = message.partition();
let offset = message.offset();
Expand Down Expand Up @@ -850,8 +867,8 @@ impl IngestProcessor {
&mut self,
msg: &M,
) -> Result<Value, MessageDeserializationError>
where
M: Message + Send + Sync,
where
M: Message + Send + Sync,
{
let message_bytes = match msg.payload() {
Some(b) => b,
Expand Down Expand Up @@ -973,8 +990,8 @@ impl IngestProcessor {
},
&self.table.state,
None,
).await

)
.await
{
Ok(v) => {
/*if v != version {
Expand Down Expand Up @@ -1098,7 +1115,7 @@ impl IngestProcessor {

let should = self.value_buffers.len() > 0
&& (self.value_buffers.len() == self.opts.max_messages_per_batch
|| elapsed_millis >= (self.opts.allowed_latency * 1000) as u128);
|| elapsed_millis >= (self.opts.allowed_latency * 1000) as u128);

debug!(
"Should complete record batch - latency test: {} >= {}",
Expand All @@ -1120,7 +1137,7 @@ impl IngestProcessor {

let should = self.delta_writer.buffer_len() > 0
&& (self.delta_writer.buffer_len() >= self.opts.min_bytes_per_file
|| elapsed_secs >= self.opts.allowed_latency);
|| elapsed_secs >= self.opts.allowed_latency);

debug!(
"Should complete file - latency test: {} >= {}",
Expand Down Expand Up @@ -1320,7 +1337,8 @@ async fn dead_letter_queue_from_options(
delta_table_uri: opts.dlq_table_uri.clone(),
dead_letter_transforms: opts.dlq_transforms.clone(),
write_checkpoints: opts.write_checkpoints,
}).await
})
.await
}

/// Creates a vec of partition numbers from a topic partition list.
Expand All @@ -1340,8 +1358,8 @@ fn get_high_watermarks<I>(
consumer: Arc<StreamConsumer<KafkaContext>>,
partitions: I,
) -> Result<Vec<DataTypeOffset>, KafkaError>
where
I: Iterator<Item=DataTypePartition>
where
I: Iterator<Item = DataTypePartition>,
{
get_high_watermark_map(topic, consumer, partitions)
.map(|hashmap| hashmap.into_values().collect::<Vec<_>>())
Expand All @@ -1353,8 +1371,8 @@ fn get_high_watermark_map<I>(
consumer: Arc<StreamConsumer<KafkaContext>>,
partitions: I,
) -> Result<HashMap<DataTypePartition, DataTypeOffset>, KafkaError>
where
I: Iterator<Item=DataTypePartition>
where
I: Iterator<Item = DataTypePartition>,
{
partitions
.map(|partition| {
Expand Down
3 changes: 1 addition & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ async fn main() -> anyhow::Result<()> {
.unwrap()
.to_string();

let end_at_last_offsets = ingest_matches
.contains_id("end");
let end_at_last_offsets = ingest_matches.contains_id("end");

let format = convert_matches_to_message_format(ingest_matches).unwrap();

Expand Down
4 changes: 2 additions & 2 deletions tests/end_at_last_offsets_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn end_at_initial_offsets() {
&topic,
&serde_json::to_value(Msg::new(i)).unwrap(),
)
.await;
.await;
}

let (kdi, _token, rt) = helpers::create_kdi(
Expand Down Expand Up @@ -79,7 +79,7 @@ async fn end_at_initial_offsets() {
&topic,
&serde_json::to_value(Msg::new(i)).unwrap(),
)
.await;
.await;
}

helpers::expect_termination_within(kdi, 10).await;
Expand Down
6 changes: 3 additions & 3 deletions tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,19 +340,19 @@ pub fn wait_until_version_created(table: &str, version: i64) {
wait_until_file_created(FilePath::new(&path));
}

pub async fn expect_termination_within(kdi: JoinHandle<()>, seconds: i64){
pub async fn expect_termination_within(kdi: JoinHandle<()>, seconds: i64) {
let start_time = Local::now();
let timelimit = chrono::Duration::seconds(seconds);

loop {
if kdi.is_finished(){
if kdi.is_finished() {
kdi.await.unwrap();
return;
}
let now = Local::now();
let poll_time = now - start_time;

if poll_time > timelimit{
if poll_time > timelimit {
panic!("KDI did not terminate within timeout",);
}
}
Expand Down

0 comments on commit 3eac101

Please sign in to comment.