Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modernize #1

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "tcp-channel"
version = "0.3.2"
version = "0.4.0"
authors = ["4lDO2 <4lDO2@protonmail.com>"]
edition = "2018"
edition = "2021"
description = "SPSC channels, working on everything that implements Read and Write"
readme = "README.md"
license = "MIT"
Expand All @@ -12,11 +12,11 @@ documentation = "https://docs.rs/tcp-channel"
categories = ["network-programming"]

[dependencies]
bincode = "1.1.2"
byteorder = "1.3.1"
serde = "1.0.89"
quick-error = "1.2.2"
bincode = "^1.3.3"
byteorder = "^1.4.3"
serde = "^1.0.183"
quick-error = "^2.0.1"

[dev-dependencies]
rand = "0.6.5"
serde_derive = "1.0.89"
rand = { version = "0.8.5", features = ["small_rng"] }
serde_derive = "1.0.183"
5 changes: 4 additions & 1 deletion src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::sync::mpsc::{Sender as StdSender, Receiver as StdReceiver, SendError as StdSendError, RecvError as StdRecvError};
use std::sync::mpsc::{
Receiver as StdReceiver, RecvError as StdRecvError, SendError as StdSendError,
Sender as StdSender,
};

pub trait ChannelSend<T> {
type Error;
Expand Down
21 changes: 0 additions & 21 deletions src/endian.rs

This file was deleted.

2 changes: 0 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ extern crate quick_error;
extern crate serde;

mod channel;
mod endian;
mod error;
mod receiver;
mod sender;

pub use channel::{ChannelRecv, ChannelSend};
pub use endian::{Endian, BigEndian, LittleEndian, NativeEndian};
pub use error::{RecvError, SendError};
pub use receiver::{Receiver, ReceiverBuilder, DEFAULT_MAX_SIZE};
pub use sender::{Sender, SenderBuilder};
84 changes: 43 additions & 41 deletions src/receiver.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use std::io::{BufReader, Read};
use std::net::{TcpListener, TcpStream, ToSocketAddrs};
use std::marker::PhantomData;
use std::net::{TcpListener, TcpStream, ToSocketAddrs};

use bincode::Config;
use byteorder::ReadBytesExt;
use byteorder::{LittleEndian, ReadBytesExt};
use serde::de::DeserializeOwned;

use crate::{ChannelRecv, Endian, BigEndian, RecvError};
use crate::{ChannelRecv, RecvError};

pub const DEFAULT_MAX_SIZE: usize = 64 * 0x100_000;

/// The receiving side of a channel.
pub struct Receiver<T: DeserializeOwned, E: Endian, R: Read = BufReader<TcpStream>> {
pub struct Receiver<T: DeserializeOwned, R: Read = BufReader<TcpStream>> {
reader: R,
config: Config,
max_size: usize,
_marker: PhantomData<(T, E)>,
_marker: PhantomData<T>,

// This buffer is used for storing the currently read bytes in case the stream is nonblocking.
// Otherwise, bincode would deserialize only the currently read bytes.
Expand All @@ -28,47 +26,42 @@ pub struct Receiver<T: DeserializeOwned, E: Endian, R: Read = BufReader<TcpStrea
/// A more convenient way of initializing receivers.
pub struct ReceiverBuilder;

pub struct TypedReceiverBuilder<T, R, E> {
_marker: PhantomData<(T, R, E)>,
max_size: usize,
}
impl ReceiverBuilder {
/// Begin building a new, buffered channel.
pub fn new() -> TypedReceiverBuilder<(), BufReader<TcpStream>, BigEndian> {
pub fn build() -> TypedReceiverBuilder<(), BufReader<TcpStream>> {
Self::buffered()
}
/// Begin building a new, buffered channel.
pub fn buffered() -> TypedReceiverBuilder<(), BufReader<TcpStream>, BigEndian> {
pub fn buffered() -> TypedReceiverBuilder<(), BufReader<TcpStream>> {
TypedReceiverBuilder {
_marker: PhantomData,
max_size: DEFAULT_MAX_SIZE,
}
}
/// Begin building a new, non-buffered channel.
pub fn realtime() -> TypedReceiverBuilder<(), TcpStream, BigEndian> {
pub fn realtime() -> TypedReceiverBuilder<(), TcpStream> {
TypedReceiverBuilder {
_marker: PhantomData,
max_size: DEFAULT_MAX_SIZE,
}
}
}
impl<T, R, E> TypedReceiverBuilder<T, R, E> {

pub struct TypedReceiverBuilder<T, R> {
_marker: PhantomData<(T, R)>,
max_size: usize,
}

impl<T, R> TypedReceiverBuilder<T, R> {
/// Specify the type to send.
pub fn with_type<U: DeserializeOwned>(self) -> TypedReceiverBuilder<U, R, E> {
pub fn with_type<U: DeserializeOwned>(self) -> TypedReceiverBuilder<U, R> {
TypedReceiverBuilder {
_marker: PhantomData,
max_size: self.max_size,
}
}
/// Specify the underlying reader type.
pub fn with_reader<S: Read>(self) -> TypedReceiverBuilder<T, S, E> {
TypedReceiverBuilder {
_marker: PhantomData,
max_size: self.max_size,
}
}
/// Specify the endianness.
pub fn with_endianness<F: Endian>(self) -> TypedReceiverBuilder<T, R, F> {
pub fn with_reader<S: Read>(self) -> TypedReceiverBuilder<T, S> {
TypedReceiverBuilder {
_marker: PhantomData,
max_size: self.max_size,
Expand All @@ -82,29 +75,32 @@ impl<T, R, E> TypedReceiverBuilder<T, R, E> {
}
}
}
impl<T: DeserializeOwned, R: Read, E: Endian> TypedReceiverBuilder<T, R, E> {

impl<T: DeserializeOwned, R: Read> TypedReceiverBuilder<T, R> {
/// Initialize the receiver with the current variables.
pub fn build(self, reader: R) -> Receiver<T, E, R> {
pub fn build(self, reader: R) -> Receiver<T, R> {
Receiver {
_marker: PhantomData,
reader,
config: E::config(),
max_size: self.max_size,
buffer: Vec::new(),
bytes_read: 0,
bytes_to_read: 0,
}
}
}
impl<T: DeserializeOwned, E: Endian> TypedReceiverBuilder<T, BufReader<TcpStream>, E> {

impl<T: DeserializeOwned> TypedReceiverBuilder<T, BufReader<TcpStream>> {
/// Listen for a sender, binding the listener to the specified address.
pub fn listen_once<A: ToSocketAddrs>(self, address: A) -> std::io::Result<Receiver<T, E, BufReader<TcpStream>>> {
pub fn listen_once<A: ToSocketAddrs>(
self,
address: A,
) -> std::io::Result<Receiver<T, BufReader<TcpStream>>> {
let listener = TcpListener::bind(address)?;

let (stream, _) = listener.accept()?;

Ok(Receiver {
config: E::config(),
_marker: PhantomData,
reader: BufReader::new(stream),
max_size: self.max_size,
Expand All @@ -114,15 +110,18 @@ impl<T: DeserializeOwned, E: Endian> TypedReceiverBuilder<T, BufReader<TcpStream
})
}
}
impl<T: DeserializeOwned, E: Endian> TypedReceiverBuilder<T, TcpStream, E> {

impl<T: DeserializeOwned> TypedReceiverBuilder<T, TcpStream> {
/// Listen for a sender, binding the listener to the specified address.
pub fn listen_once<A: ToSocketAddrs>(self, address: A) -> std::io::Result<Receiver<T, E, TcpStream>> {
pub fn listen_once<A: ToSocketAddrs>(
self,
address: A,
) -> std::io::Result<Receiver<T, TcpStream>> {
let listener = TcpListener::bind(address)?;

let (stream, _) = listener.accept()?;

Ok(Receiver {
config: E::config(),
_marker: PhantomData,
reader: stream,
max_size: self.max_size,
Expand All @@ -133,37 +132,40 @@ impl<T: DeserializeOwned, E: Endian> TypedReceiverBuilder<T, TcpStream, E> {
}
}

impl<T: DeserializeOwned, E: Endian, R: Read> ChannelRecv<T> for Receiver<T, E, R> {
impl<T: DeserializeOwned, R: Read> ChannelRecv<T> for Receiver<T, R> {
type Error = RecvError;

fn recv(&mut self) -> Result<T, RecvError> {
if self.bytes_to_read == 0 {
let length = self.reader.read_u64::<E>()? as usize;
let length = self.reader.read_u64::<LittleEndian>()? as usize;
if length > self.max_size {
return Err(RecvError::TooLarge(length))
return Err(RecvError::TooLarge(length));
}

if self.buffer.len() < length {
self.buffer.extend(std::iter::repeat(0).take(length - self.buffer.len()));
self.buffer
.extend(std::iter::repeat(0).take(length - self.buffer.len()));
}

self.bytes_to_read = length;
self.bytes_read = 0;
}

loop {
match self.reader.read(&mut self.buffer[self.bytes_read..self.bytes_to_read]) {
match self
.reader
.read(&mut self.buffer[self.bytes_read..self.bytes_to_read])
{
Ok(size) => {
self.bytes_read += size;
if self.bytes_read >= self.bytes_to_read {
let length = self.bytes_to_read;
self.bytes_to_read = 0;
return Ok(self.config.deserialize(&self.buffer[0..length])?)
return Ok(bincode::deserialize(&self.buffer[0..length])?);
}
},
}
Err(error) => return Err(error.into()),
}
}

}
}
Loading