Skip to content

Commit

Permalink
Merge pull request #66 from influxdata/cn/testability
Browse files Browse the repository at this point in the history
refactor: Extract pieces to make them more testable
  • Loading branch information
kodiakhq[bot] authored Jan 19, 2022
2 parents d05154b + 553460a commit 4a03dba
Show file tree
Hide file tree
Showing 5 changed files with 409 additions and 155 deletions.
30 changes: 27 additions & 3 deletions src/backoff.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use rand::prelude::*;
use std::ops::ControlFlow;
use std::time::Duration;
use tracing::info;

/// Exponential backoff with jitter
///
Expand Down Expand Up @@ -49,9 +51,7 @@ impl Backoff {
pub fn new(config: &BackoffConfig) -> Self {
Self::new_with_rng(config, None)
}
}

impl Backoff {
/// Creates a new `Backoff` with the optional `rng`
///
/// Used [`rand::thread_rng()`] if no rng provided
Expand All @@ -70,7 +70,7 @@ impl Backoff {
}

/// Returns the next backoff duration to wait for
pub fn next(&mut self) -> Duration {
fn next(&mut self) -> Duration {
let range = self.init_backoff..(self.next_backoff_secs * self.base);

let rand_backoff = match self.rng.as_mut() {
Expand All @@ -81,6 +81,30 @@ impl Backoff {
let next_backoff = self.max_backoff_secs.min(rand_backoff);
Duration::from_secs_f64(std::mem::replace(&mut self.next_backoff_secs, next_backoff))
}

/// Perform an async operation that retries with a backoff
pub async fn retry_with_backoff<F, F1, B, C>(&mut self, request_name: &str, do_stuff: F) -> B
where
F: (Fn() -> F1) + Send + Sync,
F1: std::future::Future<Output = ControlFlow<B, C>> + Send,
C: core::fmt::Display + Send,
{
loop {
let e = match do_stuff().await {
ControlFlow::Break(r) => break r,
ControlFlow::Continue(e) => e,
};

let backoff = self.next();
info!(
e=%e,
request_name,
backoff_secs = backoff.as_secs(),
"request encountered non-fatal error - backing off",
);
tokio::time::sleep(backoff).await;
}
}
}

#[cfg(test)]
Expand Down
66 changes: 30 additions & 36 deletions src/client/controller.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::ops::ControlFlow;
use std::sync::Arc;

use tokio::sync::Mutex;
use tracing::{debug, error, info};

Expand Down Expand Up @@ -89,42 +89,36 @@ impl ControllerClient {
{
let mut backoff = Backoff::new(&self.backoff_config);

loop {
let error = match f().await {
Ok(v) => return Ok(v),
Err(e) => e,
};

match error {
// broken connection
Error::Request(RequestError::Poisoned(_) | RequestError::IO(_))
| Error::Connection(_) => self.invalidate_cached_controller_broker().await,

// our broker is actually not the controller
Error::ServerError(ProtocolError::NotController, _) => {
self.invalidate_cached_controller_broker().await;
}

// fatal
_ => {
error!(
e=%error,
request_name,
"request encountered fatal error",
);
return Err(error);
backoff
.retry_with_backoff(request_name, || async {
let error = match f().await {
Ok(v) => return ControlFlow::Break(Ok(v)),
Err(e) => e,
};

match error {
// broken connection
Error::Request(RequestError::Poisoned(_) | RequestError::IO(_))
| Error::Connection(_) => self.invalidate_cached_controller_broker().await,

// our broker is actually not the controller
Error::ServerError(ProtocolError::NotController, _) => {
self.invalidate_cached_controller_broker().await;
}

// fatal
_ => {
error!(
e=%error,
request_name,
"request encountered fatal error",
);
return ControlFlow::Break(Err(error));
}
}
}

let backoff = backoff.next();
info!(
e=%error,
request_name,
backoff_secs=backoff.as_secs(),
"request encountered non-fatal error - backing off",
);
tokio::time::sleep(backoff).await;
}
ControlFlow::Continue(request_name)
})
.await
}

/// Gets a cached [`BrokerConnection`] to any cluster controller.
Expand Down
59 changes: 27 additions & 32 deletions src/client/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
},
record::Record,
};
use std::ops::{Deref, Range};
use std::ops::{ControlFlow, Deref, Range};
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -380,39 +380,34 @@ impl PartitionClient {
{
let mut backoff = Backoff::new(&self.backoff_config);

loop {
let error = match f().await {
Ok(v) => return Ok(v),
Err(e) => e,
};

match error {
Error::Request(RequestError::Poisoned(_) | RequestError::IO(_))
| Error::Connection(_) => self.invalidate_cached_leader_broker().await,
Error::ServerError(ProtocolError::LeaderNotAvailable, _) => {}
Error::ServerError(ProtocolError::OffsetNotAvailable, _) => {}
Error::ServerError(ProtocolError::NotLeaderOrFollower, _) => {
self.invalidate_cached_leader_broker().await;
}
_ => {
error!(
e=%error,
request_name,
"request encountered fatal error",
);
return Err(error);
backoff
.retry_with_backoff(request_name, || async {
let error = match f().await {
Ok(v) => return ControlFlow::Break(Ok(v)),
Err(e) => e,
};

match error {
Error::Request(RequestError::Poisoned(_) | RequestError::IO(_))
| Error::Connection(_) => self.invalidate_cached_leader_broker().await,
Error::ServerError(ProtocolError::LeaderNotAvailable, _) => {}
Error::ServerError(ProtocolError::OffsetNotAvailable, _) => {}
Error::ServerError(ProtocolError::NotLeaderOrFollower, _) => {
self.invalidate_cached_leader_broker().await;
}
_ => {
error!(
e=%error,
request_name,
"request encountered fatal error",
);
return ControlFlow::Break(Err(error));
}
}
}

let backoff = backoff.next();
info!(
e=%error,
request_name,
backoff_secs=backoff.as_secs(),
"request encountered non-fatal error - backing off",
);
tokio::time::sleep(backoff).await;
}
ControlFlow::Continue(request_name)
})
.await
}

/// Invalidate the cached broker connection
Expand Down
Loading

0 comments on commit 4a03dba

Please sign in to comment.