chore: upgrade kafka-delta-ingest to delta-rs 0.23.0
rtyler committed Jan 2, 2025
1 parent 9fe6546 commit 6460e1e
Showing 2 changed files with 17 additions and 9 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "kafka-delta-ingest"
-version = "0.3.0"
+version = "0.4.0"
 authors = ["R. Tyler Croy <rtyler@brokenco.de>", "Christian Williams <christianw@scribd.com>"]
 edition = "2018"

@@ -34,9 +34,9 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
 url = "2.3"
 
 # datafusion feature is required for writer version 2
-deltalake-core = { version = "0.21.0", features = ["json", "datafusion"]}
-deltalake-aws = { version = "0.4.0", optional = true }
-deltalake-azure = { version = "0.4.0", optional = true }
+deltalake-core = { version = "0.23.0", features = ["json", "datafusion"]}
+deltalake-aws = { version = "0.6.0", optional = true }
+deltalake-azure = { version = "0.6.0", optional = true }
 
 # s3 feature enabled, helps for locking interactions with DLQ
 dynamodb_lock = { version = "0.6.0", optional = true }
18 changes: 13 additions & 5 deletions src/writer.rs
@@ -856,7 +856,7 @@ fn min_and_max_from_parquet_statistics(

     let stats_with_min_max: Vec<&Statistics> = statistics
         .iter()
-        .filter(|s| s.has_min_max_set())
+        .filter(|s| s.min_bytes_opt().is_some() && s.max_bytes_opt().is_some())
         .copied()
         .collect();

@@ -892,13 +892,21 @@ fn min_and_max_from_parquet_statistics(
     let min_array = arrow_array_from_bytes(
         data_type.clone(),
         arrow_buffer_capacity,
-        stats_with_min_max.iter().map(|s| s.min_bytes()).collect(),
+        // The stats should always exist in stats_with_min_max, but default to zero to avoid a
+        // panic
+        stats_with_min_max
+            .iter()
+            .map(|s| s.min_bytes_opt().unwrap_or(&[0]))
+            .collect(),
     )?;
 
     let max_array = arrow_array_from_bytes(
         data_type.clone(),
         arrow_buffer_capacity,
-        stats_with_min_max.iter().map(|s| s.max_bytes()).collect(),
+        stats_with_min_max
+            .iter()
+            .map(|s| s.max_bytes_opt().unwrap_or(&[0]))
+            .collect(),
     )?;
 
     match data_type {
@@ -990,15 +998,15 @@ fn min_max_strings_from_stats(
 ) -> (Option<Value>, Option<Value>) {
     let min_string_candidates = stats_with_min_max
         .iter()
-        .filter_map(|s| std::str::from_utf8(s.min_bytes()).ok());
+        .filter_map(|s| std::str::from_utf8(s.min_bytes_opt().unwrap_or(&[0])).ok());
 
     let min_value = min_string_candidates
         .min()
         .map(|s| Value::String(s.to_string()));
 
     let max_string_candidates = stats_with_min_max
         .iter()
-        .filter_map(|s| std::str::from_utf8(s.max_bytes()).ok());
+        .filter_map(|s| std::str::from_utf8(s.max_bytes_opt().unwrap_or(&[0])).ok());
 
     let max_value = max_string_candidates
         .max()
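For context, here is a minimal standalone sketch of the accessor migration applied in src/writer.rs above. It assumes the `Option`-returning `min_bytes_opt()` / `max_bytes_opt()` accessors on `parquet::file::statistics::Statistics` from the parquet version pulled in by deltalake-core 0.23.0, and that the `parquet` crate is available as a direct dependency. The helper name `collect_min_max_bytes` is hypothetical and is not part of this commit.

```rust
use parquet::file::statistics::Statistics;

// Hypothetical helper, for illustration only: keep the statistics that carry
// both a min and a max, then collect their raw byte slices using the
// Option-returning accessors that replace the deprecated
// has_min_max_set() / min_bytes() / max_bytes() trio.
fn collect_min_max_bytes<'a>(statistics: &[&'a Statistics]) -> (Vec<&'a [u8]>, Vec<&'a [u8]>) {
    let stats_with_min_max: Vec<&Statistics> = statistics
        .iter()
        .filter(|s| s.min_bytes_opt().is_some() && s.max_bytes_opt().is_some())
        .copied()
        .collect();

    // The filter above guarantees both values are present; defaulting to a
    // zero byte (as the writer does) avoids unwrapping and a possible panic.
    let mins = stats_with_min_max
        .iter()
        .map(|s| s.min_bytes_opt().unwrap_or(&[0]))
        .collect();
    let maxes = stats_with_min_max
        .iter()
        .map(|s| s.max_bytes_opt().unwrap_or(&[0]))
        .collect();

    (mins, maxes)
}
```

The writer keeps the `unwrap_or(&[0])` fallback even though the preceding filter guarantees the values exist, preferring a zero-byte default over a potential panic.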
