Stream's timeout doesn't interrupt asynchronous generator functions. #59856

Open
talesbarreto opened this issue Jan 7, 2025 · 3 comments
Labels
area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries.

Comments

@talesbarreto

In a private project that I work on, we are listening to new data from a server through a stream. If no data comes within a few seconds, we need to abort the operation and show an error message to the user.

To achieve this, we tried to use the Stream's timeout function but, to our surprise, no exception is thrown. If the server doesn't emit any new data, the operation gets stuck.

An example code that reproduces this behavior:

Stream<int> _streamFromGenerator() async* {
  await Future.delayed(Duration(days: 1));
}

Future<void> main() async {
  final secondsWithTimeOut = _streamFromGenerator().timeout(
    Duration(seconds: 2),
  );
  try {
    await for (final event in secondsWithTimeOut) {
      print("Yielded: $event");
    }
  } catch (e) {
    print("Error: $e");
  }
}

The timeout will be ignored and this code will take one day to complete. In our case, it could take forever.

It turns out that the timeout only takes effect after the delay completes. If we change the previous code to:

Stream<int> _streamFromGenerator() async* {
  yield 1;
  await Future.delayed(Duration(seconds: 4));
  print("after delay");
  yield 2;
}

Future<void> main() async {
  final secondsWithTimeOut = _streamFromGenerator().timeout(
    Duration(seconds: 2),
  );
  try {
    await for (final event in secondsWithTimeOut) {
      print("Yielded: $event");
    }
  } catch (e) {
    print("Error: $e");
  }
}

The output will be:

Yielded: 1
after delay
Error: TimeoutException after 0:00:02.000000: No stream event

Process finished with exit code 0

Note that this exception will only be thrown after 4 seconds.

We managed to implement the expected behavior using StreamController instead of a generator. Still, I think that it is a bug or something that can lead developers to errors.
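
For comparison, here is a minimal sketch of what a StreamController-based version might look like. It is not the code from our project: the name `streamFromController` and the ten-second `Timer` standing in for the slow server are assumptions made for illustration. Because cancelling this stream only has to cancel a timer, nothing blocks the cancel, and the `TimeoutException` should reach the `catch` block after about two seconds.

```dart
import 'dart:async';

/// Hypothetical stand-in for a server that stays silent for ten seconds.
Stream<int> streamFromController() {
  Timer? timer;
  final controller = StreamController<int>();
  controller.onListen = () {
    // Simulate one late event, then the end of the stream.
    timer = Timer(const Duration(seconds: 10), () {
      controller.add(1);
      controller.close();
    });
  };
  // Cancelling only stops the timer, so the cancel completes immediately.
  controller.onCancel = () {
    timer?.cancel();
  };
  return controller.stream;
}

Future<void> main() async {
  final withTimeout = streamFromController().timeout(
    const Duration(seconds: 2),
  );
  try {
    await for (final event in withTimeout) {
      print('Yielded: $event');
    }
  } catch (e) {
    print('Error: $e');
  }
}
```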

@talesbarreto talesbarreto changed the title Stream's timeout doesn't doesn't interrupt asynchronous generator functions. Stream's timeout doesn't interrupt asynchronous generator functions. Jan 7, 2025
@jakemac53 jakemac53 transferred this issue from dart-lang/core Jan 7, 2025
@jakemac53 jakemac53 added the area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. label Jan 7, 2025
@jakemac53
Contributor

Moved this to the SDK, although it's possible it is actually a language issue. Not sure if the spec specifies this behavior.

@jakemac53 jakemac53 assigned jakemac53 and lrhn and unassigned jakemac53 Jan 7, 2025
@lrhn
Member

lrhn commented Jan 8, 2025

This is specified behavior.

The Stream.timeout operation cancels the stream subscription before sending the TimeoutException error event.
Cancelling an async* stream subscription won't complete until the function body exits, and the only change the cancellation makes to that is to make yield behave like return.

The cancel call will wait for the one-day Future to complete, because there is no yield executed before that.
That's how long it takes to call cancel on that stream.

It's a little annoying that a cancel can't even interrupt an await for, but it's too unsafe to interrupt user code in unpredictable places. If your async* stream code is hanging at an await of any kind, then cancelling won't do anything until the await completes, because control can't exit the function body when it's not moving.
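
The waiting described above can be observed directly by timing the future returned by `cancel()`. The following is only an illustrative sketch: `slowGenerator` and `timeCancel` are made-up names, and the two-second delay is arbitrary. The measured time should be close to the length of the pending await, not close to zero.

```dart
import 'dart:async';

/// Emits nothing for two seconds, then reaches a yield.
Stream<int> slowGenerator() async* {
  await Future<void>.delayed(const Duration(seconds: 2));
  yield 1; // Once the subscription is cancelled, this acts like a return.
}

/// Listens to [source], cancels shortly afterwards, and reports how long
/// the future returned by cancel() took to complete.
Future<Duration> timeCancel(Stream<int> source) async {
  final subscription = source.listen((_) {});
  // Give the generator body a chance to start and reach its first await.
  await Future<void>.delayed(Duration.zero);
  final stopwatch = Stopwatch()..start();
  await subscription.cancel();
  return stopwatch.elapsed;
}

Future<void> main() async {
  final elapsed = await timeCancel(slowGenerator());
  print('cancel() completed after ${elapsed.inMilliseconds} ms');
}
```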

A thing we could do is to give timeout a flag that makes it not wait for the cancel to complete. That would mean that the cancel throwing an error will become an uncaught async error that crashes the program. So maybe the option should be a Future<void>? Function(Future<void>)? cancelHandler that allows you to intercept the cancel future, and maybe return null instead if the timeout shouldn't wait. The default value is then just an identity function.

(But changing the API of Stream isn't trivial, so it might have to be an extension function instead of changing the current timeout.)
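
As a rough sketch of the extension-function route, something like the following could work. Everything here is hypothetical: `eagerTimeout` is not part of dart:async, it ignores pause and resume, and it silently drops any error from the source's cancel future (via `Future.ignore`), which is a simplification of the cancelHandler idea and not a recommendation. The key point is that the source's cancel is started but never awaited.

```dart
import 'dart:async';

extension EagerTimeout<T> on Stream<T> {
  /// Hypothetical helper: like timeout, but ends the returned stream after
  /// the first timeout and never waits for the source's cancel to complete.
  Stream<T> eagerTimeout(Duration limit) {
    late StreamController<T> controller;
    StreamSubscription<T>? subscription;
    Timer? timer;

    // Stops the timer and starts cancelling the source without awaiting it.
    void detach() {
      timer?.cancel();
      final pending = subscription?.cancel();
      subscription = null;
      pending?.ignore(); // Assumption: errors from the cancel are dropped.
    }

    void restartTimer() {
      timer?.cancel();
      timer = Timer(limit, () {
        detach();
        controller.addError(TimeoutException('No stream event', limit));
        controller.close();
      });
    }

    controller = StreamController<T>(
      onListen: () {
        subscription = listen(
          (event) {
            restartTimer();
            controller.add(event);
          },
          onError: (Object error, StackTrace stackTrace) {
            restartTimer();
            controller.addError(error, stackTrace);
          },
          onDone: () {
            timer?.cancel();
            controller.close();
          },
        );
        restartTimer();
      },
      onCancel: detach,
    );
    return controller.stream;
  }
}

/// Generator that hangs at an await for three seconds before yielding.
Stream<int> stalled() async* {
  await Future<void>.delayed(const Duration(seconds: 3));
  yield 1;
}

Future<void> main() async {
  final stopwatch = Stopwatch()..start();
  try {
    await for (final event
        in stalled().eagerTimeout(const Duration(seconds: 1))) {
      print('Yielded: $event');
    }
  } on TimeoutException catch (e) {
    print('Timed out after ${stopwatch.elapsed.inMilliseconds} ms: '
        '${e.message}');
  }
}
```

With this sketch the exception should arrive after roughly one second, but the generator body is not interrupted: it keeps running until its await completes, so the program stays alive until then.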

@talesbarreto
Author

talesbarreto commented Jan 8, 2025

Hi @lrhn,

Thank you very much for taking the time to answer me. It makes total sense: we should not interrupt the code.

I think the main problem here is: if we are consuming a stream and need to cancel it, we need to know if this stream comes from a StreamController or from an async*. Both have the same interface but produce different results. If this stream comes from an external package, for example, things become even more problematic.


As you suggested, we could have something like:

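import 'dart:async';

extension CancellableStream<T> on Stream<T> {
  /// Wraps this stream in a controller so that cancelling the returned
  /// stream does not wait for the source's cancel to complete.
  Stream<T> toCancellable() {
    final controller = StreamController<T>();
    // Forward every event from the source to the controller.
    final subscription = listen(
      controller.add,
      onError: controller.addError,
      onDone: controller.close,
      cancelOnError: true,
    );
    // The callback returns nothing, so the source's cancel future is
    // started but never awaited.
    controller.onCancel = () {
      subscription.cancel();
      controller.close();
    };
    return controller.stream;
  }
}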

Still, this is not intuitive for those who are not aware of the problem.


Maybe the best solution is to have something similar to Kotlin's isActive. The user could check if the stream was canceled in the async* body, something like:

Stream<String> _streamFromGenerator() async* {
  for (;;) {
    final r1 = await expensiveOperation1();
    if (!isActive) {
      break;
    }
    final r2 = await expensiveOperation2();
    if (!isActive) {
      break;
    }
    yield "Result: $r1, $r2";
  }
}

When the subscription is canceled, the stream is "detached" from the async*. In this case, the timeout would throw the exception at the correct time and the async* body would keep running.


At this point, I'm not sure what the best intervention would be. I think it is, at least, worth mentioning this async* behavior in the timeout documentation.

Feel free to close this issue if you believe it is appropriate to do so.


3 participants