From 88bdf52b7267b140ceb77281050fee780b7f98c0 Mon Sep 17 00:00:00 2001 From: Romain Gallet Date: Tue, 14 May 2024 00:31:46 +0200 Subject: [PATCH] Removed some unneeded clone()s (#64) --- README.md | 3 +-- src/cli_helpers.rs | 8 +------- src/kinesis.rs | 16 ++-------------- src/kinesis/helpers.rs | 6 +----- src/kinesis/models.rs | 2 -- src/kinesis/tests.rs | 21 +-------------------- src/main.rs | 16 +--------------- 7 files changed, 7 insertions(+), 65 deletions(-) diff --git a/README.md b/README.md index 2128d3a..d704fdd 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ Arch Linux. --to-datetime End datetime position to tail up to. ISO 8601 format --max-messages Maximum number of messages to retrieve --timeout Exit if no messages received after seconds - --max-attempts Maximum number of aws sdk retries. Increase if you are seeing throttling errors [default: 3] + --max-attempts Maximum number of aws sdk retries. Increase if you are seeing throttling errors [default: 10] --no-color Disable color output --print-delimiter Print a delimiter between each payload --print-key Print the partition key @@ -52,7 +52,6 @@ Arch Linux. --progress Print progress status --shard-id Shard ID to tail from. Repeat option for each shard ID to filter on -o, --output-file Output file to write to - -c, --concurrent Concurrent number of shards to tail -v, --verbose Display additional information --base64 Base64 encode payloads (eg. for binary data) --utf8 Forces UTF-8 printable payloads diff --git a/src/cli_helpers.rs b/src/cli_helpers.rs index 89ffa6c..4fe6834 100644 --- a/src/cli_helpers.rs +++ b/src/cli_helpers.rs @@ -5,8 +5,6 @@ use chrono::{DateTime, Utc}; use clap::Parser; use log::info; -pub const SEMAPHORE_DEFAULT_SIZE: usize = 50; - #[derive(Debug, Parser)] #[command( version = "{#RELEASE_VERSION} - Grum Ltd\nReport bugs to https://github.com/grumlimited/kinesis-tailr/issues" @@ -42,7 +40,7 @@ pub struct Opt { /// Maximum number of aws sdk retries. Increase if you are seeing throttling errors. #[structopt(long)] - #[clap(default_value_t = 3)] + #[clap(default_value_t = 10)] pub max_attempts: u32, /// Disable color output @@ -81,10 +79,6 @@ pub struct Opt { #[structopt(long, short)] pub output_file: Option, - /// Concurrent number of shards to tail - #[structopt(short, long)] - pub concurrent: Option, - /// Display additional information #[structopt(short, long)] pub verbose: bool, diff --git a/src/kinesis.rs b/src/kinesis.rs index 7c6e1ae..ad30e62 100644 --- a/src/kinesis.rs +++ b/src/kinesis.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use anyhow::Result; use async_trait::async_trait; use aws_sdk_kinesis::operation::get_records::GetRecordsError; @@ -5,7 +7,6 @@ use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput; use chrono::prelude::*; use chrono::Utc; use log::{debug, warn}; -use std::sync::Arc; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; use tokio::time::{sleep, Duration}; @@ -42,14 +43,6 @@ where self.seed_shards(tx_shard_iterator_progress.clone()).await?; while let Some(res) = rx_shard_iterator_progress.recv().await { - let permit = self - .get_config() - .semaphore - .clone() - .acquire_owned() - .await - .unwrap(); - let res_clone = res.clone(); match res.next_shard_iterator { @@ -112,8 +105,6 @@ where rx_shard_iterator_progress.close(); } }; - - drop(permit); } debug!("ShardProcessor {} finished", self.get_config().shard_id); @@ -138,8 +129,6 @@ where &self, tx_shard_iterator_progress: Sender, ) -> Result<()> { - let permit = self.get_config().semaphore.clone().acquire_owned().await?; - debug!("Seeding shard {}", self.get_config().shard_id); match self.get_iterator().await { @@ -160,7 +149,6 @@ where } } - drop(permit); Ok(()) } diff --git a/src/kinesis/helpers.rs b/src/kinesis/helpers.rs index 340893d..080c878 100644 --- a/src/kinesis/helpers.rs +++ b/src/kinesis/helpers.rs @@ -11,7 +11,6 @@ use aws_sdk_kinesis::types::Shard; use chrono::Utc; use log::{debug, info}; use tokio::sync::mpsc::Sender; -use tokio::sync::Semaphore; use tokio::time::sleep; use crate::aws::client::AwsKinesisClient; @@ -33,7 +32,6 @@ pub fn new( shard_id: String, from_datetime: Option>, to_datetime: Option>, - semaphore: Arc, tx_records: Sender>, tx_ticker_updates: Option>, ) -> Box + Send + Sync> { @@ -46,7 +44,6 @@ pub fn new( stream, shard_id: Arc::new(shard_id), to_datetime, - semaphore, tx_records, tx_ticker_updates, }, @@ -58,7 +55,6 @@ pub fn new( stream, shard_id: Arc::new(shard_id), to_datetime, - semaphore, tx_records, tx_ticker_updates, }, @@ -194,5 +190,5 @@ pub fn wait_milliseconds() -> u64 { use rand::prelude::*; let mut rng = thread_rng(); - rng.gen_range(50..=1000) + rng.gen_range(50..=100) } diff --git a/src/kinesis/models.rs b/src/kinesis/models.rs index dac6625..72f2468 100644 --- a/src/kinesis/models.rs +++ b/src/kinesis/models.rs @@ -12,7 +12,6 @@ use std::fmt::Debug; use std::sync::Arc; use thiserror::Error; use tokio::sync::mpsc::Sender; -use tokio::sync::Semaphore; #[derive(Debug, Clone)] pub struct ShardIteratorProgress { @@ -50,7 +49,6 @@ pub struct ShardProcessorConfig { pub stream: String, pub shard_id: Arc, pub to_datetime: Option>, - pub semaphore: Arc, pub tx_records: Sender>, pub tx_ticker_updates: Option>, } diff --git a/src/kinesis/tests.rs b/src/kinesis/tests.rs index a423e69..22f0931 100644 --- a/src/kinesis/tests.rs +++ b/src/kinesis/tests.rs @@ -11,7 +11,7 @@ use aws_sdk_kinesis::types::error::InvalidArgumentException; use aws_sdk_kinesis::types::{Record, Shard}; use chrono::prelude::*; use chrono::Utc; -use tokio::sync::{mpsc, Semaphore}; +use tokio::sync::mpsc; use crate::aws::stream::StreamClient; use crate::kinesis::helpers; @@ -34,15 +34,12 @@ async fn seed_shards_test() { done: Arc::new(Mutex::new(false)), }; - let semaphore: Arc = Arc::new(Semaphore::new(10)); - let processor = ShardProcessorLatest { client, config: ShardProcessorConfig { stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: None, - semaphore, tx_records, tx_ticker_updates: Some(tx_ticker_updates), }, @@ -72,15 +69,12 @@ async fn seed_shards_test_timestamp_in_future() { let client = TestTimestampInFutureKinesisClient {}; - let semaphore: Arc = Arc::new(Semaphore::new(10)); - let processor = ShardProcessorAtTimestamp { client, config: ShardProcessorConfig { stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: None, - semaphore, tx_records, tx_ticker_updates: Some(tx_ticker_updates), }, @@ -102,15 +96,12 @@ async fn produced_record_is_processed() { done: Arc::new(Mutex::new(false)), }; - let semaphore: Arc = Arc::new(Semaphore::new(10)); - let processor = ShardProcessorLatest { client: client.clone(), config: ShardProcessorConfig { stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: None, - semaphore, tx_records, tx_ticker_updates: Some(tx_ticker_updates), }, @@ -149,8 +140,6 @@ async fn beyond_to_timestamp_is_received() { done: Arc::new(Mutex::new(false)), }; - let semaphore: Arc = Arc::new(Semaphore::new(10)); - let to_datetime = Utc.with_ymd_and_hms(2020, 6, 1, 12, 0, 0).unwrap(); let processor = ShardProcessorLatest { client, @@ -158,7 +147,6 @@ async fn beyond_to_timestamp_is_received() { stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: Some(to_datetime), - semaphore, tx_records, tx_ticker_updates: Some(tx_ticker_updates), }, @@ -190,8 +178,6 @@ async fn has_records_beyond_end_ts_when_has_end_ts() { done: Arc::new(Mutex::new(false)), }; - let semaphore: Arc = Arc::new(Semaphore::new(10)); - let to_datetime = Utc.with_ymd_and_hms(2020, 6, 1, 12, 0, 0).unwrap(); let processor = ShardProcessorLatest { client, @@ -199,7 +185,6 @@ async fn has_records_beyond_end_ts_when_has_end_ts() { stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: Some(to_datetime), - semaphore, tx_records, tx_ticker_updates: Some(tx_ticker_updates), }, @@ -251,15 +236,12 @@ async fn has_records_beyond_end_ts_when_no_end_ts() { done: Arc::new(Mutex::new(false)), }; - let semaphore: Arc = Arc::new(Semaphore::new(10)); - let processor = ShardProcessorLatest { client, config: ShardProcessorConfig { stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: None, - semaphore, tx_records, tx_ticker_updates: Some(tx_ticker_updates), }, @@ -303,7 +285,6 @@ async fn handle_iterator_refresh_ok() { stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: None, - semaphore: Arc::new(Semaphore::new(10)), tx_records: mpsc::channel::>(10).0, tx_ticker_updates: Some(mpsc::channel::(10).0), }, diff --git a/src/main.rs b/src/main.rs index 6f0b98f..aef7786 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,8 @@ #![allow(clippy::result_large_err)] -use std::sync::Arc; - use anyhow::Result; use clap::Parser; -use tokio::sync::{mpsc, Semaphore}; +use tokio::sync::mpsc; use tokio::task::JoinSet; use kinesis::helpers::get_shards; @@ -115,8 +113,6 @@ async fn main() -> Result<()> { }; let shard_processors = { - let semaphore = semaphore(shard_count, opt.concurrent); - selected_shards .iter() .map(|shard_id| { @@ -126,7 +122,6 @@ async fn main() -> Result<()> { shard_id.clone(), from_datetime, to_datetime, - semaphore.clone(), tx_records.clone(), tx_ticker_updates.clone(), ); @@ -150,12 +145,3 @@ async fn main() -> Result<()> { Ok(()) } - -fn semaphore(shard_count: usize, concurrent: Option) -> Arc { - let concurrent = match concurrent { - Some(concurrent) => concurrent, - None => std::cmp::min(shard_count, SEMAPHORE_DEFAULT_SIZE), - }; - - Arc::new(Semaphore::new(concurrent)) -}