Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow.takeUntil(otherFlow) #1850

Open
rougsig opened this issue Mar 8, 2020 · 30 comments
Open

Flow.takeUntil(otherFlow) #1850

rougsig opened this issue Mar 8, 2020 · 30 comments

Comments

@rougsig
Copy link

rougsig commented Mar 8, 2020

I need analog for rxJava takeUntil operator. This logic can be found here

Will such an operator be added to the standard library?

@elizarov
Copy link
Contributor

What is your use-case for such an operator? Can you give an example on how you'd use it in your application?

@rougsig
Copy link
Author

rougsig commented Mar 10, 2020

Sure. I need to cancel the flow.

sealed class CatAction {
  object StartFetch: CatAction()
  object CancelFetch: CatAction()
  data class UpdateCats(val cats: List<Cat>): CatAction()
}

suspend fun fetchCats(): List<Cat> {
  return httpClient.fetchCats()
}

fun catFetcher(actions: Flow<CatAction>): Flow<CatAction> {
  return actions
    .mapNotNull { it as? CatAction.FetchCat }
    .flatMapConcat { action ->
       flow {
         val cats = fetchCats()
         emit(UpdateCats(cats))
       }
       .takeUntil(actions.mapNotNull { it as? CatAction.CancelFetch })
    }
}

@elizarov
Copy link
Contributor

This is a super-scary code, since you are collecting actions twice (independently) and back-to-back sequence of CatAction.FetchCat and CatAction.CancelFetch can get processed in any order. I would suggest to simply rewrite it like this:

fun catFetcher(actions: Flow<CatAction>): Flow<CatAction> {
  return actions
    .mapNotNull { it as? CatAction.FetchCat }
    .flatMapLatest { action ->
       // this flow will get cancelled as soon as new action comes in
       flow {
         val cats = fetchCats()
         emit(UpdateCats(cats))
       }
    }
}

Will it work for you?

@rougsig
Copy link
Author

rougsig commented Mar 11, 2020

No. It's not work for me. I need to process only one CatAction.FetchCat per moment. In your code it will be cancel current request and start new.

So I need this behavior:
When the first CatAction.FetchCat is received, start the request.
When receiving new CatAction.FetchCat while the first CatAction.FetchCat is processing, ignore new CatAction.FetchCat.
When the CatAction.CancelFetch is received, cancel the request.

UPD:
when I wrote the usage example, I forgot to add filterNot under flatMapConcat for ignore new actions.
Fixed example:

sealed class CatAction {
  object StartFetch: CatAction()
  object CancelFetch: CatAction()
  data class UpdateCats(val cats: List<Cat>): CatAction()
}

data class CatState(
  val cats: List<Cat>? = null,
  val isLoading: Boolean = false
)

suspend fun fetchCats(): List<Cat> {
  return httpClient.fetchCats()
}

fun catFetcher(actions: Flow<CatAction>, state: StateProvider<CatState>): Flow<CatAction> {
  return actions
    .filterNot { state.isLoading }
    .mapNotNull { it as? CatAction.FetchCat }
    .flatMapConcat { action ->
       flow {
         val cats = fetchCats()
         emit(UpdateCats(cats))
       }
       .takeUntil(actions.mapNotNull { it as? CatAction.CancelFetch })
    }
}

@elizarov
Copy link
Contributor

elizarov commented Mar 13, 2020

The example you've given still has the problem of collection actions twice and gives no guarantee on how back-to-bach FetchCat and CancelFetch pair is processed. I don't see how takeUntil could be used to write a reliable code for your specific use-case.

You use-case can be implemented with some kind of dropWhenBusy (see #1798) that would also take a filtering lambda to specify which events need to be dropped while downstream is busy, so that you can write it like this. I've also corrected the code to use transformLatest instead of flatMapLatest.

fun catFetcher(actions: Flow<CatAction>): Flow<CatAction> {
  return actions
    .dropWhenBusy { it is CatAction.FetchCat } // ignore featch requests when working on prev
    .transformLatest { action ->
       // this block will get cancelled as soon as cancel action comes in
       if (action is CatAction.FetchCat) emit(UpdateCats(fetchCats()))
    }
}

@rougsig
Copy link
Author

rougsig commented Mar 13, 2020

Yeah. It can be implemented with dropWhenBusy.

I think dropWhenBusy + transformLatest in this case has a very non-obvious behavior. Due to the fact that we cancel not only for the CancelFetch, but for everything except the FetchCat.

fun catFetcher(actions: Flow<CatAction>): Flow<CatAction> {
  return actions
    .dropWhenBusy { it is CatAction.FetchCat } // ignore featch requests when working on prev
    .filter { it is CatAction.FetchCat || it is CatAction.CancelFetch } // ignore actions  
    .transformLatest { action ->
       // this block will get cancelled as soon as cancel action comes in
       if (action is CatAction.FetchCat) emit(UpdateCats(fetchCats()))
    }
}

@elizarov
Copy link
Contributor

@rougsig What alternative would you suggest?

@rougsig
Copy link
Author

rougsig commented Mar 13, 2020

An operator that can cancel Flow on an event from another Flow. takeUntil(other Flow) cat do that logic.

In basic some rx conditional operators.

@elizarov
Copy link
Contributor

@rougsig Do you have a use-case for cancelling a flow on an event from another flow? The use-case we've been discussing so far was about cancelling a flow on an event from the same flow.

@rougsig
Copy link
Author

rougsig commented Mar 13, 2020

Sure:

sealed class LifeCycleAction {
  object Create: LifeCycleAction()
  object Destroy: LifeCycleAction()
}

sealed class CatAction {
  object StartFetch: CatAction()
  data class UpdateCats(val cats: List<Cat>): CatAction()
}

val lifeCycle: Channel<LifeCycleAction>
val requests: Channel<CatAction>
val cats: Flow<List<Cat>>

// all computation will be calculated on other context.
// in that reason we can not use job.cancel()
fun catFetcher(actions: Flow<CatAction>, lifeCycle: Flow<LifeCycleAction>): Flow<CatAction> {
   return actions
    .filterNot { state.isLoading }
    .mapNotNull { it as? CatAction.StartFetch }
    .flatMapConcat { action ->
       flow {
         val cats = fetchCats()
         emit(UpdateCats(cats))
       }
       .takeUntil(actions.mapNotNull { it as? CatAction.CancelFetch })
    }
}

class Screen: CoroutineScope {
  fun onCreate() {
   launch { 
     lifeCycle.send(LifeCycleAction.Create)
     requests.send(CatAction.StartFetch)
   }
   launch {
    cats.collect { cats -> renderCats(cats) }
   }  
 }

 fun onDestroy() {
   launch { lifeCycle.send(LifeCycleAction.Destroy)  }
 }
}

But in my pet app i have one Flow with all application Actions.
So in that case i can cancel flow like this:

fun catFetcher(actions: Flow<CatAction>): Flow<CatAction> {
  return actions
    .dropIf { it is CatAction.CancelFetch }
    .mapNotNull { it as? CatAction.FetchCat }
    .map { action ->
      emit(UpdateCats(fetchCats()))
    }
}

@elizarov
Copy link
Contributor

elizarov commented Mar 13, 2020

@rougsig I'm a bit lost. You're showing the same code that, in essence, does:

actions .... /* some transformation */
    .takeUntil(actions.mapNotNull { it as? CatAction.CancelFetch }) 
//             ^^^^^^^^ uses _the same_ "actions" flow here

It does not work this way. You can get really weird results. Do you actually have a use-case where it comes from different flows?

@rougsig
Copy link
Author

rougsig commented Mar 13, 2020

No. In real it only one Flow.

@elizarov
Copy link
Contributor

elizarov commented Mar 13, 2020

Then you need some other solution to your problem that would not use takeUntil, but something else.

@rougsig
Copy link
Author

rougsig commented Mar 13, 2020

Tomorrow I will add comment with link to the simple Android application for this case.

@rougsig
Copy link
Author

rougsig commented Mar 13, 2020

I creating pet project with flow being inspired by https://redux-observable.js.org/
Recipe of cancellation uses takeUntil operator.

import { ajax } from 'rxjs/ajax';

const fetchUserEpic = action$ => action$.pipe(
  ofType(FETCH_USER),
  mergeMap(action => ajax.getJSON(`/api/users/${action.payload}`).pipe(
    map(response => fetchUserFulfilled(response)),
    takeUntil(action$.pipe(
      ofType(FETCH_USER_CANCELLED)
    ))
  ))
);

Why can I get really strange results? in this case:

actions .... /* some transformation */
    .takeUntil(actions.mapNotNull { it as? CatAction.CancelFetch }) 
//             ^^^^^^^^ uses _the same_ "actions" flow here

@rougsig
Copy link
Author

rougsig commented Mar 18, 2020

So all the cases I had can be solved using transformLatest and combining the start and cancel events into one. Thank you.

@floschu
Copy link

floschu commented Mar 19, 2020

This would be a valid use-case in my opinion for takeUntil:

val cancelChannel: BroadCastChannel<Unit>
val intervalFlow: Flow<Unit> // emits every x TimeUnit

intervalFlow
    .takeUntil(cancelChannel.asFlow())
    .onEach { /* do something */ }
    .launchIn(someScope)

// later
cancelChannel.offer(Unit) // after this, intervalFlow should no longer emit

@elizarov
Copy link
Contributor

@floschu Where you might encounter a use-case like this? You can use structured concurrency for this kind of cancellation. Can you elaborate on your example, please.

@elizarov
Copy link
Contributor

Why can I get really strange results? in this case:
@rougsig Did you figure it out?

@floschu
Copy link

floschu commented Mar 20, 2020

@floschu Where you might encounter a use-case like this? You can use structured concurrency for this kind of cancellation. Can you elaborate on your example, please.

If the intervalFlow is not launched by me, I would not know how to cancel it via structured concurrency, maybe you could elaborate on this.
A more specific use-case for this would be if a Flow is launched in a library and I can only transform it:

library:

package com.library

abstract class Store<Action, Mutation, State> {

    protected val actions = BroadCastChannel<Action>(BUFFERED)
    open var mutator: (action: Action) -> Flow<Mutation> = { emptyFlow() }
    
    init {
        actions.asFlow()
            .flatMapMerge { action -> mutator(action) }
            .scan(initialState) { previousState, mutation -> reduce(previousState, mutation) }
            .onEach { /* publish state */ } 
            .launchIn(someScope)
    }
}

implementation:

package com.mine

enum class Action { LOAD }

class MyStore : Store<Action, Int, Int> {

    override var mutator =  { action ->
        when(action) {
            Action.LOAD -> flow {
                val value: Int = someSuspendingOperation()
                emit(value)
            }.takeUntil(actions.asFlow().filter { it is Action.LOAD })
        }
    }
}

In this case I want the created flow { ... } in MyStore.mutator to not emit its value when another Action.LOAD comes in.

@elizarov
Copy link
Contributor

@floschu Your latest example is not reliable. You are, again, collecting the same actions flow twice.

How to write it correctly? I don't have a ready answer. I'll need to know more about this kind of architecture to find a solution. Would Action.LOAD be the only action that flows over actions flow? If there are other kinds of actions in actions flow, then how does it work? Can you give a worked out example with multiple types of actions (if they are needed)?

@floschu
Copy link

floschu commented Apr 19, 2020

@floschu Your latest example is not reliable. You are, again, collecting the same actions flow twice.

How to write it correctly? I don't have a ready answer. I'll need to know more about this kind of architecture to find a solution. Would Action.LOAD be the only action that flows over actions flow? If there are other kinds of actions in actions flow, then how does it work? Can you give a worked out example with multiple types of actions (if they are needed)?

I do not see how an example with multiple actions would make you understand the architecture better 😅

I have an elaborate example for you, but it would be a bit of a bigger read I guess:

In general I need a way to cancel the Flow that is created in the ControllerImplementation.mutator. Could be with the same Action, could be with another Action or could be with another Flow entirely (maybe there is a global Flow somewhere outside of the Controller that needs to cancel the mutation).

In RxJava I would have used the .takeUntil(Observable) operator but not sure how to implement it best with coroutines.

@RoryKelly
Copy link

RoryKelly commented Apr 29, 2020

@elizarov I this operator is most useful when a flow is used to represent a UI event.

// timer is a flow that emits every second, endlessly
val timer = timerFlow()
 runButton.clicks()
                 .flatmap(timer.takeUntil(stopButton.clicks()))
                 .onEach {
                    ... update ui ....
                }
                .launchIn(this)

this is a common use-case in RXjava (see rxbindings) and it is already been replicated with co-routines (see flow bindings)

@elizarov
Copy link
Contributor

elizarov commented May 8, 2020

@RoryKelly timerFlow-like thing is in our backlog for a while. Please, follow #540 for details. However, the timer.takeUntil(someEvent) is an interesting pattern, indeed.

The example you've posted is relatively safe, since it actually combines three different flows (runButton, stopButton, timer), but is still conceptually prone to the same asynchronous reordering issue (click on "run" button followed by a fast click on "stop" button can leave the UI running and ticking). It should work correctly in UI thread (but no hard guarantees even there) and it can badly backfire when combined with flowOn and background threads.

@RoryKelly
Copy link

RoryKelly commented May 11, 2020

@elizarov I think the threading issues caused by flowOn on are generally managed by the implementation of clicks.

@CheckResult
@UseExperimental(ExperimentalCoroutinesApi::class)
fun View.clicks(): Flow<Unit> = callbackFlow {
    checkMainThread()
    val listener = View.OnClickListener {
        safeOffer(Unit)
    }
    setOnClickListener(listener)
    awaitClose { setOnClickListener(null) }
}.conflate()

The coroutine would throw when the flowOn operator is used incorrectly and its up to the user to implement error handling.

In other stream implementations, the Ux problem you outlined click on "run" button followed by a fast click on "stop" button can leave the UI running and ticking are handled by operators like share/cache/refcount etc.

I think takeUntil would allow the use of flows in the UI layer/simplify complex flow, a lot of the community is using the approach, in the rxjava world and I can see coroutines moving in that direction. see https://github.com/freeletics/CoRedux and https://github.com/freeletics/RxRedux

@BenTilbrook
Copy link

BenTilbrook commented Mar 16, 2021

My use case is I'd like to collect flow A only while some condition is true represented by flow B. A takeUntil(Flow) operator is typically what I use to implement (part of) this.

A real world example is collecting location data only while the user has granted permission to do so.

@hoc081098
Copy link

I need this operator. It is useful for cancel running request when clicking a button

fetchFlow.flatMapLatest { request().takeUntil(cancelFlow) }

@hoc081098
Copy link

hoc081098 commented Jun 3, 2021

I have added it in https://github.com/hoc081098/FlowExt library

Marble

----------a-----b------c-----|--
--------------a--------
----------a---|

----------a-----b------c-----|--
-----------------------------------a--------
----------a-----b------c-----|--
@ExperimentalCoroutinesApi
fun <T, R> Flow<T>.takeUntil(notifier: Flow<R>): Flow<T> = channelFlow {
  val outerScope = this

  launch {
    try {
      notifier.take(1).collect()
      close()
    } catch (e: CancellationException) {
      outerScope.cancel(e) // cancel outer scope on cancellation exception, too
    }
  }
  launch {
    try {
      collect { send(it) }
      close()
    } catch (e: CancellationException) {
      outerScope.cancel(e) // cancel outer scope on cancellation exception, too
    }
  }
}

@BraisGabin
Copy link

BraisGabin commented Jun 3, 2021

@hoc081098 thank you so much! I was trying to do this yesterday and I didn't get it right. I think that your snippet is missing one close() after the second collect. Otherwise the generated flow doesn't complete as soon as the upper flow completes.

@hoc081098
Copy link

@hoc081098 thank you so much! I was trying to do this yesterday and I didn't get it right. I think that your snippet is missing one close() after the second collect. Otherwise the generated flow doesn't complete as soon as the upper flow completes.

That's right. Thanks @BraisGabin

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

7 participants