diff --git a/tests/coercions_tests.rs b/tests/coercions_tests.rs
deleted file mode 100644
index 3a9db75..0000000
--- a/tests/coercions_tests.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-#[allow(dead_code)]
-mod helpers;
-
-use chrono::DateTime;
-use serde_json::{json, Value};
-
-#[tokio::test]
-#[ignore]
-async fn coercions_tests() {
-    let (topic, table, producer, kdi, token, rt) = helpers::create_and_run_kdi(
-        "coercions",
-        json!({
-            "id": "integer",
-            "ts_str": "timestamp",
-            "ts_micro": "timestamp",
-            "ts_invalid": "timestamp",
-            "data_obj": {
-                "value": "integer",
-                "time": "timestamp",
-            },
-            "data_str": "string"
-        }),
-        vec![],
-        1,
-        None,
-    )
-    .await;
-
-    let now_iso = "2021-11-12T00:12:01.123Z";
-    let now_parquet = "2021-11-12 00:12:01 +00:00";
-    let now = DateTime::parse_from_rfc3339(now_iso).unwrap();
-    let now_micros = now.timestamp_nanos() / 1000;
-
-    let msg_json = json!({
-        "id": 1,
-        "ts_str": now_iso, // in ISO 8601 string format
-        "ts_micro": now_micros, // in microsecond number format
-        "ts_invalid": "whatever", // invalid timestamp, should be parsed as null
-        "data_obj": {
-            "value": 10,
-            "time": now_iso
-        },
-        "data_str": { // it's a string in the delta schema, but we pass it as a json object
-            "value": 20,
-            "time": now_iso
-        }
-    });
-
-    helpers::send_json(&producer, &topic, &msg_json).await;
-    helpers::wait_until_version_created(&table, 1);
-
-    token.cancel();
-    kdi.await.unwrap();
-    rt.shutdown_background();
-
-    let data = helpers::read_table_content_as_jsons(&table).await;
-    assert_eq!(data.len(), 1);
-    let result = data.first().unwrap().as_object().unwrap();
-    let get = |key| result.get(key).unwrap().clone();
-
-    assert_eq!(get("id"), json!(1));
-    assert_eq!(get("ts_str"), json!(now_parquet));
-    assert_eq!(get("ts_micro"), json!(now_parquet));
-    assert_eq!(get("ts_invalid"), Value::Null);
-    assert_eq!(get("data_obj"), json!({"value": 10, "time": now_parquet}));
-
-    // using now_iso because data_str is treated as a string and hence was not transformed by parquet
-    assert_ne!(get("data_str"), json!({"value": 20, "time": now_iso}));
-    let parsed: Value = serde_json::from_str(get("data_str").as_str().unwrap()).unwrap();
-    assert_eq!(parsed, json!({"value": 20, "time": now_iso}));
-
-    helpers::cleanup_kdi(&topic, &table).await;
-}
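
Note on the coercion semantics the deleted test exercised: a "timestamp" column accepted either an RFC 3339 string (ts_str) or a microsecond epoch number (ts_micro), while an unparseable value (ts_invalid) landed as null. A minimal sketch of that dual-format parsing follows; it assumes nothing about the crate's internals, and the function name and shape are illustrative only:

use chrono::{DateTime, TimeZone, Utc};
use serde_json::Value;

// Illustrative only, not the ingest crate's actual coercion code:
// RFC 3339 strings and microsecond epoch numbers both parse into a
// UTC timestamp; anything else (e.g. "whatever") yields None, i.e. null.
fn coerce_timestamp(v: &Value) -> Option<DateTime<Utc>> {
    match v {
        Value::String(s) => DateTime::parse_from_rfc3339(s)
            .ok()
            .map(|dt| dt.with_timezone(&Utc)),
        Value::Number(n) => {
            let micros = n.as_i64()?;
            // div_euclid/rem_euclid keep the nanosecond part non-negative,
            // so pre-epoch values split correctly as well.
            Utc.timestamp_opt(
                micros.div_euclid(1_000_000),
                (micros.rem_euclid(1_000_000) * 1_000) as u32,
            )
            .single()
        }
        _ => None,
    }
}

Under that reading, ts_str and ts_micro resolve to the same instant (hence both assert against now_parquet), while ts_invalid asserts against Value::Null.
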
diff --git a/tests/feed_s3_tests.rs b/tests/feed_s3_tests.rs
deleted file mode 100644
index e1c7e4b..0000000
--- a/tests/feed_s3_tests.rs
+++ /dev/null
@@ -1,238 +0,0 @@
-#[allow(dead_code)]
-mod helpers;
-
-use kafka_delta_ingest::IngestOptions;
-use maplit::hashmap;
-use rdkafka::producer::FutureProducer;
-
-use chrono::Utc;
-use serde::{Deserialize, Serialize};
-use serde_json::json;
-use std::sync::Arc;
-use std::time::Duration;
-use tokio::runtime::Runtime;
-use tokio::task::JoinHandle;
-use tokio_util::sync::CancellationToken;
-use uuid::Uuid;
-
-#[derive(Clone, Serialize, Deserialize, Debug)]
-struct TestMsg {
-    id: u64,
-    text: String,
-    feed: String,
-    timestamp: String,
-}
-
-const TOTAL_MESSAGES_SENT: usize = 600;
-// see the seek_offsets param: we skip 5 messages in each of the 12 partitions, i.e. 12 x 5
-const TOTAL_MESSAGES_RECEIVED: usize = TOTAL_MESSAGES_SENT - 60;
-const FEEDS: usize = 3;
-
-#[tokio::test(flavor = "multi_thread")]
-#[ignore]
-async fn feed_load_test() {
-    helpers::init_logger();
-
-    let topic = format!("feed_load_{}", uuid::Uuid::new_v4());
-
-    let table = helpers::create_local_table(
-        json!({
-            "id": "integer",
-            "text": "string",
-            "feed": "string",
-            "timestamp": "timestamp",
-            "date": "string",
-            "kafka": {
-                "offset": "integer",
-                "timestamp": "timestamp",
-                "timestamp_type": "integer",
-            }
-        }),
-        vec!["feed"],
-        &topic,
-    );
-
-    helpers::create_topic(&topic, 12).await;
-
-    let producer = Arc::new(helpers::create_producer());
-
-    // send messages in parallel
-    let f_a = spawn_send_jsons("A", 0, producer.clone(), &topic);
-    let f_b = spawn_send_jsons("B", 1, producer.clone(), &topic);
-    let f_c = spawn_send_jsons("C", 2, producer.clone(), &topic);
-
-    let mut workers = Vec::new();
-
-    workers.push(spawn_worker(1, &topic, &table));
-    std::thread::sleep(Duration::from_secs(10));
-    workers.push(spawn_worker(2, &topic, &table));
-    std::thread::sleep(Duration::from_secs(10));
-    workers.push(spawn_worker(3, &topic, &table));
-
-    let mut feed_a_ids = f_a.await.unwrap();
-    let mut feed_b_ids = f_b.await.unwrap();
-    let mut feed_c_ids = f_c.await.unwrap();
-
-    wait_until_all_messages_received(&table).await;
-    workers.iter().for_each(|w| w.1.cancel());
-
-    for (kdi, _, rt) in workers {
-        println!("waiting on worker..");
-        kdi.await.unwrap();
-        rt.shutdown_background();
-    }
-
-    println!("verifying results...");
-
-    let values = helpers::read_table_content_as_jsons(&table).await;
-    let mut ids: Vec<i64> = values
-        .iter()
-        .map(|v| v.as_object().unwrap().get("id").unwrap().as_i64().unwrap())
-        .collect();
-    ids.sort();
-
-    let id_count = ids.len();
-    let expected = (0..TOTAL_MESSAGES_RECEIVED).count();
-
-    if id_count != expected {
-        helpers::inspect_table(&table).await;
-    }
-    assert_eq!(id_count, expected);
-
-    let mut expected = Vec::new();
-    expected.append(&mut feed_a_ids);
-    expected.append(&mut feed_b_ids);
-    expected.append(&mut feed_c_ids);
-    expected.sort();
-
-    assert_eq!(ids, expected);
-
-    // verify transforms
-    let m = values.first().unwrap().as_object().unwrap();
-    assert!(m.contains_key("date"));
-    let now = Utc::now().to_rfc3339();
-    assert_eq!(m.get("date").unwrap().as_str().unwrap(), &now[..10]);
-    assert!(m.contains_key("kafka"));
-    let kafka = m.get("kafka").unwrap().as_object().unwrap();
-    assert!(kafka.contains_key("offset"));
-    assert!(kafka.contains_key("timestamp"));
-    assert!(kafka.contains_key("timestamp_type"));
-
-    helpers::cleanup_kdi(&topic, &table).await;
-}
-
-fn spawn_worker(
-    id: usize,
-    topic: &str,
-    table: &str,
-) -> (JoinHandle<()>, Arc<CancellationToken>, Runtime) {
-    let transforms = hashmap! {
-        "date" => "substr(timestamp, `0`, `10`)",
-        "kafka.offset" => "kafka.offset",
-        "kafka.timestamp" => "kafka.timestamp",
-        "kafka.timestamp_type" => "kafka.timestamp_type",
-    };
-    let transforms = transforms
-        .iter()
-        .map(|(k, v)| (k.to_string(), v.to_string()))
-        .collect();
-
-    helpers::create_kdi_with(
-        topic,
-        table,
-        Some(format!("WORKER-{}", id)),
-        IngestOptions {
-            app_id: "feed".to_string(),
-            allowed_latency: 3,
-            max_messages_per_batch: 30,
-            min_bytes_per_file: 100,
-            transforms,
-            seek_offsets: Some(vec![
-                (0, 4),
-                (1, 4),
-                (2, 4),
-                (3, 4),
-                (4, 4),
-                (5, 4),
-                (6, 4),
-                (7, 4),
-                (8, 4),
-                (9, 4),
-                (10, 4),
-                (11, 4),
-            ]),
-            additional_kafka_settings: Some(hashmap! {
{ - "auto.offset.reset".to_string() => "earliest".to_string(), - }), - ..Default::default() - }, - ) -} - -fn spawn_send_jsons( - feed: &str, - feed_n: usize, - producer: Arc, - topic: &str, -) -> JoinHandle> { - let feed = feed.to_string(); - let topic = topic.to_string(); - tokio::spawn(async move { send_jsons(feed.as_str(), feed_n, producer, topic).await }) -} -async fn send_jsons( - feed: &str, - feed_n: usize, - producer: Arc, - topic: String, -) -> Vec { - let feed = feed.to_string(); - let mut sent = Vec::new(); - for id in 0..TOTAL_MESSAGES_SENT { - let topic = topic.clone(); - if id % FEEDS == feed_n { - let m = json!({ - "id": id, - "text": format!("{}-{}-{}",Uuid::new_v4(),Uuid::new_v4(),Uuid::new_v4()), - "feed": feed, - "timestamp": Utc::now().to_rfc3339(), - }); - let (_, o) = helpers::send_kv_json(&producer, &topic, format!("{}", id), &m).await; - if o > 4 { - sent.push(id as i64); - } - } - } - - println!("Sent {} messages for feed {}", sent.len(), feed); - sent -} - -async fn wait_until_all_messages_received(table: &str) { - let mut waited_ms = 0; - loop { - let values = helpers::read_table_content_as_jsons(table) - .await - .iter() // just to ensure it's expected value - .map(|v| serde_json::from_value::(v.clone())) - .count(); - if values >= TOTAL_MESSAGES_RECEIVED { - println!( - "Received all {}/{} messages", - values, TOTAL_MESSAGES_RECEIVED - ); - return; - } - tokio::time::sleep(Duration::from_millis(1000)).await; - waited_ms += 1000; - if waited_ms > 300000 { - panic!( - "Waited more than 300s to received {} messages, but got only {} for {}", - TOTAL_MESSAGES_RECEIVED, values, table - ); - } - println!( - "Waiting for messages to be consumed {}/{}", - values, TOTAL_MESSAGES_RECEIVED - ); - } -} diff --git a/tests/playground.rs b/tests/playground.rs deleted file mode 100644 index 0f3d3ae..0000000 --- a/tests/playground.rs +++ /dev/null @@ -1,112 +0,0 @@ -use kafka_delta_ingest::IngestOptions; -use log::info; -use rdkafka::producer::FutureProducer; -use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; -use uuid::Uuid; - -#[allow(dead_code)] -mod helpers; - -// This is not an actual test but rather a playground to test various things and features -// with a minimal effort. Just edit the `Playground::run_custom_checks` function below -// however you need and execute the `RUST_LOG=INFO cargo test playground`. -// The table content is in `TABLE_PATH`, and by default it's cleared before each run. -#[tokio::test] -async fn playground() { - Playground::run().await; -} - -const TABLE_PATH: &str = "./tests/data/gen/playground"; -const MAX_MESSAGES_PER_BATCH: usize = 1; - -#[derive(Debug, Serialize, Deserialize, Clone)] -struct Message { - id: u32, - date: Option, -} - -impl Message { - pub fn new(id: u32, date: Option) -> Self { - Self { id, date } - } -} - -struct Playground { - producer: FutureProducer, - topic: String, -} - -impl Playground { - async fn run_custom_checks(&self) { - // Run custom checks here! 
diff --git a/tests/playground.rs b/tests/playground.rs
deleted file mode 100644
index 0f3d3ae..0000000
--- a/tests/playground.rs
+++ /dev/null
@@ -1,112 +0,0 @@
-use kafka_delta_ingest::IngestOptions;
-use log::info;
-use rdkafka::producer::FutureProducer;
-use serde::{Deserialize, Serialize};
-use serde_json::{json, Value};
-use uuid::Uuid;
-
-#[allow(dead_code)]
-mod helpers;
-
-// This is not an actual test but rather a playground for trying out various things and
-// features with minimal effort. Just edit the `Playground::run_custom_checks` function
-// below however you need and execute `RUST_LOG=INFO cargo test playground`.
-// The table content lives in `TABLE_PATH`, and by default it's cleared before each run.
-#[tokio::test]
-async fn playground() {
-    Playground::run().await;
-}
-
-const TABLE_PATH: &str = "./tests/data/gen/playground";
-const MAX_MESSAGES_PER_BATCH: usize = 1;
-
-#[derive(Debug, Serialize, Deserialize, Clone)]
-struct Message {
-    id: u32,
-    date: Option<String>,
-}
-
-impl Message {
-    pub fn new(id: u32, date: Option<String>) -> Self {
-        Self { id, date }
-    }
-}
-
-struct Playground {
-    producer: FutureProducer,
-    topic: String,
-}
-
-impl Playground {
-    async fn run_custom_checks(&self) {
-        // Run custom checks here!
-
-        for id in 1..25 {
-            self.send_message(Message::new(id, Some("p".to_string())))
-                .await;
-        }
-
-        helpers::wait_until_version_created(TABLE_PATH, 24);
-    }
-
-    async fn send_message(&self, msg: Message) {
-        self.send_json(&serde_json::to_value(msg).unwrap()).await;
-    }
-
-    async fn send_json(&self, value: &Value) {
-        helpers::send_json(&self.producer, &self.topic, value).await;
-    }
-
-    async fn run() {
-        helpers::init_logger();
-        let _ = std::fs::remove_dir_all(TABLE_PATH);
-        helpers::create_local_table_in(
-            json!({
-                "id": "integer",
-                "date": "string",
-            }),
-            vec!["date"],
-            TABLE_PATH,
-        );
-
-        let topic = format!("playground_{}", Uuid::new_v4());
-        helpers::create_topic(&topic, 1).await;
-
-        let ingest_options = IngestOptions {
-            app_id: "playground".to_string(),
-            allowed_latency: 5,
-            max_messages_per_batch: MAX_MESSAGES_PER_BATCH,
-            min_bytes_per_file: 20,
-            ..Default::default()
-        };
-
-        let (kdi, token, rt) = helpers::create_kdi(&topic, TABLE_PATH, ingest_options);
-        let producer = helpers::create_producer();
-        let playground = Playground {
-            producer,
-            topic: topic.clone(),
-        };
-
-        playground.run_custom_checks().await;
-
-        token.cancel();
-        playground.send_json(&Value::Null).await;
-        kdi.await.unwrap();
-        rt.shutdown_background();
-
-        info!("The content of table {}:", TABLE_PATH);
-        let content = helpers::read_table_content_as_jsons(TABLE_PATH).await;
-        let len = content.len();
-        let mut bytes = 0;
-        for json in content {
-            bytes += json.to_string().as_bytes().len();
-            info!("{}", json);
-        }
-        let avg = bytes as f64 / len as f64;
-        info!(
-            "Total {} records and {} bytes. Average message size: {:.2} bytes",
-            len, bytes, avg
-        );
-        helpers::delete_topic(&topic).await;
-    }
-}
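
Two patterns recur across these deleted tests and are worth keeping in mind if they are ever resurrected. First, after token.cancel() the playground sends one extra Value::Null message; reading the code, this appears to be a poke so the blocked consumer polls once more and notices the cancellation. Second, every test waits on the table with a sleep-poll-timeout loop (wait_until_version_created, wait_until_all_messages_received). A generic sketch of that loop with a hypothetical name and signature, not an existing helper:

use std::future::Future;
use std::time::Duration;

// Hypothetical generalization of the polling loops above: retry an async
// condition once per second until it holds or the timeout elapses.
async fn wait_until<F, Fut>(mut condition: F, timeout: Duration) -> Result<(), String>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = bool>,
{
    let step = Duration::from_millis(1000);
    let mut waited = Duration::ZERO;
    while !condition().await {
        if waited >= timeout {
            return Err(format!("condition not met within {:?}", timeout));
        }
        tokio::time::sleep(step).await;
        waited += step;
    }
    Ok(())
}

With such a helper, the 300 000 ms loop in wait_until_all_messages_received would collapse to a single call passing Duration::from_secs(300) and a closure that reads the table and compares the row count against TOTAL_MESSAGES_RECEIVED.
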