From 5bfd65e2503d72b4945e964a2c0c59700727b104 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Thu, 12 Dec 2024 19:13:57 +0000 Subject: [PATCH] chore: upgrade kafka-delta-ingest to delta-rs 0.22.3 --- Cargo.toml | 8 ++++---- src/writer.rs | 18 +++++++++++++----- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 50f2d23..475e3fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kafka-delta-ingest" -version = "0.3.0" +version = "0.4.0" authors = ["R. Tyler Croy ", "Christian Williams "] edition = "2018" @@ -34,9 +34,9 @@ uuid = { version = "0.8", features = ["serde", "v4"] } url = "2.3" # datafusion feature is required for writer version 2 -deltalake-core = { version = "0.21.0", features = ["json", "datafusion"]} -deltalake-aws = { version = "0.4.0", optional = true } -deltalake-azure = { version = "0.4.0", optional = true } +deltalake-core = { version = "0.22.3", features = ["json", "datafusion"]} +deltalake-aws = { version = "0.5.0", optional = true } +deltalake-azure = { version = "0.5.0", optional = true } # s3 feature enabled, helps for locking interactions with DLQ dynamodb_lock = { version = "0.6.0", optional = true } diff --git a/src/writer.rs b/src/writer.rs index 56b1dfe..d83a07a 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -856,7 +856,7 @@ fn min_and_max_from_parquet_statistics( let stats_with_min_max: Vec<&Statistics> = statistics .iter() - .filter(|s| s.has_min_max_set()) + .filter(|s| s.min_bytes_opt().is_some() && s.max_bytes_opt().is_some()) .copied() .collect(); @@ -892,13 +892,21 @@ fn min_and_max_from_parquet_statistics( let min_array = arrow_array_from_bytes( data_type.clone(), arrow_buffer_capacity, - stats_with_min_max.iter().map(|s| s.min_bytes()).collect(), + // The stats should always exist in stats_with_min_max but defaulting to zero to avoid a + // panic + stats_with_min_max + .iter() + .map(|s| s.min_bytes_opt().unwrap_or(&[0])) + .collect(), )?; let max_array = arrow_array_from_bytes( data_type.clone(), arrow_buffer_capacity, - stats_with_min_max.iter().map(|s| s.max_bytes()).collect(), + stats_with_min_max + .iter() + .map(|s| s.max_bytes_opt().unwrap_or(&[0])) + .collect(), )?; match data_type { @@ -990,7 +998,7 @@ fn min_max_strings_from_stats( ) -> (Option, Option) { let min_string_candidates = stats_with_min_max .iter() - .filter_map(|s| std::str::from_utf8(s.min_bytes()).ok()); + .filter_map(|s| std::str::from_utf8(s.min_bytes_opt().unwrap_or(&[0])).ok()); let min_value = min_string_candidates .min() @@ -998,7 +1006,7 @@ fn min_max_strings_from_stats( let max_string_candidates = stats_with_min_max .iter() - .filter_map(|s| std::str::from_utf8(s.max_bytes()).ok()); + .filter_map(|s| std::str::from_utf8(s.max_bytes_opt().unwrap_or(&[0])).ok()); let max_value = max_string_candidates .max()