Skip to content

Commit

Permalink
Fixed datetime error reporting (#35)
Browse files Browse the repository at this point in the history
Fixed datetime error reporting (#35)
  • Loading branch information
gr211 authored Jun 1, 2023
1 parent d601ec6 commit e94c237
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 85 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 16 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,22 @@ authors = ["Romain Gallet <rgallet@grumlimited.co.uk>"]
edition = "2021"

[dependencies]
anyhow = "1.0.71"
async-trait = "0.1.68"
aws-config = { version = "0.55.3" }
aws-sdk-kinesis = { version = "0.28.0" }
chrono = "0.4.25"
clap = { version = "4.3.0", features = ["derive"] }
colored = "2.0.0"
config = "0.13.3"
ctrlc-async = "3.2.2"
env_logger = "0.10.0"
humantime = "2.1.0"
log = "0.4.18"
log4rs = "1.2.0"
nix = "0.26.2"
tokio = { version = "1.28.2", features = ["rt-multi-thread", "macros"] }
anyhow = "1.0"
async-trait = "0.1"
aws-config = { version = "0.55" }
aws-sdk-kinesis = { version = "0.28" }
chrono = "0.4"
clap = { version = "4.3", features = ["derive"] }
colored = "2.0"
config = "0.13"
ctrlc-async = "3.2"
env_logger = "0.10"
humantime = "2.1"
log = "0.4"
log4rs = "1.2"
nix = "0.26"
thiserror = "1.0"
tokio = { version = "1.28", features = ["rt-multi-thread", "macros"] }

[features]
default = ["clap/cargo", "clap/derive", "config/json"]
33 changes: 18 additions & 15 deletions src/cli_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use anyhow::Result;
use anyhow::{anyhow, Result};
use aws_sdk_kinesis::meta::PKG_VERSION;
use chrono::{DateTime, TimeZone, Utc};
use clap::Parser;
use log::info;
use std::io;

#[derive(Debug, Parser)]
#[command(
Expand Down Expand Up @@ -76,7 +75,7 @@ pub(crate) fn selected_shards(
shards: Vec<String>,
stream_name: &str,
shard_ids: &Option<Vec<String>>,
) -> io::Result<Vec<String>> {
) -> Result<Vec<String>> {
let filtered = match shard_ids {
Some(shard_ids) => shards
.into_iter()
Expand All @@ -86,9 +85,10 @@ pub(crate) fn selected_shards(
};

if filtered.is_empty() {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("No shards found for stream {}", stream_name),
Err(anyhow!(
"No shards found for stream {} (filtered: {})",
stream_name,
shard_ids.is_some()
))
} else {
Ok(filtered)
Expand Down Expand Up @@ -129,24 +129,27 @@ pub(crate) fn print_runtime(opt: &Opt, selected_shards: &[String]) {
pub fn validate_time_boundaries(
from_datetime: &Option<DateTime<Utc>>,
to_datetime: &Option<DateTime<Utc>>,
) -> io::Result<()> {
) -> Result<()> {
from_datetime
.zip(to_datetime.as_ref())
.iter()
.try_for_each(|(from, to)| {
if std::cmp::max(from, to) == from {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"from_datetime must be before to_datetime",
))
Err(anyhow!("{} must be before {}", from, to))
} else {
Ok(())
}
})
}

pub fn parse_date(from: Option<&str>) -> Option<DateTime<Utc>> {
from.map(|f| chrono::Utc.datetime_from_str(f, "%+").unwrap())
pub fn parse_date(datetime: Option<&str>) -> Result<Option<DateTime<Utc>>> {
datetime
.map(|dt| {
chrono::Utc
.datetime_from_str(dt, "%+")
.map_err(|_| anyhow!("Could not parse date [{}]", dt))
})
.map_or(Ok(None), |r| r.map(Some))
}

pub fn reset_signal_pipe_handler() -> Result<()> {
Expand All @@ -172,7 +175,7 @@ mod tests {
#[test]
fn parse_date_test_ok() {
let date = "2023-05-04T20:57:12Z";
let result = parse_date(Some(date)).unwrap();
let result = parse_date(Some(date)).unwrap().unwrap();
let result = result.to_rfc3339();
assert_eq!(result, "2023-05-04T20:57:12+00:00");
}
Expand All @@ -181,7 +184,7 @@ mod tests {
#[should_panic]
fn parse_date_test_fail() {
let invalid_date = "xxx";
parse_date(Some(invalid_date));
let _ = parse_date(Some(invalid_date)).unwrap();
}

#[test]
Expand Down
14 changes: 5 additions & 9 deletions src/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ where
cloned_self
.get_config()
.tx_records
.send(Err(PanicError {
message: format!("{:?}", e),
}))
.send(Err(ProcessError::PanicError(format!("{:?}", e))))
.await
.expect("Could not send error to tx_records");
}
Expand All @@ -96,9 +94,9 @@ where
cloned_self
.get_config()
.tx_records
.send(Err(PanicError {
message: "ShardIterator is None".to_string(),
}))
.send(Err(ProcessError::PanicError(
"ShardIterator is None".to_string(),
)))
.await
.expect("");
}
Expand Down Expand Up @@ -141,9 +139,7 @@ where
Err(e) => {
self.get_config()
.tx_records
.send(Err(PanicError {
message: e.to_string(),
}))
.send(Err(ProcessError::PanicError(e.to_string())))
.await
.expect("Could not send error to tx_records");
}
Expand Down
6 changes: 3 additions & 3 deletions src/kinesis/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use tokio::sync::Semaphore;
use crate::iterator::at_sequence;
use crate::iterator::latest;
use crate::kinesis::models::{
PanicError, ShardProcessor, ShardProcessorADT, ShardProcessorAtTimestamp, ShardProcessorConfig,
ShardProcessorLatest,
ProcessError, ShardProcessor, ShardProcessorADT, ShardProcessorAtTimestamp,
ShardProcessorConfig, ShardProcessorLatest,
};
use crate::kinesis::ticker::TickerUpdate;
use crate::kinesis::{IteratorProvider, ShardIteratorProgress};
Expand All @@ -25,7 +25,7 @@ pub fn new(
from_datetime: Option<chrono::DateTime<Utc>>,
to_datetime: Option<chrono::DateTime<Utc>>,
semaphore: Arc<Semaphore>,
tx_records: Sender<Result<ShardProcessorADT, PanicError>>,
tx_records: Sender<Result<ShardProcessorADT, ProcessError>>,
tx_ticker_updates: Sender<TickerUpdate>,
) -> Box<dyn ShardProcessor<AwsKinesisClient> + Send + Sync> {
debug!("Creating ShardProcessor with shard {}", shard_id);
Expand Down
11 changes: 7 additions & 4 deletions src/kinesis/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use aws_sdk_kinesis::primitives::DateTime;
use chrono::Utc;
use std::fmt::Debug;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::mpsc::Sender;
use tokio::sync::Semaphore;

Expand All @@ -27,10 +28,12 @@ pub enum ShardProcessorADT {
Progress(Vec<RecordResult>),
}

#[derive(Debug, Clone)]
pub struct PanicError {
pub message: String,
#[derive(Error, Debug, Clone)]
pub enum ProcessError {
#[error("The stream panicked: {0}")]
PanicError(String),
}

#[derive(Debug, Clone, PartialEq)]
pub struct RecordResult {
pub shard_id: String,
Expand All @@ -46,7 +49,7 @@ pub struct ShardProcessorConfig<K: KinesisClient> {
pub shard_id: String,
pub to_datetime: Option<chrono::DateTime<Utc>>,
pub semaphore: Arc<Semaphore>,
pub tx_records: Sender<Result<ShardProcessorADT, PanicError>>,
pub tx_records: Sender<Result<ShardProcessorADT, ProcessError>>,
pub tx_ticker_updates: Sender<TickerUpdate>,
}

Expand Down
14 changes: 7 additions & 7 deletions src/kinesis/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::aws::client::KinesisClient;
use crate::kinesis::models::{
PanicError, RecordResult, ShardIteratorProgress, ShardProcessor, ShardProcessorADT,
ProcessError, RecordResult, ShardIteratorProgress, ShardProcessor, ShardProcessorADT,
ShardProcessorAtTimestamp, ShardProcessorConfig, ShardProcessorLatest,
};
use crate::kinesis::ticker::TickerUpdate;
Expand All @@ -23,7 +23,7 @@ use tokio::time::sleep;

#[tokio::test]
async fn seed_shards_test() {
let (tx_records, _) = mpsc::channel::<Result<ShardProcessorADT, PanicError>>(10);
let (tx_records, _) = mpsc::channel::<Result<ShardProcessorADT, ProcessError>>(10);
let (tx_ticker_updates, _) = mpsc::channel::<TickerUpdate>(10);

let (tx_shard_iterator_progress, mut rx_shard_iterator_progress) =
Expand Down Expand Up @@ -62,7 +62,7 @@ async fn seed_shards_test() {
#[tokio::test]
#[should_panic]
async fn seed_shards_test_timestamp_in_future() {
let (tx_records, _) = mpsc::channel::<Result<ShardProcessorADT, PanicError>>(10);
let (tx_records, _) = mpsc::channel::<Result<ShardProcessorADT, ProcessError>>(10);
let (tx_ticker_updates, _) = mpsc::channel::<TickerUpdate>(10);

let (tx_shard_iterator_progress, _) = mpsc::channel::<ShardIteratorProgress>(1);
Expand All @@ -89,7 +89,7 @@ async fn seed_shards_test_timestamp_in_future() {

#[tokio::test]
async fn produced_record_is_processed() {
let (tx_records, mut rx_records) = mpsc::channel::<Result<ShardProcessorADT, PanicError>>(10);
let (tx_records, mut rx_records) = mpsc::channel::<Result<ShardProcessorADT, ProcessError>>(10);
let (tx_ticker_updates, mut rx_ticker_updates) = mpsc::channel::<TickerUpdate>(10);

let client = TestKinesisClient {
Expand Down Expand Up @@ -147,7 +147,7 @@ async fn produced_record_is_processed() {

#[tokio::test]
async fn beyond_to_timestamp_is_received() {
let (tx_records, mut rx_records) = mpsc::channel::<Result<ShardProcessorADT, PanicError>>(10);
let (tx_records, mut rx_records) = mpsc::channel::<Result<ShardProcessorADT, ProcessError>>(10);
let (tx_ticker_updates, mut rx_ticker_updates) = mpsc::channel::<TickerUpdate>(10);

let client = TestKinesisClient {
Expand Down Expand Up @@ -187,7 +187,7 @@ async fn beyond_to_timestamp_is_received() {

#[tokio::test]
async fn has_records_beyond_end_ts_when_has_end_ts() {
let (tx_records, _) = mpsc::channel::<Result<ShardProcessorADT, PanicError>>(10);
let (tx_records, _) = mpsc::channel::<Result<ShardProcessorADT, ProcessError>>(10);
let (tx_ticker_updates, _) = mpsc::channel::<TickerUpdate>(10);

let client = TestKinesisClient {
Expand Down Expand Up @@ -246,7 +246,7 @@ async fn has_records_beyond_end_ts_when_has_end_ts() {

#[tokio::test]
async fn has_records_beyond_end_ts_when_no_end_ts() {
let (tx_records, _) = mpsc::channel::<Result<ShardProcessorADT, PanicError>>(10);
let (tx_records, _) = mpsc::channel::<Result<ShardProcessorADT, ProcessError>>(10);
let (tx_ticker_updates, _) = mpsc::channel::<TickerUpdate>(10);

let client = TestKinesisClient {
Expand Down
2 changes: 1 addition & 1 deletion src/kinesis/ticker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Ticker {
info!("------------------------------")
}
}
sleep(delay).await;
sleep(delay).await
}
}
});
Expand Down
6 changes: 3 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ async fn main() -> Result<()> {

let opt = Opt::parse();

let from_datetime = parse_date(opt.from_datetime.as_deref());
let to_datetime = parse_date(opt.to_datetime.as_deref());
let from_datetime = parse_date(opt.from_datetime.as_deref())?;
let to_datetime = parse_date(opt.to_datetime.as_deref())?;

validate_time_boundaries(&from_datetime, &to_datetime)?;

let client = create_client(opt.region.clone(), opt.endpoint_url.clone()).await;

let (tx_records, rx_records) = mpsc::channel::<Result<ShardProcessorADT, PanicError>>(1000);
let (tx_records, rx_records) = mpsc::channel::<Result<ShardProcessorADT, ProcessError>>(1000);

let shards = get_shards(&client, &opt.stream_name).await?;

Expand Down
Loading

0 comments on commit e94c237

Please sign in to comment.