Skip to content

Commit

Permalink
Add async support with async-std and tokio runtimes
Browse files Browse the repository at this point in the history
  • Loading branch information
ventaquil committed Mar 17, 2024
1 parent 9df4a46 commit 51cf0a5
Show file tree
Hide file tree
Showing 5 changed files with 324 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added async support with async-std and tokio runtimes.

### Fixed

- Added missing method comments to improve documentation clarity.
Expand Down
9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,14 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
async-std = { version = "1.12.0", optional = true }
async-trait = { version = "0.1.77", optional = true }
chksum-hash-core = "0.0.0"
futures-lite = { version = "2.2.0", optional = true }
thiserror = "1.0.51"
tokio = { version = "1.36.0", optional = true, features = ["fs", "io-util", "io-std"] }

[features]
default = []
async-runtime-async-std = ["async-std", "async-trait", "futures-lite"]
async-runtime-tokio = ["async-trait", "async-std?/tokio1", "tokio"]
144 changes: 144 additions & 0 deletions src/async_std.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#[cfg(not(feature = "async-runtime-tokio"))]
use std::path::{Path, PathBuf};

use async_std::fs::{metadata, read_dir, DirEntry, File, ReadDir};
use async_std::io::{BufReader, Stdin};
use async_std::path::{Path as AsyncPath, PathBuf as AsyncPathBuf};
use async_std::stream::StreamExt;
use async_trait::async_trait;
use futures_lite::io::AsyncBufReadExt;

use crate::{AsyncChksumable, Hash, Hashable, Result};

macro_rules! impl_async_chksumable {
($($t:ty),+ => $i:tt) => {
$(
#[async_trait]
impl AsyncChksumable for $t $i
)*
};
}

impl_async_chksumable!(AsyncPath, &AsyncPath, &mut AsyncPath => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
let metadata = metadata(&self).await?;
if metadata.is_dir() {
read_dir(self).await?.chksum_with(hash).await
} else {
// everything treat as a file when it is not a directory
File::open(self).await?.chksum_with(hash).await
}
}
});

impl_async_chksumable!(AsyncPathBuf, &AsyncPathBuf, &mut AsyncPathBuf => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
self.as_path().chksum_with(hash).await
}
});

#[cfg(not(feature = "async-runtime-tokio"))]
impl_async_chksumable!(Path, &Path, &mut Path => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
let metadata = metadata(&self).await?;
if metadata.is_dir() {
read_dir(self).await?.chksum_with(hash).await
} else {
// everything treat as a file when it is not a directory
File::open(self).await?.chksum_with(hash).await
}
}
});

#[cfg(not(feature = "async-runtime-tokio"))]
impl_async_chksumable!(PathBuf, &PathBuf, &mut PathBuf => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
self.as_path().chksum_with(hash).await
}
});

impl_async_chksumable!(File, &File, &mut File => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
// TODO: check async-rs/async-std#1073
// if self.is_terminal() {
// return Err(Error::IsTerminal);
// }

let mut reader = BufReader::new(self);
loop {
let buffer = reader.fill_buf().await?;
let length = buffer.len();
if length == 0 {
break;
}
buffer.hash_with(hash);
reader.consume(length);
}
Ok(())
}
});

impl_async_chksumable!(DirEntry, &DirEntry, &mut DirEntry => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
self.path().chksum_with(hash).await
}
});

impl_async_chksumable!(ReadDir, &mut ReadDir => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
let mut dir_entries = Vec::new();
while let Some(dir_entry) = self.next().await {
dir_entries.push(dir_entry?);
}
dir_entries.sort_by_key(DirEntry::path);
for mut dir_entry in dir_entries {
AsyncChksumable::chksum_with(&mut dir_entry, hash).await?;
}
Ok(())
}
});

impl_async_chksumable!(Stdin, &mut Stdin => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
// TODO: check async-rs/async-std#1073
// if self.is_terminal() {
// return Err(Error::IsTerminal);
// }

let mut reader = BufReader::new(self);
loop {
let buffer = reader.fill_buf().await?;
let length = buffer.len();
if length == 0 {
break;
}
buffer.hash_with(hash);
reader.consume(length);
}
Ok(())
}
});
54 changes: 52 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,19 @@
#![forbid(unsafe_code)]

#[cfg(feature = "async-runtime-async-std")]
mod async_std;
mod error;
#[cfg(feature = "async-runtime-tokio")]
mod tokio;

use std::fmt::{Display, LowerHex, UpperHex};
use std::fs::{read_dir, DirEntry, File, ReadDir};
use std::io::{self, BufRead, BufReader, IsTerminal, Stdin, StdinLock};
use std::path::{Path, PathBuf};

#[cfg(any(feature = "async-runtime-async-std", feature = "async-runtime-tokio"))]
use async_trait::async_trait;
#[doc(no_inline)]
pub use chksum_hash_core as hash;

Expand Down Expand Up @@ -70,6 +76,15 @@ where
data.chksum::<T>()
}

/// Computes the hash of the given input.
#[cfg(any(feature = "async-runtime-async-std", feature = "async-runtime-tokio"))]
pub async fn async_chksum<T>(mut data: impl AsyncChksumable + Send) -> Result<T::Digest>
where
T: Hash + Send,
{
data.chksum::<T>().await
}

/// A trait for hash digests.
pub trait Digest: Display {
/// Returns a byte slice of the digest's contents.
Expand Down Expand Up @@ -235,7 +250,7 @@ impl_chksumable!(PathBuf, &PathBuf, &mut PathBuf => {
where
H: Hash,
{
self.as_path().chksum_with(hash)
Chksumable::chksum_with(&mut self.as_path(), hash)
}
});

Expand Down Expand Up @@ -267,7 +282,7 @@ impl_chksumable!(DirEntry, &DirEntry, &mut DirEntry => {
where
H: Hash,
{
self.path().chksum_with(hash)
Chksumable::chksum_with(&mut self.path(), hash)
}
});

Expand Down Expand Up @@ -316,3 +331,38 @@ impl_chksumable!(StdinLock<'_>, &mut StdinLock<'_> => {
Ok(())
}
});

/// A trait for complex objects which must be processed chunk by chunk.
#[cfg(any(feature = "async-runtime-async-std", feature = "async-runtime-tokio"))]
#[async_trait]
pub trait AsyncChksumable {
/// Calculates the checksum of the object.
async fn chksum<H>(&mut self) -> Result<H::Digest>
where
H: Hash + Send,
{
let mut hash = H::default();
self.chksum_with(&mut hash).await?;
Ok(hash.digest())
}

/// Updates the given hash instance with the data from the object.
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send;
}

#[cfg(any(feature = "async-runtime-async-std", feature = "async-runtime-tokio"))]
#[async_trait]
impl<T> AsyncChksumable for T
where
T: Hashable + Send,
{
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
self.hash_with(hash);
Ok(())
}
}
115 changes: 115 additions & 0 deletions src/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use std::path::{Path, PathBuf};

use async_trait::async_trait;
use tokio::fs::{metadata, read_dir, DirEntry, File, ReadDir};
use tokio::io::{AsyncBufReadExt as _, BufReader, Stdin};

use crate::{AsyncChksumable, Hash, Hashable, Result};

macro_rules! impl_async_chksumable {
($($t:ty),+ => $i:tt) => {
$(
#[async_trait]
impl AsyncChksumable for $t $i
)*
};
}

impl_async_chksumable!(Path, &Path, &mut Path => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
let metadata = metadata(&self).await?;
if metadata.is_dir() {
read_dir(self).await?.chksum_with(hash).await
} else {
// everything treat as a file when it is not a directory
File::open(self).await?.chksum_with(hash).await
}
}

});

impl_async_chksumable!(PathBuf, &PathBuf, &mut PathBuf => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
self.as_path().chksum_with(hash).await
}
});

impl_async_chksumable!(File, &mut File => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
// TODO: check tokio-rs/tokio#6407
// if self.is_terminal() {
// return Err(Error::IsTerminal);
// }

let mut reader = BufReader::new(self);
loop {
let buffer = reader.fill_buf().await?;
let length = buffer.len();
if length == 0 {
break;
}
buffer.hash_with(hash);
reader.consume(length);
}
Ok(())
}
});

impl_async_chksumable!(DirEntry, &DirEntry, &mut DirEntry => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
self.path().chksum_with(hash).await
}
});

impl_async_chksumable!(ReadDir, &mut ReadDir => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
let mut dir_entries = Vec::new();
while let Some(dir_entry) = self.next_entry().await? {
dir_entries.push(dir_entry);
}
dir_entries.sort_by_key(DirEntry::path);
for mut dir_entry in dir_entries {
dir_entry.chksum_with(hash).await?;
}
Ok(())
}
});

impl_async_chksumable!(Stdin, &mut Stdin => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
// TODO: check tokio-rs/tokio#6407
// if self.is_terminal() {
// return Err(Error::IsTerminal);
// }

let mut reader = BufReader::new(self);
loop {
let buffer = reader.fill_buf().await?;
let length = buffer.len();
if length == 0 {
break;
}
buffer.hash_with(hash);
reader.consume(length);
}
Ok(())
}
});

0 comments on commit 51cf0a5

Please sign in to comment.