Skip to content

Commit

Permalink
Update redis to 0.25 (#21)
Browse files Browse the repository at this point in the history
* chore(deps): bump redis to 0.25

* chore(redis): rename redis async-std tls feature flag

* refactor: use aio::MultiplexedConnection instead of deprecated aio::Connection
  • Loading branch information
negezor authored Apr 24, 2024
1 parent a4eef2a commit b61dedb
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 16 deletions.
4 changes: 2 additions & 2 deletions sea-streamer-redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ flume = { version = "0.11", default-features = false, features = ["async"] }
lazy_static = { version = "1.4" }
log = { version = "0.4", default-features = false }
mac_address = { version = "1" }
redis = { version = "0.22", default-features = false, features = ["acl", "streams"] }
redis = { version = "0.25", default-features = false, features = ["acl", "streams"] }
sea-streamer-types = { version = "0.3", path = "../sea-streamer-types" }
sea-streamer-runtime = { version = "0.3", path = "../sea-streamer-runtime" }
clap = { version = "4.5", features = ["derive", "env"], optional = true }
Expand All @@ -38,7 +38,7 @@ test = ["anyhow", "async-std?/attributes", "tokio?/full", "env_logger"]
executables = ["anyhow", "env_logger", "clap", "runtime-tokio", "tokio/full"]
runtime-async-std = ["async-std", "redis/async-std-comp", "sea-streamer-runtime/runtime-async-std"]
runtime-tokio = ["tokio", "redis/tokio-comp", "sea-streamer-runtime/runtime-tokio"]
runtime-async-std-native-tls = ["runtime-async-std", "redis/async-std-tls-comp"]
runtime-async-std-native-tls = ["runtime-async-std", "redis/async-std-native-tls-comp"]
runtime-tokio-native-tls = ["runtime-tokio", "redis/tokio-native-tls-comp"]

[[bin]]
Expand Down
11 changes: 7 additions & 4 deletions sea-streamer-redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl RedisCluster {
}

/// Get any available connection to the cluster
pub fn get_any(&mut self) -> RedisResult<(&NodeId, &mut redis::aio::Connection)> {
pub fn get_any(&mut self) -> RedisResult<(&NodeId, &mut redis::aio::MultiplexedConnection)> {
for (node, inner) in self.conn.iter_mut() {
if let Ok(conn) = inner.try_get() {
return Ok((node, conn));
Expand All @@ -93,7 +93,10 @@ impl RedisCluster {

#[inline]
/// Get a connection to the specific node, will wait and retry a few times until dead.
pub async fn get(&mut self, node: &NodeId) -> RedisResult<&mut redis::aio::Connection> {
pub async fn get(
&mut self,
node: &NodeId,
) -> RedisResult<&mut redis::aio::MultiplexedConnection> {
Self::get_connection(&mut self.conn, &self.options, node).await
}

Expand All @@ -103,7 +106,7 @@ impl RedisCluster {
pub async fn get_connection_for(
&mut self,
key: &str,
) -> RedisResult<(&NodeId, &mut redis::aio::Connection)> {
) -> RedisResult<(&NodeId, &mut redis::aio::MultiplexedConnection)> {
let node = Self::get_node_for(&self.keys, &self.cluster, key);
Ok((
node,
Expand All @@ -115,7 +118,7 @@ impl RedisCluster {
conn: &'a mut HashMap<NodeId, Connection>,
options: &Arc<RedisConnectOptions>,
node: &NodeId,
) -> RedisResult<&'a mut redis::aio::Connection> {
) -> RedisResult<&'a mut redis::aio::MultiplexedConnection> {
assert!(!node.scheme().is_empty(), "Must have protocol");
assert!(node.host_str().is_some(), "Must have host");
assert!(node.port().is_some(), "Must have port");
Expand Down
13 changes: 7 additions & 6 deletions sea-streamer-redis/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use sea_streamer_runtime::{sleep, timeout};
use sea_streamer_types::{ConnectOptions, StreamErr};

#[derive(Debug)]
/// A wrapped [`redis::aio::Connection`] that can auto-reconnect.
/// A wrapped [`redis::aio::MultiplexedConnection`] that can auto-reconnect.
pub struct Connection {
node: NodeId,
options: Arc<RedisConnectOptions>,
state: State,
}

enum State {
Alive(redis::aio::Connection),
Alive(redis::aio::MultiplexedConnection),
Reconnecting { delay: u32 },
Dead,
}
Expand Down Expand Up @@ -69,7 +69,7 @@ impl Connection {
}

/// Get a mutable connection, will wait and retry a few times until dead.
pub async fn get(&mut self) -> RedisResult<&mut redis::aio::Connection> {
pub async fn get(&mut self) -> RedisResult<&mut redis::aio::MultiplexedConnection> {
match &mut self.state {
State::Alive(_) | State::Dead => (),
State::Reconnecting { delay } => {
Expand All @@ -93,7 +93,7 @@ impl Connection {
}

/// Get a mutable connection, only if it is alive.
pub fn try_get(&mut self) -> RedisResult<&mut redis::aio::Connection> {
pub fn try_get(&mut self) -> RedisResult<&mut redis::aio::MultiplexedConnection> {
match &mut self.state {
State::Alive(conn) => Ok(conn),
State::Dead => Err(StreamErr::Connect(format!(
Expand All @@ -119,7 +119,7 @@ impl Connection {
async fn create_connection(
url: NodeId,
options: Arc<RedisConnectOptions>,
) -> RedisResult<redis::aio::Connection> {
) -> RedisResult<redis::aio::MultiplexedConnection> {
let host = if let Some(host) = url.host_str() {
host.to_owned()
} else {
Expand All @@ -132,6 +132,7 @@ async fn create_connection(
"rediss" => ConnectionAddr::TcpTls {
host,
port,
tls_params: None,
insecure: options.disable_hostname_verification(),
},
"" => return Err(StreamErr::Connect("protocol not set".to_owned())),
Expand All @@ -147,7 +148,7 @@ async fn create_connection(
// I wish we could do `.await_timeout(d)` some day
match timeout(
options.timeout().unwrap_or(DEFAULT_TIMEOUT),
client.get_async_connection(),
client.get_multiplexed_async_connection(),
)
.await
{
Expand Down
20 changes: 16 additions & 4 deletions sea-streamer-redis/src/consumer/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,10 @@ impl Node {
&& Timestamp::now_utc() - *self.options.auto_commit_interval() > self.group.last_commit
}

async fn commit_ack(&mut self, conn: &mut redis::aio::Connection) -> RedisResult<()> {
async fn commit_ack(
&mut self,
conn: &mut redis::aio::MultiplexedConnection,
) -> RedisResult<()> {
for shard in self.shards.iter_mut() {
if !shard.pending_ack.is_empty() {
match self.options.auto_commit() {
Expand Down Expand Up @@ -442,7 +445,10 @@ impl Node {
}
}

async fn read_next(&mut self, conn: &mut redis::aio::Connection) -> RedisResult<ReadResult> {
async fn read_next(
&mut self,
conn: &mut redis::aio::MultiplexedConnection,
) -> RedisResult<ReadResult> {
let mode = self.running_mode();
if matches!(mode, ConsumerMode::Resumable | ConsumerMode::LoadBalanced)
&& self.group.first_read
Expand Down Expand Up @@ -605,7 +611,10 @@ impl Node {
}
}

async fn auto_claim(&mut self, conn: &mut redis::aio::Connection) -> RedisResult<ReadResult> {
async fn auto_claim(
&mut self,
conn: &mut redis::aio::MultiplexedConnection,
) -> RedisResult<ReadResult> {
self.group.last_check = Timestamp::now_utc();
let change = self.group.claiming.is_none();
if self.group.claiming.is_none() {
Expand Down Expand Up @@ -699,7 +708,10 @@ impl Node {
}
}

async fn move_shards(&mut self, conn: &mut redis::aio::Connection) -> Vec<StatusMsg> {
async fn move_shards(
&mut self,
conn: &mut redis::aio::MultiplexedConnection,
) -> Vec<StatusMsg> {
let mut events = Vec::new();
let shards = std::mem::take(&mut self.shards);
for shard in shards {
Expand Down

0 comments on commit b61dedb

Please sign in to comment.