From 6ae5d904cc07669e224e16b872d66329dfc8893c Mon Sep 17 00:00:00 2001 From: ysimonson Date: Sun, 13 Aug 2023 10:06:47 -0400 Subject: [PATCH 1/7] fmt --- src/channel.rs | 5 ++++- src/endian.rs | 4 ++-- src/lib.rs | 2 +- src/receiver.rs | 29 +++++++++++++++--------- src/sender.rs | 7 ++++-- tests/blob.rs | 57 ++++++++++++++++++++++++++++++------------------ tests/slow_io.rs | 14 +++++++----- tests/time.rs | 14 +++++++----- 8 files changed, 83 insertions(+), 49 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 7d8fb22..5e8a68b 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,4 +1,7 @@ -use std::sync::mpsc::{Sender as StdSender, Receiver as StdReceiver, SendError as StdSendError, RecvError as StdRecvError}; +use std::sync::mpsc::{ + Receiver as StdReceiver, RecvError as StdRecvError, SendError as StdSendError, + Sender as StdSender, +}; pub trait ChannelSend { type Error; diff --git a/src/endian.rs b/src/endian.rs index ee7e504..6b89230 100644 --- a/src/endian.rs +++ b/src/endian.rs @@ -1,6 +1,6 @@ -pub use byteorder::{BigEndian, LittleEndian, NativeEndian}; -use byteorder::ByteOrder; use bincode::Config; +use byteorder::ByteOrder; +pub use byteorder::{BigEndian, LittleEndian, NativeEndian}; pub trait Endian: ByteOrder { fn config() -> Config; diff --git a/src/lib.rs b/src/lib.rs index 52b5f76..6ad7b05 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,7 @@ mod receiver; mod sender; pub use channel::{ChannelRecv, ChannelSend}; -pub use endian::{Endian, BigEndian, LittleEndian, NativeEndian}; +pub use endian::{BigEndian, Endian, LittleEndian, NativeEndian}; pub use error::{RecvError, SendError}; pub use receiver::{Receiver, ReceiverBuilder, DEFAULT_MAX_SIZE}; pub use sender::{Sender, SenderBuilder}; diff --git a/src/receiver.rs b/src/receiver.rs index c7aa2ed..860af65 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -1,12 +1,12 @@ use std::io::{BufReader, Read}; -use std::net::{TcpListener, TcpStream, ToSocketAddrs}; use std::marker::PhantomData; +use std::net::{TcpListener, TcpStream, ToSocketAddrs}; use bincode::Config; use byteorder::ReadBytesExt; use serde::de::DeserializeOwned; -use crate::{ChannelRecv, Endian, BigEndian, RecvError}; +use crate::{BigEndian, ChannelRecv, Endian, RecvError}; pub const DEFAULT_MAX_SIZE: usize = 64 * 0x100_000; @@ -98,7 +98,10 @@ impl TypedReceiverBuilder { } impl TypedReceiverBuilder, E> { /// Listen for a sender, binding the listener to the specified address. - pub fn listen_once(self, address: A) -> std::io::Result>> { + pub fn listen_once( + self, + address: A, + ) -> std::io::Result>> { let listener = TcpListener::bind(address)?; let (stream, _) = listener.accept()?; @@ -116,7 +119,10 @@ impl TypedReceiverBuilder TypedReceiverBuilder { /// Listen for a sender, binding the listener to the specified address. - pub fn listen_once(self, address: A) -> std::io::Result> { + pub fn listen_once( + self, + address: A, + ) -> std::io::Result> { let listener = TcpListener::bind(address)?; let (stream, _) = listener.accept()?; @@ -140,11 +146,12 @@ impl ChannelRecv for Receiver()? as usize; if length > self.max_size { - return Err(RecvError::TooLarge(length)) + return Err(RecvError::TooLarge(length)); } if self.buffer.len() < length { - self.buffer.extend(std::iter::repeat(0).take(length - self.buffer.len())); + self.buffer + .extend(std::iter::repeat(0).take(length - self.buffer.len())); } self.bytes_to_read = length; @@ -152,18 +159,20 @@ impl ChannelRecv for Receiver { self.bytes_read += size; if self.bytes_read >= self.bytes_to_read { let length = self.bytes_to_read; self.bytes_to_read = 0; - return Ok(self.config.deserialize(&self.buffer[0..length])?) + return Ok(self.config.deserialize(&self.buffer[0..length])?); } - }, + } Err(error) => return Err(error.into()), } } - } } diff --git a/src/sender.rs b/src/sender.rs index 0065e59..2485f78 100644 --- a/src/sender.rs +++ b/src/sender.rs @@ -6,7 +6,7 @@ use bincode::Config; use byteorder::WriteBytesExt; use serde::Serialize; -use crate::{ChannelSend, Endian, BigEndian, SendError}; +use crate::{BigEndian, ChannelSend, Endian, SendError}; /// The sending side of a channel. pub struct Sender> { @@ -72,7 +72,10 @@ impl TypedSenderBuilder { } impl TypedSenderBuilder, E> { /// Connect to a listening receiver, at a specified address. - pub fn connect(self, address: A) -> std::io::Result>> { + pub fn connect( + self, + address: A, + ) -> std::io::Result>> { let stream = TcpStream::connect(address)?; Ok(Sender { diff --git a/tests/blob.rs b/tests/blob.rs index 552425c..ca7a398 100644 --- a/tests/blob.rs +++ b/tests/blob.rs @@ -1,17 +1,22 @@ extern crate tcp_channel; -#[macro_use] extern crate quick_error; +#[macro_use] +extern crate quick_error; extern crate serde; -#[macro_use] extern crate serde_derive; +#[macro_use] +extern crate serde_derive; use std::any::Any; -use std::io::{BufReader, BufWriter, ErrorKind as IoErrorKind}; use std::io::prelude::*; +use std::io::{BufReader, BufWriter, ErrorKind as IoErrorKind}; use std::net::{TcpListener, TcpStream}; use std::thread::JoinHandle; -use rand::{FromEntropy, RngCore, rngs::SmallRng}; +use rand::{rngs::SmallRng, FromEntropy, RngCore}; use serde::de::DeserializeOwned; -use tcp_channel::{SenderBuilder, ReceiverBuilder, ChannelSend, ChannelRecv, BigEndian, Receiver as TcpReceiver, RecvError, DEFAULT_MAX_SIZE}; +use tcp_channel::{ + BigEndian, ChannelRecv, ChannelSend, Receiver as TcpReceiver, ReceiverBuilder, RecvError, + SenderBuilder, DEFAULT_MAX_SIZE, +}; // This emulates a real delayed TCP connection. mod slow_io; @@ -52,14 +57,16 @@ quick_error! { } } -fn pretend_blocking_read(receiver: &mut TcpReceiver) -> Result { +fn pretend_blocking_read( + receiver: &mut TcpReceiver, +) -> Result { loop { match receiver.recv() { Ok(value) => return Ok(value), Err(RecvError::IoError(ioerror)) => match ioerror.kind() { IoErrorKind::WouldBlock => continue, _ => return Err(RecvError::IoError(ioerror).into()), - } + }, Err(error) => return Err(error.into()), } } @@ -68,7 +75,7 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { const SIZE: usize = 262_144; // This test generates a random 256KiB BLOB, sends it, and then receives the BLOB, where every byte is // added by 1. - + let (sender, receiver) = std::sync::mpsc::channel(); let thread: JoinHandle> = std::thread::spawn(move || { @@ -76,17 +83,20 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { let listener = loop { match TcpListener::bind(format!("127.0.0.1:{}", port)) { Ok(listener) => break Ok(listener), - Err(ioerror) => if let IoErrorKind::AddrInUse = ioerror.kind() { - port += 1; - if port >= 8000 { - break Err(ioerror) + Err(ioerror) => { + if let IoErrorKind::AddrInUse = ioerror.kind() { + port += 1; + if port >= 8000 { + break Err(ioerror); + } + continue; + } else { + break Err(ioerror); } - continue - } else { - break Err(ioerror) } } - }.unwrap(); + } + .unwrap(); sender.send(port).unwrap(); let (stream, _) = listener.accept().unwrap(); @@ -96,7 +106,11 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { .with_endianness::() .with_reader::>>() .with_max_size(max_size) - .build(BufReader::new(SlowReader::new(stream.try_clone().unwrap(), slow, blocking))); + .build(BufReader::new(SlowReader::new( + stream.try_clone().unwrap(), + slow, + blocking, + ))); let mut sender = SenderBuilder::buffered() .with_type::() @@ -112,8 +126,8 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { } sender.send(&Response::Respond(blob)).unwrap(); sender.flush().unwrap(); - }, - Request::Stop => return Ok(()) + } + Request::Stop => return Ok(()), } } @@ -135,7 +149,7 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { .build(BufReader::new(SlowReader::new(stream, slow, blocking))); let blob = { - let mut blob = vec! [0u8; SIZE]; + let mut blob = vec![0u8; SIZE]; SmallRng::from_entropy().fill_bytes(&mut blob); @@ -148,7 +162,8 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { let new_blob = match pretend_blocking_read(&mut receiver).unwrap() { Response::Respond(blob) => blob, }; - let precalculated_new_blob = blob.into_iter() + let precalculated_new_blob = blob + .into_iter() .map(|byte| byte.wrapping_add(1)) .collect::>(); diff --git a/tests/slow_io.rs b/tests/slow_io.rs index 7c79b0c..b7300eb 100644 --- a/tests/slow_io.rs +++ b/tests/slow_io.rs @@ -1,6 +1,6 @@ -use std::io::{Result, ErrorKind as IoErrorKind}; use std::io::prelude::*; -use std::time::{Instant, Duration}; +use std::io::{ErrorKind as IoErrorKind, Result}; +use std::time::{Duration, Instant}; // In milliseconds. const DELAY: u64 = 200; @@ -24,10 +24,12 @@ impl SlowWriter { fn emulate_nonblocking(last_io: &mut Option) -> Result<()> { match *last_io { - Some(last_io_some) => if last_io_some + Duration::from_millis(DELAY) < Instant::now() { - *last_io = None; - return Err(IoErrorKind::WouldBlock.into()) - }, + Some(last_io_some) => { + if last_io_some + Duration::from_millis(DELAY) < Instant::now() { + *last_io = None; + return Err(IoErrorKind::WouldBlock.into()); + } + } None => *last_io = Some(Instant::now()), } Ok(()) diff --git a/tests/time.rs b/tests/time.rs index 78edc1d..302fb84 100644 --- a/tests/time.rs +++ b/tests/time.rs @@ -1,16 +1,18 @@ extern crate tcp_channel; -#[macro_use] extern crate quick_error; +#[macro_use] +extern crate quick_error; extern crate serde; -#[macro_use] extern crate serde_derive; +#[macro_use] +extern crate serde_derive; use std::any::Any; use std::io::BufReader; use std::net::{TcpListener, TcpStream}; use std::thread::JoinHandle; // Yeah, regular channels are used to tell the client when the server has started! -use std::time::{SystemTime, Duration}; +use std::time::{Duration, SystemTime}; -use tcp_channel::{SenderBuilder, ReceiverBuilder, ChannelSend, ChannelRecv, BigEndian}; +use tcp_channel::{BigEndian, ChannelRecv, ChannelSend, ReceiverBuilder, SenderBuilder}; #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] enum Request { @@ -52,7 +54,7 @@ quick_error! { fn time() -> Result<(), Error> { // This test sets up a simple time server, enum based. let initial_time = SystemTime::now(); - + let (sender, receiver) = std::sync::mpsc::channel(); let thread: JoinHandle> = std::thread::spawn(move || { @@ -75,7 +77,7 @@ fn time() -> Result<(), Error> { match command { Request::RequestTime => sender.send(&Response::Respond(time))?, Request::SetTime(new_time) => time = new_time, - Request::Stop => return Ok(()) + Request::Stop => return Ok(()), } } From 0516666358ba158b8852fdf961cc0fbdb3ad9a6c Mon Sep 17 00:00:00 2001 From: ysimonson Date: Sun, 13 Aug 2023 10:08:55 -0400 Subject: [PATCH 2/7] Update deps --- Cargo.toml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5ca7c63..e97115f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,11 +12,11 @@ documentation = "https://docs.rs/tcp-channel" categories = ["network-programming"] [dependencies] -bincode = "1.1.2" -byteorder = "1.3.1" -serde = "1.0.89" -quick-error = "1.2.2" +bincode = "^1.3.3" +byteorder = "^1.4.3" +serde = "^1.0.183" +quick-error = "^2.0.1" [dev-dependencies] -rand = "0.6.5" -serde_derive = "1.0.89" +rand = "^0.8.5" +serde_derive = "^1.0.183" From cf4e09eae43d543edefd8bbcad0dde3625d889a8 Mon Sep 17 00:00:00 2001 From: ysimonson Date: Sun, 13 Aug 2023 10:20:31 -0400 Subject: [PATCH 3/7] Remove endianness --- src/endian.rs | 21 ------------------ src/lib.rs | 2 -- src/receiver.rs | 54 ++++++++++++++++++---------------------------- src/sender.rs | 57 ++++++++++++++++++++----------------------------- 4 files changed, 44 insertions(+), 90 deletions(-) delete mode 100644 src/endian.rs diff --git a/src/endian.rs b/src/endian.rs deleted file mode 100644 index 6b89230..0000000 --- a/src/endian.rs +++ /dev/null @@ -1,21 +0,0 @@ -use bincode::Config; -use byteorder::ByteOrder; -pub use byteorder::{BigEndian, LittleEndian, NativeEndian}; - -pub trait Endian: ByteOrder { - fn config() -> Config; -} -impl Endian for BigEndian { - fn config() -> Config { - let mut config = bincode::config(); - config.big_endian(); - config - } -} -impl Endian for LittleEndian { - fn config() -> Config { - let mut config = bincode::config(); - config.little_endian(); - config - } -} diff --git a/src/lib.rs b/src/lib.rs index 6ad7b05..d4330e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,13 +7,11 @@ extern crate quick_error; extern crate serde; mod channel; -mod endian; mod error; mod receiver; mod sender; pub use channel::{ChannelRecv, ChannelSend}; -pub use endian::{BigEndian, Endian, LittleEndian, NativeEndian}; pub use error::{RecvError, SendError}; pub use receiver::{Receiver, ReceiverBuilder, DEFAULT_MAX_SIZE}; pub use sender::{Sender, SenderBuilder}; diff --git a/src/receiver.rs b/src/receiver.rs index 860af65..ab4a69e 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -2,20 +2,18 @@ use std::io::{BufReader, Read}; use std::marker::PhantomData; use std::net::{TcpListener, TcpStream, ToSocketAddrs}; -use bincode::Config; -use byteorder::ReadBytesExt; +use byteorder::{LittleEndian, ReadBytesExt}; use serde::de::DeserializeOwned; -use crate::{BigEndian, ChannelRecv, Endian, RecvError}; +use crate::{ChannelRecv, RecvError}; pub const DEFAULT_MAX_SIZE: usize = 64 * 0x100_000; /// The receiving side of a channel. -pub struct Receiver> { +pub struct Receiver> { reader: R, - config: Config, max_size: usize, - _marker: PhantomData<(T, E)>, + _marker: PhantomData, // This buffer is used for storing the currently read bytes in case the stream is nonblocking. // Otherwise, bincode would deserialize only the currently read bytes. @@ -28,47 +26,40 @@ pub struct Receiver { - _marker: PhantomData<(T, R, E)>, +pub struct TypedReceiverBuilder { + _marker: PhantomData<(T, R)>, max_size: usize, } impl ReceiverBuilder { /// Begin building a new, buffered channel. - pub fn new() -> TypedReceiverBuilder<(), BufReader, BigEndian> { + pub fn new() -> TypedReceiverBuilder<(), BufReader> { Self::buffered() } /// Begin building a new, buffered channel. - pub fn buffered() -> TypedReceiverBuilder<(), BufReader, BigEndian> { + pub fn buffered() -> TypedReceiverBuilder<(), BufReader> { TypedReceiverBuilder { _marker: PhantomData, max_size: DEFAULT_MAX_SIZE, } } /// Begin building a new, non-buffered channel. - pub fn realtime() -> TypedReceiverBuilder<(), TcpStream, BigEndian> { + pub fn realtime() -> TypedReceiverBuilder<(), TcpStream> { TypedReceiverBuilder { _marker: PhantomData, max_size: DEFAULT_MAX_SIZE, } } } -impl TypedReceiverBuilder { +impl TypedReceiverBuilder { /// Specify the type to send. - pub fn with_type(self) -> TypedReceiverBuilder { + pub fn with_type(self) -> TypedReceiverBuilder { TypedReceiverBuilder { _marker: PhantomData, max_size: self.max_size, } } /// Specify the underlying reader type. - pub fn with_reader(self) -> TypedReceiverBuilder { - TypedReceiverBuilder { - _marker: PhantomData, - max_size: self.max_size, - } - } - /// Specify the endianness. - pub fn with_endianness(self) -> TypedReceiverBuilder { + pub fn with_reader(self) -> TypedReceiverBuilder { TypedReceiverBuilder { _marker: PhantomData, max_size: self.max_size, @@ -82,13 +73,12 @@ impl TypedReceiverBuilder { } } } -impl TypedReceiverBuilder { +impl TypedReceiverBuilder { /// Initialize the receiver with the current variables. - pub fn build(self, reader: R) -> Receiver { + pub fn build(self, reader: R) -> Receiver { Receiver { _marker: PhantomData, reader, - config: E::config(), max_size: self.max_size, buffer: Vec::new(), bytes_read: 0, @@ -96,18 +86,17 @@ impl TypedReceiverBuilder { } } } -impl TypedReceiverBuilder, E> { +impl TypedReceiverBuilder> { /// Listen for a sender, binding the listener to the specified address. pub fn listen_once( self, address: A, - ) -> std::io::Result>> { + ) -> std::io::Result>> { let listener = TcpListener::bind(address)?; let (stream, _) = listener.accept()?; Ok(Receiver { - config: E::config(), _marker: PhantomData, reader: BufReader::new(stream), max_size: self.max_size, @@ -117,18 +106,17 @@ impl TypedReceiverBuilder TypedReceiverBuilder { +impl TypedReceiverBuilder { /// Listen for a sender, binding the listener to the specified address. pub fn listen_once( self, address: A, - ) -> std::io::Result> { + ) -> std::io::Result> { let listener = TcpListener::bind(address)?; let (stream, _) = listener.accept()?; Ok(Receiver { - config: E::config(), _marker: PhantomData, reader: stream, max_size: self.max_size, @@ -139,12 +127,12 @@ impl TypedReceiverBuilder { } } -impl ChannelRecv for Receiver { +impl ChannelRecv for Receiver { type Error = RecvError; fn recv(&mut self) -> Result { if self.bytes_to_read == 0 { - let length = self.reader.read_u64::()? as usize; + let length = self.reader.read_u64::()? as usize; if length > self.max_size { return Err(RecvError::TooLarge(length)); } @@ -168,7 +156,7 @@ impl ChannelRecv for Receiver= self.bytes_to_read { let length = self.bytes_to_read; self.bytes_to_read = 0; - return Ok(self.config.deserialize(&self.buffer[0..length])?); + return Ok(bincode::deserialize(&self.buffer[0..length])?); } } Err(error) => return Err(error.into()), diff --git a/src/sender.rs b/src/sender.rs index 2485f78..0954ce4 100644 --- a/src/sender.rs +++ b/src/sender.rs @@ -2,113 +2,102 @@ use std::io::{BufWriter, Write}; use std::marker::PhantomData; use std::net::{TcpStream, ToSocketAddrs}; -use bincode::Config; -use byteorder::WriteBytesExt; +use byteorder::{LittleEndian, WriteBytesExt}; use serde::Serialize; -use crate::{BigEndian, ChannelSend, Endian, SendError}; +use crate::{ChannelSend, SendError}; /// The sending side of a channel. -pub struct Sender> { +pub struct Sender> { writer: W, - config: Config, - _marker: PhantomData<(T, E)>, + _marker: PhantomData, } /// A more convenient way of initializing senders. pub struct SenderBuilder; -pub struct TypedSenderBuilder { - _marker: PhantomData<(T, W, E)>, +pub struct TypedSenderBuilder { + _marker: PhantomData<(T, W)>, } impl SenderBuilder { /// Begin building a new, buffered channel. - pub fn new() -> TypedSenderBuilder<(), BufWriter, BigEndian> { + pub fn new() -> TypedSenderBuilder<(), BufWriter> { Self::buffered() } /// Begin building a new, buffered channel. - pub fn buffered() -> TypedSenderBuilder<(), BufWriter, BigEndian> { + pub fn buffered() -> TypedSenderBuilder<(), BufWriter> { TypedSenderBuilder { _marker: PhantomData, } } /// Begin building a new, non-buffered channel. - pub fn realtime() -> TypedSenderBuilder<(), TcpStream, BigEndian> { + pub fn realtime() -> TypedSenderBuilder<(), TcpStream> { TypedSenderBuilder { _marker: PhantomData, } } } -impl TypedSenderBuilder { +impl TypedSenderBuilder { /// Specify the type to send. - pub fn with_type(self) -> TypedSenderBuilder { + pub fn with_type(self) -> TypedSenderBuilder { TypedSenderBuilder { _marker: PhantomData, } } /// Specify the underlying writer type. - pub fn with_writer(self) -> TypedSenderBuilder { - TypedSenderBuilder { - _marker: PhantomData, - } - } - /// Specify the endianness. - pub fn with_endianness(self) -> TypedSenderBuilder { + pub fn with_writer(self) -> TypedSenderBuilder { TypedSenderBuilder { _marker: PhantomData, } } } -impl TypedSenderBuilder { +impl TypedSenderBuilder { /// Initialize the sender with the current variables. - pub fn build(self, writer: W) -> Sender { + pub fn build(self, writer: W) -> Sender { Sender { _marker: PhantomData, writer, - config: E::config(), } } } -impl TypedSenderBuilder, E> { +impl TypedSenderBuilder> { /// Connect to a listening receiver, at a specified address. pub fn connect( self, address: A, - ) -> std::io::Result>> { + ) -> std::io::Result>> { let stream = TcpStream::connect(address)?; Ok(Sender { writer: BufWriter::new(stream), _marker: PhantomData, - config: E::config(), }) } } -impl TypedSenderBuilder { +impl TypedSenderBuilder { /// Connect to a listening receiver, at a specified address. - pub fn connect(self, address: A) -> std::io::Result> { + pub fn connect(self, address: A) -> std::io::Result> { let stream = TcpStream::connect(address)?; stream.set_nodelay(true)?; Ok(Sender { writer: stream, _marker: PhantomData, - config: E::config(), }) } } -impl Sender { +impl Sender { pub fn flush(&mut self) -> std::io::Result<()> { self.writer.flush() } } -impl ChannelSend for Sender { +impl ChannelSend for Sender { type Error = SendError; fn send(&mut self, value: &T) -> Result<(), SendError> { - let buffer = self.config.serialize(value)?; - self.writer.write_u64::(buffer.len() as u64)?; - self.writer.write(&buffer)?; + let buffer = bincode::serialize(value)?; + self.writer.write_u64::(buffer.len() as u64)?; + self.writer.write_all(&buffer)?; Ok(()) } } From 5bbd2454cdd13d062a53410c270be36f36b74dea Mon Sep 17 00:00:00 2001 From: ysimonson Date: Sun, 13 Aug 2023 10:23:03 -0400 Subject: [PATCH 4/7] new() -> build() --- src/receiver.rs | 15 ++++++++++----- src/sender.rs | 16 +++++++++++----- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/src/receiver.rs b/src/receiver.rs index ab4a69e..af5904c 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -26,13 +26,9 @@ pub struct Receiver> { /// A more convenient way of initializing receivers. pub struct ReceiverBuilder; -pub struct TypedReceiverBuilder { - _marker: PhantomData<(T, R)>, - max_size: usize, -} impl ReceiverBuilder { /// Begin building a new, buffered channel. - pub fn new() -> TypedReceiverBuilder<(), BufReader> { + pub fn build() -> TypedReceiverBuilder<(), BufReader> { Self::buffered() } /// Begin building a new, buffered channel. @@ -50,6 +46,12 @@ impl ReceiverBuilder { } } } + +pub struct TypedReceiverBuilder { + _marker: PhantomData<(T, R)>, + max_size: usize, +} + impl TypedReceiverBuilder { /// Specify the type to send. pub fn with_type(self) -> TypedReceiverBuilder { @@ -73,6 +75,7 @@ impl TypedReceiverBuilder { } } } + impl TypedReceiverBuilder { /// Initialize the receiver with the current variables. pub fn build(self, reader: R) -> Receiver { @@ -86,6 +89,7 @@ impl TypedReceiverBuilder { } } } + impl TypedReceiverBuilder> { /// Listen for a sender, binding the listener to the specified address. pub fn listen_once( @@ -106,6 +110,7 @@ impl TypedReceiverBuilder> { }) } } + impl TypedReceiverBuilder { /// Listen for a sender, binding the listener to the specified address. pub fn listen_once( diff --git a/src/sender.rs b/src/sender.rs index 0954ce4..bc2bf2a 100644 --- a/src/sender.rs +++ b/src/sender.rs @@ -16,13 +16,9 @@ pub struct Sender> { /// A more convenient way of initializing senders. pub struct SenderBuilder; -pub struct TypedSenderBuilder { - _marker: PhantomData<(T, W)>, -} - impl SenderBuilder { /// Begin building a new, buffered channel. - pub fn new() -> TypedSenderBuilder<(), BufWriter> { + pub fn build() -> TypedSenderBuilder<(), BufWriter> { Self::buffered() } /// Begin building a new, buffered channel. @@ -38,6 +34,11 @@ impl SenderBuilder { } } } + +pub struct TypedSenderBuilder { + _marker: PhantomData<(T, W)>, +} + impl TypedSenderBuilder { /// Specify the type to send. pub fn with_type(self) -> TypedSenderBuilder { @@ -52,6 +53,7 @@ impl TypedSenderBuilder { } } } + impl TypedSenderBuilder { /// Initialize the sender with the current variables. pub fn build(self, writer: W) -> Sender { @@ -61,6 +63,7 @@ impl TypedSenderBuilder { } } } + impl TypedSenderBuilder> { /// Connect to a listening receiver, at a specified address. pub fn connect( @@ -75,6 +78,7 @@ impl TypedSenderBuilder> { }) } } + impl TypedSenderBuilder { /// Connect to a listening receiver, at a specified address. pub fn connect(self, address: A) -> std::io::Result> { @@ -87,11 +91,13 @@ impl TypedSenderBuilder { }) } } + impl Sender { pub fn flush(&mut self) -> std::io::Result<()> { self.writer.flush() } } + impl ChannelSend for Sender { type Error = SendError; fn send(&mut self, value: &T) -> Result<(), SendError> { From 07cfd81c9f7666971684587fbf2fead64808f886 Mon Sep 17 00:00:00 2001 From: ysimonson Date: Sun, 13 Aug 2023 10:26:57 -0400 Subject: [PATCH 5/7] Fix tests --- Cargo.toml | 4 ++-- tests/blob.rs | 10 +++------- tests/time.rs | 6 +----- 3 files changed, 6 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e97115f..93599b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,5 +18,5 @@ serde = "^1.0.183" quick-error = "^2.0.1" [dev-dependencies] -rand = "^0.8.5" -serde_derive = "^1.0.183" +rand = { version = "0.8.5", features = ["small_rng"] } +serde_derive = "1.0.183" diff --git a/tests/blob.rs b/tests/blob.rs index ca7a398..5857607 100644 --- a/tests/blob.rs +++ b/tests/blob.rs @@ -11,10 +11,10 @@ use std::io::{BufReader, BufWriter, ErrorKind as IoErrorKind}; use std::net::{TcpListener, TcpStream}; use std::thread::JoinHandle; -use rand::{rngs::SmallRng, FromEntropy, RngCore}; +use rand::{rngs::SmallRng, SeedableRng, RngCore}; use serde::de::DeserializeOwned; use tcp_channel::{ - BigEndian, ChannelRecv, ChannelSend, Receiver as TcpReceiver, ReceiverBuilder, RecvError, + ChannelRecv, ChannelSend, Receiver as TcpReceiver, ReceiverBuilder, RecvError, SenderBuilder, DEFAULT_MAX_SIZE, }; @@ -58,7 +58,7 @@ quick_error! { } fn pretend_blocking_read( - receiver: &mut TcpReceiver, + receiver: &mut TcpReceiver, ) -> Result { loop { match receiver.recv() { @@ -103,7 +103,6 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { let mut receiver = ReceiverBuilder::buffered() .with_type::() - .with_endianness::() .with_reader::>>() .with_max_size(max_size) .build(BufReader::new(SlowReader::new( @@ -114,7 +113,6 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { let mut sender = SenderBuilder::buffered() .with_type::() - .with_endianness::() .with_writer::>>() .build(BufWriter::new(SlowWriter::new(stream, slow, true))); @@ -138,13 +136,11 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { let mut sender = SenderBuilder::realtime() .with_type::() .with_writer::>() - .with_endianness::() .build(SlowWriter::new(stream.try_clone().unwrap(), slow, true)); let mut receiver = ReceiverBuilder::buffered() .with_type::() .with_reader::>>() - .with_endianness::() .with_max_size(max_size) .build(BufReader::new(SlowReader::new(stream, slow, blocking))); diff --git a/tests/time.rs b/tests/time.rs index 302fb84..cdaf1a8 100644 --- a/tests/time.rs +++ b/tests/time.rs @@ -12,7 +12,7 @@ use std::thread::JoinHandle; // Yeah, regular channels are used to tell the client when the server has started! use std::time::{Duration, SystemTime}; -use tcp_channel::{BigEndian, ChannelRecv, ChannelSend, ReceiverBuilder, SenderBuilder}; +use tcp_channel::{ChannelRecv, ChannelSend, ReceiverBuilder, SenderBuilder}; #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] enum Request { @@ -65,12 +65,10 @@ fn time() -> Result<(), Error> { let mut receiver = ReceiverBuilder::buffered() .with_type::() - .with_endianness::() .build(BufReader::new(stream.try_clone()?)); let mut sender = SenderBuilder::realtime() .with_type::() - .with_endianness::() .build(stream); while let Ok(command) = receiver.recv() { @@ -87,12 +85,10 @@ fn time() -> Result<(), Error> { let stream = TcpStream::connect("127.0.0.1:8888")?; let mut sender = SenderBuilder::realtime() .with_type::() - .with_endianness::() .build(stream.try_clone()?); let mut receiver = ReceiverBuilder::buffered() .with_type::() - .with_endianness::() .build(BufReader::new(stream)); sender.send(&Request::RequestTime)?; From 6f1366a4d15d5f41358d416273f03a3c8c09eac3 Mon Sep 17 00:00:00 2001 From: ysimonson Date: Sun, 13 Aug 2023 10:27:53 -0400 Subject: [PATCH 6/7] Use dyn --- tests/blob.rs | 8 ++++---- tests/time.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/blob.rs b/tests/blob.rs index 5857607..0175cd7 100644 --- a/tests/blob.rs +++ b/tests/blob.rs @@ -11,11 +11,11 @@ use std::io::{BufReader, BufWriter, ErrorKind as IoErrorKind}; use std::net::{TcpListener, TcpStream}; use std::thread::JoinHandle; -use rand::{rngs::SmallRng, SeedableRng, RngCore}; +use rand::{rngs::SmallRng, RngCore, SeedableRng}; use serde::de::DeserializeOwned; use tcp_channel::{ - ChannelRecv, ChannelSend, Receiver as TcpReceiver, ReceiverBuilder, RecvError, - SenderBuilder, DEFAULT_MAX_SIZE, + ChannelRecv, ChannelSend, Receiver as TcpReceiver, ReceiverBuilder, RecvError, SenderBuilder, + DEFAULT_MAX_SIZE, }; // This emulates a real delayed TCP connection. @@ -51,7 +51,7 @@ quick_error! { TcpRecvErr(err: tcp_channel::RecvError) { from() } - JoinErr(err: Box) { + JoinErr(err: Box) { from() } } diff --git a/tests/time.rs b/tests/time.rs index cdaf1a8..894808a 100644 --- a/tests/time.rs +++ b/tests/time.rs @@ -44,7 +44,7 @@ quick_error! { RecvErr2(err: tcp_channel::RecvError) { from() } - JoinErr(err: Box) { + JoinErr(err: Box) { from() } } From 72a4098d5b7d2574433b41aebdd9e66a744f1133 Mon Sep 17 00:00:00 2001 From: ysimonson Date: Sun, 13 Aug 2023 10:29:23 -0400 Subject: [PATCH 7/7] Bump edition/version --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 93599b3..91bf10f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "tcp-channel" -version = "0.3.2" +version = "0.4.0" authors = ["4lDO2 <4lDO2@protonmail.com>"] -edition = "2018" +edition = "2021" description = "SPSC channels, working on everything that implements Read and Write" readme = "README.md" license = "MIT"