diff --git a/Cargo.lock b/Cargo.lock index eff1a76..2a23bfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -247,7 +247,7 @@ dependencies = [ "arrow-data", "arrow-schema", "arrow-select", - "base64 0.21.5", + "base64 0.21.6", "chrono", "half", "lexical-core", @@ -440,7 +440,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6218987c374650fdad0b476bfc675729762c28dfb35f58608a38a2b1ea337dd" dependencies = [ "async-trait", - "base64 0.21.5", + "base64 0.21.6", "bytes", "dyn-clone", "futures", @@ -556,9 +556,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.5" +version = "0.21.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +checksum = "c79fed4cdb43e993fcdadc7e58a09fd0e3e649c4436fa11da71c9f1f3ee7feb9" [[package]] name = "bitflags" @@ -674,18 +674,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.13" +version = "4.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52bdc885e4cacc7f7c9eedc1ef6da641603180c783c41a15c264944deeaab642" +checksum = "33e92c5c1a78c62968ec57dbc2440366a2d6e5a23faf829970ff1585dc6b18e2" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.4.12" +version = "4.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb7fb5e4e979aec3be7791562fcba452f94ad85e954da024396433e0e25a79e9" +checksum = "f4323769dc8a61e2c39ad7dc26f6f2800524691a44d74fe3d1071a5c24db6370" dependencies = [ "anstream", "anstyle", @@ -720,7 +720,7 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" dependencies = [ - "crossbeam-utils 0.8.18", + "crossbeam-utils 0.8.19", ] [[package]] @@ -800,12 +800,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.18" +version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" -dependencies = [ - "cfg-if 1.0.0", -] +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" [[package]] name = "crunchy" @@ -889,12 +886,12 @@ dependencies = [ [[package]] name = "deltalake-aws" version = "0.1.0" -source = "git+https://github.com/delta-io/delta-rs?branch=main#8eeb127c12621bf6b93667911468fead007e331d" +source = "git+https://github.com/delta-io/delta-rs?branch=main#f7c303b74218c202ef683f727701a67da5aaaca5" dependencies = [ "async-trait", "backoff", "bytes", - "deltalake-core", + "deltalake-core 0.17.0 (git+https://github.com/delta-io/delta-rs?branch=main)", "futures", "lazy_static", "maplit", @@ -914,25 +911,73 @@ dependencies = [ [[package]] name = "deltalake-azure" version = "0.1.0" -source = "git+https://github.com/delta-io/delta-rs?branch=main#8eeb127c12621bf6b93667911468fead007e331d" +source = "git+https://github.com/delta-io/delta-rs?branch=main#f7c303b74218c202ef683f727701a67da5aaaca5" dependencies = [ "async-trait", "bytes", - "deltalake-core", + "deltalake-core 0.17.0 (git+https://github.com/delta-io/delta-rs?branch=main)", + "futures", + "lazy_static", + "object_store", + "regex", + "thiserror", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "deltalake-core" +version = "0.17.0" +source = "git+https://github.com/rtyler/delta-rs?branch=createdTime_is_optional#dd42f23b8319af3f0c2db416d521a3bf96cadf1e" +dependencies = [ + "arrow", + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "async-trait", + "bytes", + "cfg-if 1.0.0", + "chrono", + "dashmap", + "either", + "errno", + "fix-hidden-lifetime-bug", "futures", + "itertools 0.12.0", "lazy_static", + "libc", + "maplit", + "num-bigint", + "num-traits", + "num_cpus", "object_store", + "once_cell", + "parking_lot 0.12.1", + "parquet", + "percent-encoding", + "rand 0.8.5", "regex", + "roaring", + "serde", + "serde_json", "thiserror", "tokio", "tracing", "url", + "uuid 1.6.1", + "z85", ] [[package]] name = "deltalake-core" version = "0.17.0" -source = "git+https://github.com/delta-io/delta-rs?branch=main#8eeb127c12621bf6b93667911468fead007e331d" +source = "git+https://github.com/delta-io/delta-rs?branch=main#f7c303b74218c202ef683f727701a67da5aaaca5" dependencies = [ "arrow", "arrow-arith", @@ -1393,7 +1438,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util 0.7.10", + "tokio-util", "tracing", ] @@ -1769,7 +1814,7 @@ dependencies = [ "clap", "deltalake-aws", "deltalake-azure", - "deltalake-core", + "deltalake-core 0.17.0 (git+https://github.com/rtyler/delta-rs?branch=createdTime_is_optional)", "dipstick", "dynamodb_lock", "env_logger", @@ -1780,7 +1825,6 @@ dependencies = [ "maplit", "rdkafka", "rusoto_core", - "rusoto_credential", "rusoto_s3", "schema_registry_converter", "sentry", @@ -1792,7 +1836,7 @@ dependencies = [ "thiserror", "time 0.3.31", "tokio", - "tokio-util 0.6.10", + "tokio-util", "url", "utime", "uuid 1.6.1", @@ -1913,9 +1957,9 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.1.12" +version = "1.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d97137b25e321a73eef1418d1d5d2eda4d77e12813f8e6dead84bc52c5870a7b" +checksum = "5f526fdd09d99e19742883e43de41e1aa9e36db0c7ab7f935165d611c5cccc66" dependencies = [ "cc", "libc", @@ -2218,7 +2262,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2524735495ea1268be33d200e1ee97455096a0846295a21548cd2f3541de7050" dependencies = [ "async-trait", - "base64 0.21.5", + "base64 0.21.6", "bytes", "chrono", "futures", @@ -2382,7 +2426,7 @@ dependencies = [ "arrow-ipc", "arrow-schema", "arrow-select", - "base64 0.21.5", + "base64 0.21.6", "brotli", "bytes", "chrono", @@ -2524,9 +2568,9 @@ dependencies = [ [[package]] name = "psl" -version = "2.1.13" +version = "2.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe5b9b0f029db58ebf478fa2a45a8c6f2772c979d878c2c495e2eb2f217f41bc" +checksum = "383703acfc34f7a00724846c14dc5ea4407c59e5aedcbbb18a1c0c1a23fe5013" dependencies = [ "psl-types", ] @@ -2734,7 +2778,7 @@ version = "0.11.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" dependencies = [ - "base64 0.21.5", + "base64 0.21.6", "bytes", "encoding_rs", "futures-core", @@ -2762,7 +2806,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls 0.24.1", - "tokio-util 0.7.10", + "tokio-util", "tower-service", "url", "wasm-bindgen", @@ -3007,7 +3051,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64 0.21.5", + "base64 0.21.6", ] [[package]] @@ -3417,9 +3461,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "strsim" -version = "0.10.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccbca6f34534eb78dbee83f6b2c9442fea7113f43d9e80ea320f0972ae5dc08d" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "strum" @@ -3680,20 +3724,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-util" -version = "0.6.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "log", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.10" diff --git a/Cargo.toml b/Cargo.toml index 02e3a7d..3b3adb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,19 +24,17 @@ serde_json = "1" strum_macros = "0.20" thiserror = "1" tokio = { version = "1", features = ["full"] } -tokio-util = "0.6.3" +tokio-util = "0.7.10" uuid = { version = "1.0", features = ["serde", "v4"] } url = "2.3" #deltalake = { version = "0.16.5", features = ["arrow", "json", "parquet"], optional = true } -deltalake-core = { git = "https://github.com/delta-io/delta-rs", branch = "main", features = ["json"]} +deltalake-core = { git = "https://github.com/rtyler/delta-rs", branch = "createdTime_is_optional", features = ["json"]} deltalake-aws = { git = "https://github.com/delta-io/delta-rs", branch = "main", optional = true } deltalake-azure = { git = "https://github.com/delta-io/delta-rs", branch = "main", optional = true } # s3 feature enabled dynamodb_lock = { version = "0.6.0", optional = true } -rusoto_core = { version = "0.47", default-features = false, features = ["rustls"], optional = true } -rusoto_credential = { version = "0.47", optional = true } # sentry sentry = { version = "0.23.0", optional = true } @@ -68,8 +66,6 @@ azure = [ s3 = [ "deltalake-aws", "dynamodb_lock", - "rusoto_core", - "rusoto_credential", ] [dev-dependencies] @@ -77,6 +73,7 @@ utime = "0.3" serial_test = "*" tempfile = "3" time = "0.3.20" +rusoto_core = { version = "0.47", default-features = false, features = ["rustls"]} rusoto_s3 = { version = "0.47", default-features = false, features = ["rustls"]} [profile.release] diff --git a/src/dead_letters.rs b/src/dead_letters.rs index a4a1c7c..4d25fec 100644 --- a/src/dead_letters.rs +++ b/src/dead_letters.rs @@ -255,7 +255,7 @@ impl DeltaSinkDeadLetterQueue { dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE.to_string() => std::env::var(env_vars::DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE) .unwrap_or_else(|_| "kafka_delta_ingest-dead_letters".to_string()), }; - #[cfg(all(feature = "azure", not(feature="s3")))] + #[cfg(all(feature = "azure", not(feature = "s3")))] let opts = HashMap::default(); let table = crate::delta_helpers::load_table(table_uri, opts.clone()).await?; diff --git a/src/lib.rs b/src/lib.rs index 2af55a6..6c0b099 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ #![deny(warnings)] #![deny(missing_docs)] +#![allow(unused)] #[macro_use] extern crate lazy_static; diff --git a/src/serialization/mod.rs b/src/serialization/mod.rs index 136b80b..171505d 100644 --- a/src/serialization/mod.rs +++ b/src/serialization/mod.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; -use serde_json::Value; use log::*; +use serde_json::Value; use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat}; @@ -24,6 +24,13 @@ pub struct DeserializedMessage { } impl DeserializedMessage { + fn new(message: Value) -> Self { + Self { + message, + ..Default::default() + } + } + pub fn schema(&self) -> &Option { &self.schema } @@ -41,9 +48,7 @@ impl DeserializedMessage { /// Allow for `.into()` on [Value] for ease of use impl From for DeserializedMessage { fn from(message: Value) -> Self { - // XXX: This seems wasteful, this function should go away, and the deserializers should - // infer straight from the buffer stream - let iter = vec![message.clone()].into_iter().map(Ok); + let iter = std::iter::once(&message).map(Ok); let schema = match deltalake_core::arrow::json::reader::infer_json_schema_from_iterator(iter) { Ok(schema) => Some(schema), @@ -169,7 +174,10 @@ impl MessageDeserializer for DefaultDeserializer { } }; - Ok(value.into()) + match self.can_evolve_schema() { + true => Ok(value.into()), + false => Ok(DeserializedMessage::new(value)), + } } } @@ -183,8 +191,20 @@ mod default_tests { } #[tokio::test] - async fn deserialize_with_schema() { + async fn deserializer_default_without_evolution() { let mut deser = DefaultDeserializer::default(); + let dm = deser + .deserialize(r#"{"hello" : "world"}"#.as_bytes()) + .await + .unwrap(); + assert_eq!(true, dm.schema().is_none()); + } + + #[tokio::test] + async fn deserialize_with_schema() { + let mut deser = DefaultDeserializer { + schema_evolution: true, + }; let message = deser .deserialize(r#"{"hello" : "world"}"#.as_bytes()) .await