Skip to content

Commit

Permalink
feat: stream implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
indietyp committed Sep 17, 2024
1 parent caef170 commit ce8560d
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 28 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 0 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,8 @@ async-scoped = { version = "=0.9.0", default-features = false }
async-trait = { version = "=0.1.82", default-features = false }
aws-config = { version = "=1.5.6" }
aws-sdk-s3 = { version = "=1.49.0", default-features = false }
base64 = { version = "=0.22.1", default-features = false }
bitvec = { version = "=1.0.1", default-features = false }
bytes-utils = { version = "=0.1.4", default-features = false }
chrono = { version = "=0.4.38", default-features = false }
clap = { version = "=4.5.17", features = ["std", "color", "help", "usage", "error-context", "suggestions"] }
clap_complete = { version = "=4.5.26", default-features = false }
coverage-helper = { version = "=0.2.2", default-features = false }
Expand All @@ -150,7 +148,6 @@ dotenv-flow = { version = "=0.16.2", default-features = false }
expect-test = { version = "=1.5.0", default-features = false }
futures = { version = "=0.3.30", default-features = false }
hifijson = { version = "=0.2.2", default-features = false }
http-body-util = { version = "=0.1.2", default-features = false }
humansize = { version = "=2.1.3", default-features = false }
hyper = { version = "=1.4.1", default-features = false }
include_dir = { version = "=0.7.4", default-features = false }
Expand Down Expand Up @@ -183,19 +180,13 @@ rustc_version = { version = "=0.4.1", default-features = false }
scc = { version = "=2.1.17", default-features = false }
sentry = { version = "=0.34.0", default-features = false, features = ["backtrace", "contexts", "debug-images", "panic", "reqwest", "rustls", "tracing", "tower-http"] }
seq-macro = { version = "=0.3.5", default-features = false }
serde-value = { version = "=0.7.0", default-features = false }
serde_plain = { version = "=1.0.2", default-features = false }
serde_with = { version = "=3.9.0", default-features = false }
similar-asserts = { version = "=1.6.0", default-features = false }
spin = { version = "=0.9", default-features = false }
supports-color = { version = "=3.0.1", default-features = false }
supports-unicode = { version = "=3.0.0", default-features = false }
sval = { version = "=2.13.1", default-features = false }
syn = { version = "=2.0.77", default-features = false }
tachyonix = { version = "=0.3.1", default-features = false }
tarpc = { version = "=0.33", default-features = false }
temporal-io-client = { git = "https://github.com/temporalio/sdk-core", rev = "7e3c23f" }
temporal-io-sdk-core-protos = { git = "https://github.com/temporalio/sdk-core", rev = "7e3c23f" }
test-fuzz = { version = "=6.0.0", default-features = false }
test-log = { version = "=0.2.16", default-features = false }
test-strategy = { version = "=0.4.0", default-features = false }
Expand Down
9 changes: 9 additions & 0 deletions libs/error-stack/experimental/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,22 @@ publish = false
[dependencies]
# Public workspace dependencies
error-stack = { path = "..", public = true }
futures-core = { workspace = true, public = true, optional = true }
pin-project-lite = { workspace = true, optional = true }

[build-dependencies]
rustc_version = "0.4.1"

[lints]
workspace = true

[features]
stream = ["dep:futures-core", "dep:pin-project-lite"]

[dev-dependencies]
futures-util.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }

[package.metadata.docs.rs]
all-features = true
cargo-args = ["-Z", "unstable-options", "-Z", "rustdoc-scrape-examples"]
Expand Down
24 changes: 12 additions & 12 deletions libs/error-stack/experimental/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ where
///
/// This trait provides additional functionality to iterators that yield `Result` items,
/// allowing them to be collected into a container while propagating any errors encountered.
pub trait IteratorExt<C> {
pub trait TryReportIteratorExt<C> {
/// The type of the successful items in the iterator.
type Output;
type Ok;

/// Collects the successful items from the iterator into a container, or returns all errors that
/// occured.
///
/// This method attempts to collect all successful items from the iterator into the specified
/// container type. If an error is encountered during iteration, the method immediately returns
/// that error, discarding any previously collected items.
/// container type. If an error is encountered during iteration, the method will exhaust the
/// iterator and return a `Report` containing all errors encountered.
///
/// # Errors
///
Expand All @@ -106,7 +106,7 @@ pub trait IteratorExt<C> {
/// ```
/// use error_stack::{Result, ResultExt, Report};
/// use std::io;
/// use error_stack_experimental::IteratorExt;
/// use error_stack_experimental::TryReportIteratorExt;
///
/// fn fetch_fail() -> Result<u8, io::Error> {
/// # stringify! {
Expand All @@ -123,7 +123,7 @@ pub trait IteratorExt<C> {
/// ```
fn try_collect_reports<A>(self) -> Result<A, [C]>
where
A: FromIterator<Self::Output>;
A: FromIterator<Self::Ok>;

/// Collects the successful items from the iterator into a container or returns all errors up to
/// the specified bound.
Expand All @@ -142,7 +142,7 @@ pub trait IteratorExt<C> {
/// ```
/// use error_stack::{Result, ResultExt, Report};
/// use std::io;
/// use error_stack_experimental::IteratorExt;
/// use error_stack_experimental::TryReportIteratorExt;
///
/// fn fetch_fail() -> Result<u8, io::Error> {
/// # stringify! {
Expand All @@ -159,27 +159,27 @@ pub trait IteratorExt<C> {
/// ```
fn try_collect_reports_bounded<A>(self, bound: usize) -> Result<A, [C]>
where
A: FromIterator<Self::Output>;
A: FromIterator<Self::Ok>;
}

impl<T, C, R, I> IteratorExt<C> for I
impl<T, C, R, I> TryReportIteratorExt<C> for I
where
I: Iterator<Item = core::result::Result<T, R>>,
R: Into<Report<[C]>>,
C: Context,
{
type Output = T;
type Ok = T;

fn try_collect_reports<A>(self) -> Result<A, [C]>
where
A: FromIterator<Self::Output>,
A: FromIterator<Self::Ok>,
{
try_process_reports(self, None, |shunt| shunt.collect())
}

fn try_collect_reports_bounded<A>(self, bound: usize) -> Result<A, [C]>
where
A: FromIterator<Self::Output>,
A: FromIterator<Self::Ok>,
{
try_process_reports(self, Some(bound), |shunt| shunt.collect())
}
Expand Down
6 changes: 5 additions & 1 deletion libs/error-stack/experimental/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
#![doc = include_str!("../README.md")]
#![cfg_attr(all(doc, nightly), feature(doc_auto_cfg))]

pub use self::{iter::IteratorExt, result::ResultMultiExt, tuple::TupleExt};
#[cfg(feature = "stream")]
pub use self::stream::TryReportStreamExt;
pub use self::{iter::TryReportIteratorExt, result::ResultMultiExt, tuple::TryReportTupleExt};

mod iter;
mod result;
#[cfg(feature = "stream")]
mod stream;
mod tuple;
193 changes: 193 additions & 0 deletions libs/error-stack/experimental/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
use core::{
future::Future,
mem,
pin::Pin,
task::{ready, Context, Poll},
};

use error_stack::{Report, Result};

Check failure

Code scanning / clippy

unresolved imports error_stack::Report, error_stack::Result Error

unresolved imports error\_stack::Report, error\_stack::Result

Check failure

Code scanning / clippy

unresolved imports error_stack::Report, error_stack::Result Error

unresolved imports error\_stack::Report, error\_stack::Result
use futures_core::{FusedFuture, FusedStream, TryStream};
use pin_project_lite::pin_project;

pin_project! {
/// Future for the [`try_collect_reports`](TryReportStreamExt::try_collect_reports)
/// and [`try_collect_reports_bounded`](TryReportStreamExt::try_collect_reports_bounded) methods.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryCollectReports<S, A, C> {
#[pin]
stream: S,
output: Result<A, [C]>,

residual_len: usize,
residual_bound: usize
}
}

impl<S, A, C> TryCollectReports<S, A, C>
where
S: TryStream,
A: Default + Extend<S::Ok>,
{
fn new(stream: S, bound: Option<usize>) -> Self {
Self {
stream,
output: Ok(Default::default()),
residual_len: 0,
residual_bound: bound.unwrap_or(usize::MAX),
}
}
}

impl<S, A, C> FusedFuture for TryCollectReports<S, A, C>
where
S: TryStream<Error: Into<Report<[C]>>> + FusedStream,
A: Default + Extend<S::Ok>,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}

impl<S, A, C> Future for TryCollectReports<S, A, C>
where
S: TryStream<Error: Into<Report<[C]>>>,
A: Default + Extend<S::Ok>,
{
type Output = Result<A, [C]>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

let value = loop {
if *this.residual_len >= *this.residual_bound {
break mem::replace(this.output, Ok(A::default()));
}

let next = ready!(this.stream.as_mut().try_poll_next(cx));
match (next, &mut *this.output) {
(Some(Ok(value)), Ok(output)) => {
output.extend(core::iter::once(value));
}
(Some(Ok(_)), Err(_)) => {
// we're now just consuming the iterator to return all related errors
// so we can just ignore the output
}
(Some(Err(residual)), output @ Ok(_)) => {
*output = Err(residual.into());
*this.residual_len += 1;
}
(Some(Err(residual)), Err(report)) => {
report.append(residual.into());
*this.residual_len += 1;
}
(None, output) => {
break mem::replace(output, Ok(A::default()));
}
}
};

Poll::Ready(value)
}
}

/// Trait extending `TryStream` with methods for collecting error-stack results in a fail-slow
/// manner.
pub trait TryReportStreamExt<C>: TryStream<Error: Into<Report<[C]>>> {
/// Collects all successful items from the stream into a collection, accumulating all errors.
///
/// This method will continue processing the stream even after encountering errors, collecting
/// all successful items and all errors until the stream is exhausted.
///
/// # Examples
///
/// ```
/// # use error_stack::{Report, Result};
/// # use futures_util::stream;
/// # use error_stack_experimental::TryReportStreamExt;
///
/// #[derive(Debug, Clone, PartialEq, Eq)]
/// pub struct UnknownError;
///
/// impl core::fmt::Display for UnknownError {
/// fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
/// f.write_str("UnknownError")
/// }
/// }
///
/// impl core::error::Error for UnknownError {}
///
/// #
/// # async fn example() {
/// let stream = stream::iter([
/// Ok(1),
/// Err(Report::new(UnknownError)),
/// Ok(2),
/// Err(Report::new(UnknownError)),
/// ]);
///
/// let result: Result<Vec<i32>, _> = stream.try_collect_reports().await;
/// let report = result.expect_err("should have failed twice");
///
/// assert_eq!(report.current_contexts().count(), 2);
/// # }
/// #
/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
/// ```
fn try_collect_reports<A>(self) -> TryCollectReports<Self, A, C>
where
A: Default + Extend<Self::Ok>,
Self: Sized,
{
TryCollectReports::new(self, None)
}

/// Collects successful items from the stream into a collection, accumulating errors up to a
/// specified bound.
///
/// This method will continue processing the stream after encountering errors, but will stop
/// once the number of accumulated errors reaches the specified `bound`.
///
/// ```
/// # use error_stack::{Report, Result};
/// # use futures_util::stream;
/// # use error_stack_experimental::TryReportStreamExt;
///
/// #[derive(Debug, Clone, PartialEq, Eq)]
/// pub struct UnknownError;
///
/// impl core::fmt::Display for UnknownError {
/// fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
/// f.write_str("UnknownError")
/// }
/// }
///
/// impl core::error::Error for UnknownError {}
///
/// #
/// # async fn example() {
/// let stream = stream::iter([
/// Ok(1),
/// Err(Report::new(UnknownError)),
/// Ok(2),
/// Err(Report::new(UnknownError)),
/// ]);
///
/// let result: Result<Vec<i32>, _> = stream.try_collect_reports_bounded(1).await;
/// let report = result.expect_err("should have failed twice");
///
/// assert_eq!(report.current_contexts().count(), 1);
/// # }
/// #
/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
/// ```
fn try_collect_reports_bounded<A>(self, bound: usize) -> TryCollectReports<Self, A, C>
where
A: Default + Extend<Self::Ok>,
Self: Sized,
{
TryCollectReports::new(self, Some(bound))
}
}

impl<S, C> TryReportStreamExt<C> for S where S: TryStream<Error: Into<Report<[C]>>> {}
Loading

0 comments on commit ce8560d

Please sign in to comment.