diff --git a/Cargo.lock b/Cargo.lock index 97eee69455e..b647e429489 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1979,7 +1979,11 @@ name = "error-stack-experimental" version = "0.0.0-reserved" dependencies = [ "error-stack 0.5.0", + "futures-core", + "futures-util", + "pin-project-lite", "rustc_version", + "tokio", ] [[package]] diff --git a/libs/error-stack/experimental/Cargo.toml b/libs/error-stack/experimental/Cargo.toml index 47d2bf33612..8528cd4d097 100644 --- a/libs/error-stack/experimental/Cargo.toml +++ b/libs/error-stack/experimental/Cargo.toml @@ -16,6 +16,8 @@ publish = false [dependencies] # Public workspace dependencies error-stack = { path = "..", public = true } +futures-core = { workspace = true, public = true, optional = true } +pin-project-lite = { workspace = true, optional = true } [build-dependencies] rustc_version = "0.4.1" @@ -23,6 +25,13 @@ rustc_version = "0.4.1" [lints] workspace = true +[features] +stream = ["dep:futures-core", "dep:pin-project-lite"] + +[dev-dependencies] +futures-util.workspace = true +tokio = { workspace = true, features = ["rt-multi-thread"] } + [package.metadata.docs.rs] all-features = true cargo-args = ["-Z", "unstable-options", "-Z", "rustdoc-scrape-examples"] diff --git a/libs/error-stack/experimental/src/iter.rs b/libs/error-stack/experimental/src/iter.rs index 349642ada6d..65afe1c7a68 100644 --- a/libs/error-stack/experimental/src/iter.rs +++ b/libs/error-stack/experimental/src/iter.rs @@ -85,16 +85,16 @@ where /// /// This trait provides additional functionality to iterators that yield `Result` items, /// allowing them to be collected into a container while propagating any errors encountered. -pub trait IteratorExt { +pub trait TryReportIteratorExt { /// The type of the successful items in the iterator. - type Output; + type Ok; /// Collects the successful items from the iterator into a container, or returns all errors that /// occured. /// /// This method attempts to collect all successful items from the iterator into the specified - /// container type. If an error is encountered during iteration, the method immediately returns - /// that error, discarding any previously collected items. + /// container type. If an error is encountered during iteration, the method will exhaust the + /// iterator and return a `Report` containing all errors encountered. /// /// # Errors /// @@ -106,7 +106,7 @@ pub trait IteratorExt { /// ``` /// use error_stack::{Result, ResultExt, Report}; /// use std::io; - /// use error_stack_experimental::IteratorExt; + /// use error_stack_experimental::TryReportIteratorExt; /// /// fn fetch_fail() -> Result { /// # stringify! { @@ -123,7 +123,7 @@ pub trait IteratorExt { /// ``` fn try_collect_reports(self) -> Result where - A: FromIterator; + A: FromIterator; /// Collects the successful items from the iterator into a container or returns all errors up to /// the specified bound. @@ -142,7 +142,7 @@ pub trait IteratorExt { /// ``` /// use error_stack::{Result, ResultExt, Report}; /// use std::io; - /// use error_stack_experimental::IteratorExt; + /// use error_stack_experimental::TryReportIteratorExt; /// /// fn fetch_fail() -> Result { /// # stringify! { @@ -159,27 +159,27 @@ pub trait IteratorExt { /// ``` fn try_collect_reports_bounded(self, bound: usize) -> Result where - A: FromIterator; + A: FromIterator; } -impl IteratorExt for I +impl TryReportIteratorExt for I where I: Iterator>, R: Into>, C: Context, { - type Output = T; + type Ok = T; fn try_collect_reports(self) -> Result where - A: FromIterator, + A: FromIterator, { try_process_reports(self, None, |shunt| shunt.collect()) } fn try_collect_reports_bounded(self, bound: usize) -> Result where - A: FromIterator, + A: FromIterator, { try_process_reports(self, Some(bound), |shunt| shunt.collect()) } diff --git a/libs/error-stack/experimental/src/lib.rs b/libs/error-stack/experimental/src/lib.rs index bbffd023a42..5c2d965b2ed 100644 --- a/libs/error-stack/experimental/src/lib.rs +++ b/libs/error-stack/experimental/src/lib.rs @@ -1,8 +1,12 @@ #![doc = include_str!("../README.md")] #![cfg_attr(all(doc, nightly), feature(doc_auto_cfg))] -pub use self::{iter::IteratorExt, result::ResultMultiExt, tuple::TupleExt}; +#[cfg(feature = "stream")] +pub use self::stream::TryReportStreamExt; +pub use self::{iter::TryReportIteratorExt, result::ResultMultiExt, tuple::TryReportTupleExt}; mod iter; mod result; +#[cfg(feature = "stream")] +mod stream; mod tuple; diff --git a/libs/error-stack/experimental/src/stream.rs b/libs/error-stack/experimental/src/stream.rs new file mode 100644 index 00000000000..b9d37a291a1 --- /dev/null +++ b/libs/error-stack/experimental/src/stream.rs @@ -0,0 +1,193 @@ +use core::{ + future::Future, + mem, + pin::Pin, + task::{ready, Context, Poll}, +}; + +use error_stack::{Report, Result}; +use futures_core::{FusedFuture, FusedStream, TryStream}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`try_collect_reports`](TryReportStreamExt::try_collect_reports) + /// and [`try_collect_reports_bounded`](TryReportStreamExt::try_collect_reports_bounded) methods. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryCollectReports { + #[pin] + stream: S, + output: Result, + + residual_len: usize, + residual_bound: usize + } +} + +impl TryCollectReports +where + S: TryStream, + A: Default + Extend, +{ + fn new(stream: S, bound: Option) -> Self { + Self { + stream, + output: Ok(Default::default()), + residual_len: 0, + residual_bound: bound.unwrap_or(usize::MAX), + } + } +} + +impl FusedFuture for TryCollectReports +where + S: TryStream>> + FusedStream, + A: Default + Extend, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +impl Future for TryCollectReports +where + S: TryStream>>, + A: Default + Extend, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + let value = loop { + if *this.residual_len >= *this.residual_bound { + break mem::replace(this.output, Ok(A::default())); + } + + let next = ready!(this.stream.as_mut().try_poll_next(cx)); + match (next, &mut *this.output) { + (Some(Ok(value)), Ok(output)) => { + output.extend(core::iter::once(value)); + } + (Some(Ok(_)), Err(_)) => { + // we're now just consuming the iterator to return all related errors + // so we can just ignore the output + } + (Some(Err(residual)), output @ Ok(_)) => { + *output = Err(residual.into()); + *this.residual_len += 1; + } + (Some(Err(residual)), Err(report)) => { + report.append(residual.into()); + *this.residual_len += 1; + } + (None, output) => { + break mem::replace(output, Ok(A::default())); + } + } + }; + + Poll::Ready(value) + } +} + +/// Trait extending `TryStream` with methods for collecting error-stack results in a fail-slow +/// manner. +pub trait TryReportStreamExt: TryStream>> { + /// Collects all successful items from the stream into a collection, accumulating all errors. + /// + /// This method will continue processing the stream even after encountering errors, collecting + /// all successful items and all errors until the stream is exhausted. + /// + /// # Examples + /// + /// ``` + /// # use error_stack::{Report, Result}; + /// # use futures_util::stream; + /// # use error_stack_experimental::TryReportStreamExt; + /// + /// #[derive(Debug, Clone, PartialEq, Eq)] + /// pub struct UnknownError; + /// + /// impl core::fmt::Display for UnknownError { + /// fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + /// f.write_str("UnknownError") + /// } + /// } + /// + /// impl core::error::Error for UnknownError {} + /// + /// # + /// # async fn example() { + /// let stream = stream::iter([ + /// Ok(1), + /// Err(Report::new(UnknownError)), + /// Ok(2), + /// Err(Report::new(UnknownError)), + /// ]); + /// + /// let result: Result, _> = stream.try_collect_reports().await; + /// let report = result.expect_err("should have failed twice"); + /// + /// assert_eq!(report.current_contexts().count(), 2); + /// # } + /// # + /// # tokio::runtime::Runtime::new().unwrap().block_on(example()); + /// ``` + fn try_collect_reports(self) -> TryCollectReports + where + A: Default + Extend, + Self: Sized, + { + TryCollectReports::new(self, None) + } + + /// Collects successful items from the stream into a collection, accumulating errors up to a + /// specified bound. + /// + /// This method will continue processing the stream after encountering errors, but will stop + /// once the number of accumulated errors reaches the specified `bound`. + /// + /// ``` + /// # use error_stack::{Report, Result}; + /// # use futures_util::stream; + /// # use error_stack_experimental::TryReportStreamExt; + /// + /// #[derive(Debug, Clone, PartialEq, Eq)] + /// pub struct UnknownError; + /// + /// impl core::fmt::Display for UnknownError { + /// fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + /// f.write_str("UnknownError") + /// } + /// } + /// + /// impl core::error::Error for UnknownError {} + /// + /// # + /// # async fn example() { + /// let stream = stream::iter([ + /// Ok(1), + /// Err(Report::new(UnknownError)), + /// Ok(2), + /// Err(Report::new(UnknownError)), + /// ]); + /// + /// let result: Result, _> = stream.try_collect_reports_bounded(1).await; + /// let report = result.expect_err("should have failed twice"); + /// + /// assert_eq!(report.current_contexts().count(), 1); + /// # } + /// # + /// # tokio::runtime::Runtime::new().unwrap().block_on(example()); + /// ``` + fn try_collect_reports_bounded(self, bound: usize) -> TryCollectReports + where + A: Default + Extend, + Self: Sized, + { + TryCollectReports::new(self, Some(bound)) + } +} + +impl TryReportStreamExt for S where S: TryStream>> {} diff --git a/libs/error-stack/experimental/src/tuple.rs b/libs/error-stack/experimental/src/tuple.rs index c51494bbae0..6cf63727782 100644 --- a/libs/error-stack/experimental/src/tuple.rs +++ b/libs/error-stack/experimental/src/tuple.rs @@ -6,7 +6,7 @@ use error_stack::{Report, Result}; /// containing a tuple of the successful values, or an error if any of the results failed. /// /// The trait is implemented for tuples of up to 16 elements. -pub trait TupleExt { +pub trait TryReportTupleExt { /// The type of the successful output, typically a tuple of the inner types of the `Result`s. type Output; @@ -21,7 +21,7 @@ pub trait TupleExt { /// /// ``` /// use error_stack::{Report, Result}; - /// use error_stack_experimental::TupleExt; + /// use error_stack_experimental::TryReportTupleExt; /// /// #[derive(Debug)] /// struct CustomError; @@ -50,7 +50,7 @@ pub trait TupleExt { fn try_collect(self) -> Result; } -impl TupleExt for (core::result::Result,) +impl TryReportTupleExt for (core::result::Result,) where R: Into>, { @@ -91,10 +91,10 @@ macro_rules! all_the_tuples { macro_rules! impl_ext { ($([$type:ident, $output:ident]),+) => { #[doc(hidden)] - impl<$($type, $output),*, T, R, Context> TupleExt for ($($type),*, core::result::Result) + impl<$($type, $output),*, T, R, Context> TryReportTupleExt for ($($type),*, core::result::Result) where R: Into>, - ($($type,)*): TupleExt, + ($($type,)*): TryReportTupleExt, { type Output = ($($output),*, T); @@ -126,7 +126,7 @@ mod test { use error_stack::{Report, Result}; - use super::TupleExt; + use super::TryReportTupleExt; #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] struct TestError(usize);