Skip to content

Commit

Permalink
feat: stream implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
indietyp committed Sep 17, 2024
1 parent caef170 commit b185637
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 19 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions libs/error-stack/experimental/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,22 @@ publish = false
[dependencies]
# Public workspace dependencies
error-stack = { path = "..", public = true }
futures-core = { workspace = true, public = true, optional = true }
pin-project-lite = { workspace = true, optional = true }

[build-dependencies]
rustc_version = "0.4.1"

[lints]
workspace = true

[features]
stream = ["dep:futures-core", "dep:pin-project-lite"]

[dev-dependencies]
futures-util.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }

[package.metadata.docs.rs]
all-features = true
cargo-args = ["-Z", "unstable-options", "-Z", "rustdoc-scrape-examples"]
Expand Down
24 changes: 12 additions & 12 deletions libs/error-stack/experimental/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ where
///
/// This trait provides additional functionality to iterators that yield `Result` items,
/// allowing them to be collected into a container while propagating any errors encountered.
pub trait IteratorExt<C> {
pub trait TryReportIteratorExt<C> {
/// The type of the successful items in the iterator.
type Output;
type Ok;

/// Collects the successful items from the iterator into a container, or returns all errors that
/// occured.
///
/// This method attempts to collect all successful items from the iterator into the specified
/// container type. If an error is encountered during iteration, the method immediately returns
/// that error, discarding any previously collected items.
/// container type. If an error is encountered during iteration, the method will exhaust the
/// iterator and return a `Report` containing all errors encountered.
///
/// # Errors
///
Expand All @@ -106,7 +106,7 @@ pub trait IteratorExt<C> {
/// ```
/// use error_stack::{Result, ResultExt, Report};
/// use std::io;
/// use error_stack_experimental::IteratorExt;
/// use error_stack_experimental::TryReportIteratorExt;
///
/// fn fetch_fail() -> Result<u8, io::Error> {
/// # stringify! {
Expand All @@ -123,7 +123,7 @@ pub trait IteratorExt<C> {
/// ```
fn try_collect_reports<A>(self) -> Result<A, [C]>
where
A: FromIterator<Self::Output>;
A: FromIterator<Self::Ok>;

/// Collects the successful items from the iterator into a container or returns all errors up to
/// the specified bound.
Expand All @@ -142,7 +142,7 @@ pub trait IteratorExt<C> {
/// ```
/// use error_stack::{Result, ResultExt, Report};
/// use std::io;
/// use error_stack_experimental::IteratorExt;
/// use error_stack_experimental::TryReportIteratorExt;
///
/// fn fetch_fail() -> Result<u8, io::Error> {
/// # stringify! {
Expand All @@ -159,27 +159,27 @@ pub trait IteratorExt<C> {
/// ```
fn try_collect_reports_bounded<A>(self, bound: usize) -> Result<A, [C]>
where
A: FromIterator<Self::Output>;
A: FromIterator<Self::Ok>;
}

impl<T, C, R, I> IteratorExt<C> for I
impl<T, C, R, I> TryReportIteratorExt<C> for I
where
I: Iterator<Item = core::result::Result<T, R>>,
R: Into<Report<[C]>>,
C: Context,
{
type Output = T;
type Ok = T;

fn try_collect_reports<A>(self) -> Result<A, [C]>
where
A: FromIterator<Self::Output>,
A: FromIterator<Self::Ok>,
{
try_process_reports(self, None, |shunt| shunt.collect())
}

fn try_collect_reports_bounded<A>(self, bound: usize) -> Result<A, [C]>
where
A: FromIterator<Self::Output>,
A: FromIterator<Self::Ok>,
{
try_process_reports(self, Some(bound), |shunt| shunt.collect())
}
Expand Down
6 changes: 5 additions & 1 deletion libs/error-stack/experimental/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
#![doc = include_str!("../README.md")]
#![cfg_attr(all(doc, nightly), feature(doc_auto_cfg))]

pub use self::{iter::IteratorExt, result::ResultMultiExt, tuple::TupleExt};
#[cfg(feature = "stream")]
pub use self::stream::TryReportStreamExt;
pub use self::{iter::TryReportIteratorExt, result::ResultMultiExt, tuple::TryReportTupleExt};

mod iter;
mod result;
#[cfg(feature = "stream")]
mod stream;
mod tuple;
193 changes: 193 additions & 0 deletions libs/error-stack/experimental/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
use core::{
future::Future,
mem,
pin::Pin,
task::{ready, Context, Poll},
};

use error_stack::{Report, Result};
use futures_core::{FusedFuture, FusedStream, TryStream};
use pin_project_lite::pin_project;

pin_project! {
/// Future for the [`try_collect_reports`](TryReportStreamExt::try_collect_reports)
/// and [`try_collect_reports_bounded`](TryReportStreamExt::try_collect_reports_bounded) methods.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryCollectReports<S, A, C> {
#[pin]
stream: S,
output: Result<A, [C]>,

residual_len: usize,
residual_bound: usize
}
}

impl<S, A, C> TryCollectReports<S, A, C>
where
S: TryStream,
A: Default + Extend<S::Ok>,
{
fn new(stream: S, bound: Option<usize>) -> Self {
Self {
stream,
output: Ok(Default::default()),
residual_len: 0,
residual_bound: bound.unwrap_or(usize::MAX),
}
}
}

impl<S, A, C> FusedFuture for TryCollectReports<S, A, C>
where
S: TryStream<Error: Into<Report<[C]>>> + FusedStream,
A: Default + Extend<S::Ok>,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}

impl<S, A, C> Future for TryCollectReports<S, A, C>
where
S: TryStream<Error: Into<Report<[C]>>>,
A: Default + Extend<S::Ok>,
{
type Output = Result<A, [C]>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

let value = loop {
if *this.residual_len >= *this.residual_bound {
break mem::replace(this.output, Ok(A::default()));
}

let next = ready!(this.stream.as_mut().try_poll_next(cx));
match (next, &mut *this.output) {
(Some(Ok(value)), Ok(output)) => {
output.extend(core::iter::once(value));
}
(Some(Ok(_)), Err(_)) => {
// we're now just consuming the iterator to return all related errors
// so we can just ignore the output
}
(Some(Err(residual)), output @ Ok(_)) => {
*output = Err(residual.into());
*this.residual_len += 1;
}
(Some(Err(residual)), Err(report)) => {
report.append(residual.into());
*this.residual_len += 1;
}
(None, output) => {
break mem::replace(output, Ok(A::default()));
}
}
};

Poll::Ready(value)
}
}

/// Trait extending `TryStream` with methods for collecting error-stack results in a fail-slow
/// manner.
pub trait TryReportStreamExt<C>: TryStream<Error: Into<Report<[C]>>> {
/// Collects all successful items from the stream into a collection, accumulating all errors.
///
/// This method will continue processing the stream even after encountering errors, collecting
/// all successful items and all errors until the stream is exhausted.
///
/// # Examples
///
/// ```
/// # use error_stack::{Report, Result};
/// # use futures_util::stream;
/// # use error_stack_experimental::TryReportStreamExt;
///
/// #[derive(Debug, Clone, PartialEq, Eq)]
/// pub struct UnknownError;
///
/// impl core::fmt::Display for UnknownError {
/// fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
/// f.write_str("UnknownError")
/// }
/// }
///
/// impl core::error::Error for UnknownError {}
///
/// #
/// # async fn example() {
/// let stream = stream::iter([
/// Ok(1),
/// Err(Report::new(UnknownError)),
/// Ok(2),
/// Err(Report::new(UnknownError)),
/// ]);
///
/// let result: Result<Vec<i32>, _> = stream.try_collect_reports().await;
/// let report = result.expect_err("should have failed twice");
///
/// assert_eq!(report.current_contexts().count(), 2);
/// # }
/// #
/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
/// ```
fn try_collect_reports<A>(self) -> TryCollectReports<Self, A, C>
where
A: Default + Extend<Self::Ok>,
Self: Sized,
{
TryCollectReports::new(self, None)
}

/// Collects successful items from the stream into a collection, accumulating errors up to a
/// specified bound.
///
/// This method will continue processing the stream after encountering errors, but will stop
/// once the number of accumulated errors reaches the specified `bound`.
///
/// ```
/// # use error_stack::{Report, Result};
/// # use futures_util::stream;
/// # use error_stack_experimental::TryReportStreamExt;
///
/// #[derive(Debug, Clone, PartialEq, Eq)]
/// pub struct UnknownError;
///
/// impl core::fmt::Display for UnknownError {
/// fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
/// f.write_str("UnknownError")
/// }
/// }
///
/// impl core::error::Error for UnknownError {}
///
/// #
/// # async fn example() {
/// let stream = stream::iter([
/// Ok(1),
/// Err(Report::new(UnknownError)),
/// Ok(2),
/// Err(Report::new(UnknownError)),
/// ]);
///
/// let result: Result<Vec<i32>, _> = stream.try_collect_reports_bounded(1).await;
/// let report = result.expect_err("should have failed twice");
///
/// assert_eq!(report.current_contexts().count(), 1);
/// # }
/// #
/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
/// ```
fn try_collect_reports_bounded<A>(self, bound: usize) -> TryCollectReports<Self, A, C>
where
A: Default + Extend<Self::Ok>,
Self: Sized,
{
TryCollectReports::new(self, Some(bound))
}
}

impl<S, C> TryReportStreamExt<C> for S where S: TryStream<Error: Into<Report<[C]>>> {}
12 changes: 6 additions & 6 deletions libs/error-stack/experimental/src/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use error_stack::{Report, Result};
/// containing a tuple of the successful values, or an error if any of the results failed.
///
/// The trait is implemented for tuples of up to 16 elements.
pub trait TupleExt<C> {
pub trait TryReportTupleExt<C> {
/// The type of the successful output, typically a tuple of the inner types of the `Result`s.
type Output;

Expand All @@ -21,7 +21,7 @@ pub trait TupleExt<C> {
///
/// ```
/// use error_stack::{Report, Result};
/// use error_stack_experimental::TupleExt;
/// use error_stack_experimental::TryReportTupleExt;
///
/// #[derive(Debug)]
/// struct CustomError;
Expand Down Expand Up @@ -50,7 +50,7 @@ pub trait TupleExt<C> {
fn try_collect(self) -> Result<Self::Output, [C]>;
}

impl<T, R, C> TupleExt<C> for (core::result::Result<T, R>,)
impl<T, R, C> TryReportTupleExt<C> for (core::result::Result<T, R>,)
where
R: Into<Report<[C]>>,
{
Expand Down Expand Up @@ -91,10 +91,10 @@ macro_rules! all_the_tuples {
macro_rules! impl_ext {
($([$type:ident, $output:ident]),+) => {
#[doc(hidden)]
impl<$($type, $output),*, T, R, Context> TupleExt<Context> for ($($type),*, core::result::Result<T, R>)
impl<$($type, $output),*, T, R, Context> TryReportTupleExt<Context> for ($($type),*, core::result::Result<T, R>)
where
R: Into<Report<[Context]>>,
($($type,)*): TupleExt<Context, Output = ($($output,)*)>,
($($type,)*): TryReportTupleExt<Context, Output = ($($output,)*)>,
{
type Output = ($($output),*, T);

Expand Down Expand Up @@ -126,7 +126,7 @@ mod test {

use error_stack::{Report, Result};

use super::TupleExt;
use super::TryReportTupleExt;

#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct TestError(usize);
Expand Down

0 comments on commit b185637

Please sign in to comment.