Skip to content

Commit

Permalink
refactor: fix clippy warnings (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
negezor authored and tyt2y3 committed Apr 24, 2024
1 parent e93100c commit 3525b5f
Show file tree
Hide file tree
Showing 8 changed files with 14 additions and 14 deletions.
2 changes: 1 addition & 1 deletion examples/src/bin/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> Result<()> {
.create_producer(output.stream_key()?, Default::default())
.await?;

for batch in 0..std::usize::MAX {
for batch in 0..usize::MAX {
// Take all messages currently buffered in the queue, but do not wait
let mut messages: Vec<SharedMessage> = receiver.drain().collect();
if messages.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/consumer/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl Streamers {
let (sender, receiver) = unbounded();
self.max_sid += 1;
let sid = self.max_sid;
if self.streamers.get(&file_id).is_none() {
if !self.streamers.contains_key(&file_id) {
self.streamers.insert(file_id.clone(), Vec::new());
}
let handles = self.streamers.get_mut(&file_id).unwrap();
Expand Down
2 changes: 0 additions & 2 deletions sea-streamer-file/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,6 @@ where

#[cfg(test)]
mod test {
use sea_streamer_types::Buffer;

use super::*;

#[test]
Expand Down
7 changes: 4 additions & 3 deletions sea-streamer-file/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl MessageSource {
#[allow(clippy::never_loop)]
let res = 'outer: loop {
// survey the beacons to narrow down the scope of search
let surveyor = match Surveyor::new(self, |b: &Beacon| {
let surveyor = Surveyor::new(self, |b: &Beacon| {
for item in b.items.iter() {
if (stream_key, shard_id) == (item.header.stream_key(), item.header.shard_id())
{
Expand All @@ -210,8 +210,9 @@ impl MessageSource {
}
SurveyResult::Undecided
})
.await
{
.await;

let surveyor = match surveyor {
Ok(s) => s,
Err(e) => {
break Err(e);
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/producer/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Writers {
options: &FileConnectOptions,
_pro_options: &FileProducerOptions,
) -> Result<FileProducer, FileErr> {
if self.writers.get(&file_id).is_none() {
if !self.writers.contains_key(&file_id) {
self.writers.insert(
file_id.clone(),
Writer::new(file_id.clone(), options).await?,
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl Watchers {
/// `Sender` should be unbounded, and never blocks.
fn add(&mut self, file_id: FileId, sender: Sender<FileEvent>) -> Result<Watcher, FileErr> {
assert!(sender.capacity().is_none());
if self.watchers.get(&file_id).is_none() {
if !self.watchers.contains_key(&file_id) {
let watcher = Self::new_watcher(file_id.clone(), self.sender.clone())?;
self.watchers.insert(file_id.clone(), watcher);
}
Expand Down
7 changes: 4 additions & 3 deletions sea-streamer-kafka/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,17 @@ impl KafkaProducer {
) -> KafkaResult<()> {
self.get();
let client = self.inner.take().unwrap();
match spawn_blocking(move || {
let producer = spawn_blocking(move || {
let s = client;
match func(&s) {
Ok(()) => Ok(s),
Err(e) => Err((s, e)),
}
})
.await
.map_err(runtime_error)?
{
.map_err(runtime_error)?;

match producer {
Ok(inner) => {
self.inner = Some(inner);
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions sea-streamer-redis/src/consumer/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl Cluster {
}
StatusMsg::Moved { shard, from, to } => {
log::info!("Shard {shard:?} moving from {from} to {to}");
let conn = if self.nodes.get(&to).is_none() {
let conn = if !self.nodes.contains_key(&to) {
Some(
Connection::create_or_reconnect(
to.clone(),
Expand Down Expand Up @@ -217,7 +217,7 @@ impl Cluster {
}

fn add_node(&mut self, node_id: NodeId, event_sender: Sender<StatusMsg>) -> &Sender<CtrlMsg> {
if self.nodes.get(&node_id).is_none() {
if !self.nodes.contains_key(&node_id) {
let (ctrl_sender, receiver) = bounded(128);
self.nodes.insert(node_id.clone(), ctrl_sender);
let node = Node::add(
Expand Down

0 comments on commit 3525b5f

Please sign in to comment.