diff --git a/Cargo.toml b/Cargo.toml index 5ca7c63..91bf10f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "tcp-channel" -version = "0.3.2" +version = "0.4.0" authors = ["4lDO2 <4lDO2@protonmail.com>"] -edition = "2018" +edition = "2021" description = "SPSC channels, working on everything that implements Read and Write" readme = "README.md" license = "MIT" @@ -12,11 +12,11 @@ documentation = "https://docs.rs/tcp-channel" categories = ["network-programming"] [dependencies] -bincode = "1.1.2" -byteorder = "1.3.1" -serde = "1.0.89" -quick-error = "1.2.2" +bincode = "^1.3.3" +byteorder = "^1.4.3" +serde = "^1.0.183" +quick-error = "^2.0.1" [dev-dependencies] -rand = "0.6.5" -serde_derive = "1.0.89" +rand = { version = "0.8.5", features = ["small_rng"] } +serde_derive = "1.0.183" diff --git a/src/channel.rs b/src/channel.rs index 7d8fb22..5e8a68b 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,4 +1,7 @@ -use std::sync::mpsc::{Sender as StdSender, Receiver as StdReceiver, SendError as StdSendError, RecvError as StdRecvError}; +use std::sync::mpsc::{ + Receiver as StdReceiver, RecvError as StdRecvError, SendError as StdSendError, + Sender as StdSender, +}; pub trait ChannelSend { type Error; diff --git a/src/endian.rs b/src/endian.rs deleted file mode 100644 index ee7e504..0000000 --- a/src/endian.rs +++ /dev/null @@ -1,21 +0,0 @@ -pub use byteorder::{BigEndian, LittleEndian, NativeEndian}; -use byteorder::ByteOrder; -use bincode::Config; - -pub trait Endian: ByteOrder { - fn config() -> Config; -} -impl Endian for BigEndian { - fn config() -> Config { - let mut config = bincode::config(); - config.big_endian(); - config - } -} -impl Endian for LittleEndian { - fn config() -> Config { - let mut config = bincode::config(); - config.little_endian(); - config - } -} diff --git a/src/lib.rs b/src/lib.rs index 52b5f76..d4330e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,13 +7,11 @@ extern crate quick_error; extern crate serde; mod channel; -mod endian; mod error; mod receiver; mod sender; pub use channel::{ChannelRecv, ChannelSend}; -pub use endian::{Endian, BigEndian, LittleEndian, NativeEndian}; pub use error::{RecvError, SendError}; pub use receiver::{Receiver, ReceiverBuilder, DEFAULT_MAX_SIZE}; pub use sender::{Sender, SenderBuilder}; diff --git a/src/receiver.rs b/src/receiver.rs index c7aa2ed..af5904c 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -1,21 +1,19 @@ use std::io::{BufReader, Read}; -use std::net::{TcpListener, TcpStream, ToSocketAddrs}; use std::marker::PhantomData; +use std::net::{TcpListener, TcpStream, ToSocketAddrs}; -use bincode::Config; -use byteorder::ReadBytesExt; +use byteorder::{LittleEndian, ReadBytesExt}; use serde::de::DeserializeOwned; -use crate::{ChannelRecv, Endian, BigEndian, RecvError}; +use crate::{ChannelRecv, RecvError}; pub const DEFAULT_MAX_SIZE: usize = 64 * 0x100_000; /// The receiving side of a channel. -pub struct Receiver> { +pub struct Receiver> { reader: R, - config: Config, max_size: usize, - _marker: PhantomData<(T, E)>, + _marker: PhantomData, // This buffer is used for storing the currently read bytes in case the stream is nonblocking. // Otherwise, bincode would deserialize only the currently read bytes. @@ -28,47 +26,42 @@ pub struct Receiver { - _marker: PhantomData<(T, R, E)>, - max_size: usize, -} impl ReceiverBuilder { /// Begin building a new, buffered channel. - pub fn new() -> TypedReceiverBuilder<(), BufReader, BigEndian> { + pub fn build() -> TypedReceiverBuilder<(), BufReader> { Self::buffered() } /// Begin building a new, buffered channel. - pub fn buffered() -> TypedReceiverBuilder<(), BufReader, BigEndian> { + pub fn buffered() -> TypedReceiverBuilder<(), BufReader> { TypedReceiverBuilder { _marker: PhantomData, max_size: DEFAULT_MAX_SIZE, } } /// Begin building a new, non-buffered channel. - pub fn realtime() -> TypedReceiverBuilder<(), TcpStream, BigEndian> { + pub fn realtime() -> TypedReceiverBuilder<(), TcpStream> { TypedReceiverBuilder { _marker: PhantomData, max_size: DEFAULT_MAX_SIZE, } } } -impl TypedReceiverBuilder { + +pub struct TypedReceiverBuilder { + _marker: PhantomData<(T, R)>, + max_size: usize, +} + +impl TypedReceiverBuilder { /// Specify the type to send. - pub fn with_type(self) -> TypedReceiverBuilder { + pub fn with_type(self) -> TypedReceiverBuilder { TypedReceiverBuilder { _marker: PhantomData, max_size: self.max_size, } } /// Specify the underlying reader type. - pub fn with_reader(self) -> TypedReceiverBuilder { - TypedReceiverBuilder { - _marker: PhantomData, - max_size: self.max_size, - } - } - /// Specify the endianness. - pub fn with_endianness(self) -> TypedReceiverBuilder { + pub fn with_reader(self) -> TypedReceiverBuilder { TypedReceiverBuilder { _marker: PhantomData, max_size: self.max_size, @@ -82,13 +75,13 @@ impl TypedReceiverBuilder { } } } -impl TypedReceiverBuilder { + +impl TypedReceiverBuilder { /// Initialize the receiver with the current variables. - pub fn build(self, reader: R) -> Receiver { + pub fn build(self, reader: R) -> Receiver { Receiver { _marker: PhantomData, reader, - config: E::config(), max_size: self.max_size, buffer: Vec::new(), bytes_read: 0, @@ -96,15 +89,18 @@ impl TypedReceiverBuilder { } } } -impl TypedReceiverBuilder, E> { + +impl TypedReceiverBuilder> { /// Listen for a sender, binding the listener to the specified address. - pub fn listen_once(self, address: A) -> std::io::Result>> { + pub fn listen_once( + self, + address: A, + ) -> std::io::Result>> { let listener = TcpListener::bind(address)?; let (stream, _) = listener.accept()?; Ok(Receiver { - config: E::config(), _marker: PhantomData, reader: BufReader::new(stream), max_size: self.max_size, @@ -114,15 +110,18 @@ impl TypedReceiverBuilder TypedReceiverBuilder { + +impl TypedReceiverBuilder { /// Listen for a sender, binding the listener to the specified address. - pub fn listen_once(self, address: A) -> std::io::Result> { + pub fn listen_once( + self, + address: A, + ) -> std::io::Result> { let listener = TcpListener::bind(address)?; let (stream, _) = listener.accept()?; Ok(Receiver { - config: E::config(), _marker: PhantomData, reader: stream, max_size: self.max_size, @@ -133,18 +132,19 @@ impl TypedReceiverBuilder { } } -impl ChannelRecv for Receiver { +impl ChannelRecv for Receiver { type Error = RecvError; fn recv(&mut self) -> Result { if self.bytes_to_read == 0 { - let length = self.reader.read_u64::()? as usize; + let length = self.reader.read_u64::()? as usize; if length > self.max_size { - return Err(RecvError::TooLarge(length)) + return Err(RecvError::TooLarge(length)); } if self.buffer.len() < length { - self.buffer.extend(std::iter::repeat(0).take(length - self.buffer.len())); + self.buffer + .extend(std::iter::repeat(0).take(length - self.buffer.len())); } self.bytes_to_read = length; @@ -152,18 +152,20 @@ impl ChannelRecv for Receiver { self.bytes_read += size; if self.bytes_read >= self.bytes_to_read { let length = self.bytes_to_read; self.bytes_to_read = 0; - return Ok(self.config.deserialize(&self.buffer[0..length])?) + return Ok(bincode::deserialize(&self.buffer[0..length])?); } - }, + } Err(error) => return Err(error.into()), } } - } } diff --git a/src/sender.rs b/src/sender.rs index 0065e59..bc2bf2a 100644 --- a/src/sender.rs +++ b/src/sender.rs @@ -2,110 +2,108 @@ use std::io::{BufWriter, Write}; use std::marker::PhantomData; use std::net::{TcpStream, ToSocketAddrs}; -use bincode::Config; -use byteorder::WriteBytesExt; +use byteorder::{LittleEndian, WriteBytesExt}; use serde::Serialize; -use crate::{ChannelSend, Endian, BigEndian, SendError}; +use crate::{ChannelSend, SendError}; /// The sending side of a channel. -pub struct Sender> { +pub struct Sender> { writer: W, - config: Config, - _marker: PhantomData<(T, E)>, + _marker: PhantomData, } /// A more convenient way of initializing senders. pub struct SenderBuilder; -pub struct TypedSenderBuilder { - _marker: PhantomData<(T, W, E)>, -} - impl SenderBuilder { /// Begin building a new, buffered channel. - pub fn new() -> TypedSenderBuilder<(), BufWriter, BigEndian> { + pub fn build() -> TypedSenderBuilder<(), BufWriter> { Self::buffered() } /// Begin building a new, buffered channel. - pub fn buffered() -> TypedSenderBuilder<(), BufWriter, BigEndian> { + pub fn buffered() -> TypedSenderBuilder<(), BufWriter> { TypedSenderBuilder { _marker: PhantomData, } } /// Begin building a new, non-buffered channel. - pub fn realtime() -> TypedSenderBuilder<(), TcpStream, BigEndian> { + pub fn realtime() -> TypedSenderBuilder<(), TcpStream> { TypedSenderBuilder { _marker: PhantomData, } } } -impl TypedSenderBuilder { + +pub struct TypedSenderBuilder { + _marker: PhantomData<(T, W)>, +} + +impl TypedSenderBuilder { /// Specify the type to send. - pub fn with_type(self) -> TypedSenderBuilder { + pub fn with_type(self) -> TypedSenderBuilder { TypedSenderBuilder { _marker: PhantomData, } } /// Specify the underlying writer type. - pub fn with_writer(self) -> TypedSenderBuilder { - TypedSenderBuilder { - _marker: PhantomData, - } - } - /// Specify the endianness. - pub fn with_endianness(self) -> TypedSenderBuilder { + pub fn with_writer(self) -> TypedSenderBuilder { TypedSenderBuilder { _marker: PhantomData, } } } -impl TypedSenderBuilder { + +impl TypedSenderBuilder { /// Initialize the sender with the current variables. - pub fn build(self, writer: W) -> Sender { + pub fn build(self, writer: W) -> Sender { Sender { _marker: PhantomData, writer, - config: E::config(), } } } -impl TypedSenderBuilder, E> { + +impl TypedSenderBuilder> { /// Connect to a listening receiver, at a specified address. - pub fn connect(self, address: A) -> std::io::Result>> { + pub fn connect( + self, + address: A, + ) -> std::io::Result>> { let stream = TcpStream::connect(address)?; Ok(Sender { writer: BufWriter::new(stream), _marker: PhantomData, - config: E::config(), }) } } -impl TypedSenderBuilder { + +impl TypedSenderBuilder { /// Connect to a listening receiver, at a specified address. - pub fn connect(self, address: A) -> std::io::Result> { + pub fn connect(self, address: A) -> std::io::Result> { let stream = TcpStream::connect(address)?; stream.set_nodelay(true)?; Ok(Sender { writer: stream, _marker: PhantomData, - config: E::config(), }) } } -impl Sender { + +impl Sender { pub fn flush(&mut self) -> std::io::Result<()> { self.writer.flush() } } -impl ChannelSend for Sender { + +impl ChannelSend for Sender { type Error = SendError; fn send(&mut self, value: &T) -> Result<(), SendError> { - let buffer = self.config.serialize(value)?; - self.writer.write_u64::(buffer.len() as u64)?; - self.writer.write(&buffer)?; + let buffer = bincode::serialize(value)?; + self.writer.write_u64::(buffer.len() as u64)?; + self.writer.write_all(&buffer)?; Ok(()) } } diff --git a/tests/blob.rs b/tests/blob.rs index 552425c..0175cd7 100644 --- a/tests/blob.rs +++ b/tests/blob.rs @@ -1,17 +1,22 @@ extern crate tcp_channel; -#[macro_use] extern crate quick_error; +#[macro_use] +extern crate quick_error; extern crate serde; -#[macro_use] extern crate serde_derive; +#[macro_use] +extern crate serde_derive; use std::any::Any; -use std::io::{BufReader, BufWriter, ErrorKind as IoErrorKind}; use std::io::prelude::*; +use std::io::{BufReader, BufWriter, ErrorKind as IoErrorKind}; use std::net::{TcpListener, TcpStream}; use std::thread::JoinHandle; -use rand::{FromEntropy, RngCore, rngs::SmallRng}; +use rand::{rngs::SmallRng, RngCore, SeedableRng}; use serde::de::DeserializeOwned; -use tcp_channel::{SenderBuilder, ReceiverBuilder, ChannelSend, ChannelRecv, BigEndian, Receiver as TcpReceiver, RecvError, DEFAULT_MAX_SIZE}; +use tcp_channel::{ + ChannelRecv, ChannelSend, Receiver as TcpReceiver, ReceiverBuilder, RecvError, SenderBuilder, + DEFAULT_MAX_SIZE, +}; // This emulates a real delayed TCP connection. mod slow_io; @@ -46,20 +51,22 @@ quick_error! { TcpRecvErr(err: tcp_channel::RecvError) { from() } - JoinErr(err: Box) { + JoinErr(err: Box) { from() } } } -fn pretend_blocking_read(receiver: &mut TcpReceiver) -> Result { +fn pretend_blocking_read( + receiver: &mut TcpReceiver, +) -> Result { loop { match receiver.recv() { Ok(value) => return Ok(value), Err(RecvError::IoError(ioerror)) => match ioerror.kind() { IoErrorKind::WouldBlock => continue, _ => return Err(RecvError::IoError(ioerror).into()), - } + }, Err(error) => return Err(error.into()), } } @@ -68,7 +75,7 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { const SIZE: usize = 262_144; // This test generates a random 256KiB BLOB, sends it, and then receives the BLOB, where every byte is // added by 1. - + let (sender, receiver) = std::sync::mpsc::channel(); let thread: JoinHandle> = std::thread::spawn(move || { @@ -76,31 +83,36 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { let listener = loop { match TcpListener::bind(format!("127.0.0.1:{}", port)) { Ok(listener) => break Ok(listener), - Err(ioerror) => if let IoErrorKind::AddrInUse = ioerror.kind() { - port += 1; - if port >= 8000 { - break Err(ioerror) + Err(ioerror) => { + if let IoErrorKind::AddrInUse = ioerror.kind() { + port += 1; + if port >= 8000 { + break Err(ioerror); + } + continue; + } else { + break Err(ioerror); } - continue - } else { - break Err(ioerror) } } - }.unwrap(); + } + .unwrap(); sender.send(port).unwrap(); let (stream, _) = listener.accept().unwrap(); let mut receiver = ReceiverBuilder::buffered() .with_type::() - .with_endianness::() .with_reader::>>() .with_max_size(max_size) - .build(BufReader::new(SlowReader::new(stream.try_clone().unwrap(), slow, blocking))); + .build(BufReader::new(SlowReader::new( + stream.try_clone().unwrap(), + slow, + blocking, + ))); let mut sender = SenderBuilder::buffered() .with_type::() - .with_endianness::() .with_writer::>>() .build(BufWriter::new(SlowWriter::new(stream, slow, true))); @@ -112,8 +124,8 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { } sender.send(&Response::Respond(blob)).unwrap(); sender.flush().unwrap(); - }, - Request::Stop => return Ok(()) + } + Request::Stop => return Ok(()), } } @@ -124,18 +136,16 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { let mut sender = SenderBuilder::realtime() .with_type::() .with_writer::>() - .with_endianness::() .build(SlowWriter::new(stream.try_clone().unwrap(), slow, true)); let mut receiver = ReceiverBuilder::buffered() .with_type::() .with_reader::>>() - .with_endianness::() .with_max_size(max_size) .build(BufReader::new(SlowReader::new(stream, slow, blocking))); let blob = { - let mut blob = vec! [0u8; SIZE]; + let mut blob = vec![0u8; SIZE]; SmallRng::from_entropy().fill_bytes(&mut blob); @@ -148,7 +158,8 @@ fn blob(slow: bool, blocking: bool, max_size: usize) -> Result<(), Error> { let new_blob = match pretend_blocking_read(&mut receiver).unwrap() { Response::Respond(blob) => blob, }; - let precalculated_new_blob = blob.into_iter() + let precalculated_new_blob = blob + .into_iter() .map(|byte| byte.wrapping_add(1)) .collect::>(); diff --git a/tests/slow_io.rs b/tests/slow_io.rs index 7c79b0c..b7300eb 100644 --- a/tests/slow_io.rs +++ b/tests/slow_io.rs @@ -1,6 +1,6 @@ -use std::io::{Result, ErrorKind as IoErrorKind}; use std::io::prelude::*; -use std::time::{Instant, Duration}; +use std::io::{ErrorKind as IoErrorKind, Result}; +use std::time::{Duration, Instant}; // In milliseconds. const DELAY: u64 = 200; @@ -24,10 +24,12 @@ impl SlowWriter { fn emulate_nonblocking(last_io: &mut Option) -> Result<()> { match *last_io { - Some(last_io_some) => if last_io_some + Duration::from_millis(DELAY) < Instant::now() { - *last_io = None; - return Err(IoErrorKind::WouldBlock.into()) - }, + Some(last_io_some) => { + if last_io_some + Duration::from_millis(DELAY) < Instant::now() { + *last_io = None; + return Err(IoErrorKind::WouldBlock.into()); + } + } None => *last_io = Some(Instant::now()), } Ok(()) diff --git a/tests/time.rs b/tests/time.rs index 78edc1d..894808a 100644 --- a/tests/time.rs +++ b/tests/time.rs @@ -1,16 +1,18 @@ extern crate tcp_channel; -#[macro_use] extern crate quick_error; +#[macro_use] +extern crate quick_error; extern crate serde; -#[macro_use] extern crate serde_derive; +#[macro_use] +extern crate serde_derive; use std::any::Any; use std::io::BufReader; use std::net::{TcpListener, TcpStream}; use std::thread::JoinHandle; // Yeah, regular channels are used to tell the client when the server has started! -use std::time::{SystemTime, Duration}; +use std::time::{Duration, SystemTime}; -use tcp_channel::{SenderBuilder, ReceiverBuilder, ChannelSend, ChannelRecv, BigEndian}; +use tcp_channel::{ChannelRecv, ChannelSend, ReceiverBuilder, SenderBuilder}; #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] enum Request { @@ -42,7 +44,7 @@ quick_error! { RecvErr2(err: tcp_channel::RecvError) { from() } - JoinErr(err: Box) { + JoinErr(err: Box) { from() } } @@ -52,7 +54,7 @@ quick_error! { fn time() -> Result<(), Error> { // This test sets up a simple time server, enum based. let initial_time = SystemTime::now(); - + let (sender, receiver) = std::sync::mpsc::channel(); let thread: JoinHandle> = std::thread::spawn(move || { @@ -63,19 +65,17 @@ fn time() -> Result<(), Error> { let mut receiver = ReceiverBuilder::buffered() .with_type::() - .with_endianness::() .build(BufReader::new(stream.try_clone()?)); let mut sender = SenderBuilder::realtime() .with_type::() - .with_endianness::() .build(stream); while let Ok(command) = receiver.recv() { match command { Request::RequestTime => sender.send(&Response::Respond(time))?, Request::SetTime(new_time) => time = new_time, - Request::Stop => return Ok(()) + Request::Stop => return Ok(()), } } @@ -85,12 +85,10 @@ fn time() -> Result<(), Error> { let stream = TcpStream::connect("127.0.0.1:8888")?; let mut sender = SenderBuilder::realtime() .with_type::() - .with_endianness::() .build(stream.try_clone()?); let mut receiver = ReceiverBuilder::buffered() .with_type::() - .with_endianness::() .build(BufReader::new(stream)); sender.send(&Request::RequestTime)?;