Proposal kotlin flow repeatUntil operator #2751

Open
jeziellago opened this issue Jun 8, 2021 · 9 comments

@jeziellago

Hello folks 👋🏼

I have a polling use case and would like an operator for repeating operations.

My use-case

// returns Processing, Completed, or Failed
fun getTransformationStatus(): Flow<Message> { ... }
sealed class Message {
    object Processing : Message()
    object Completed : Message()
    object Failed : Message()
}

The Flow<Message> should complete with a Completed or Failed status; otherwise it must repeat (with delay() and a limited number of attempts).

We have the retry and retryWhen operators, but my flow does not report these states through exceptions, so I cannot use them to control it.

Proposal operator repeatUntil

Inspired by retryWhen operator, I created the extension repeatUntil.

fun <T> Flow<T>.repeatUntil(predicate: suspend (value: T, attempt: Long) -> Boolean): Flow<T> =
    flow {
        var attempt = 0L
        var shallRepeat: Boolean
        do {
            shallRepeat = false
            val value = this@repeatUntil.singleOrNull()
            if (value != null) {
                if (!predicate(value, attempt)) {
                    shallRepeat = true
                    attempt++
                } else {
                    emit(value)
                }
            }
        } while (shallRepeat)
    }

Using repeatUntil in my use-case:

getTransformationStatus()
    .repeatUntil { value, attempt ->
        if (value is Message.Completed ||
            value is Message.Failed ||
            attempt == MAX_ATTEMPT
        ) {
            true
        } else {
            delay(POOLING_INTERVAL)
            false
        }
    }.collect {
        println(it)
    }

It works well for me, but I'd like to know if there is a better way to do that.
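
To make the re-collection behaviour concrete, here is a minimal self-contained sketch. It repeats the operator above unchanged and swaps the real endpoint for a hypothetical `FakeStatusSource` (an assumption for illustration only) that counts how often the upstream flow is collected.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.singleOrNull
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

sealed class Message {
    object Processing : Message()
    object Completed : Message()
    object Failed : Message()
}

// The operator from this proposal, repeated so the sketch compiles on its own.
fun <T> Flow<T>.repeatUntil(predicate: suspend (value: T, attempt: Long) -> Boolean): Flow<T> =
    flow {
        var attempt = 0L
        var shallRepeat: Boolean
        do {
            shallRepeat = false
            // singleOrNull() is terminal: every call collects the upstream again.
            val value = this@repeatUntil.singleOrNull()
            if (value != null) {
                if (!predicate(value, attempt)) {
                    shallRepeat = true
                    attempt++
                } else {
                    emit(value)
                }
            }
        } while (shallRepeat)
    }

// Hypothetical stand-in for the HTTP call: Processing twice, then Completed.
class FakeStatusSource {
    var collections = 0
        private set

    fun status(): Flow<Message> = flow {
        collections++
        emit(if (collections < 3) Message.Processing else Message.Completed)
    }
}

fun main() = runBlocking<Unit> {
    val source = FakeStatusSource()
    val results = source.status()
        .repeatUntil { value, _ -> value !is Message.Processing }
        .toList()
    // The upstream is collected three times, but only the final status is emitted.
    println("collections=${source.collections}, emitted=${results.size}")
}
```

Note that the operator only makes sense for cold flows that emit exactly one value per collection, because it relies on singleOrNull().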

@qwwdfsad qwwdfsad added the flow label Jun 8, 2021
@qwwdfsad
Collaborator

qwwdfsad commented Jun 8, 2021

Thanks for the detailed write-up!
We are going to consider this along with #2092

Could you please elaborate on why you need to re-collect the flow while it is still in the processing state, instead of filtering those values out and waiting for completed/failed?
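
For reference, the filtering alternative asked about here could look like the sketch below. It assumes a source that keeps emitting updates within a single collection; `statusUpdates` and `awaitTerminalStatus` are hypothetical names, and the `Message` type is copied from the proposal.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

sealed class Message {
    object Processing : Message()
    object Completed : Message()
    object Failed : Message()
}

// Hypothetical source that reports several statuses during one collection.
fun statusUpdates(): Flow<Message> = flow {
    emit(Message.Processing)
    delay(10)
    emit(Message.Processing)
    delay(10)
    emit(Message.Completed)
}

// Skips Processing values and returns the first terminal status.
// first { ... } throws NoSuchElementException if the flow ends without a match.
suspend fun awaitTerminalStatus(updates: Flow<Message>): Message =
    updates.first { it !is Message.Processing }

fun main() = runBlocking<Unit> {
    val status = awaitTerminalStatus(statusUpdates())
    println(status is Message.Completed) // prints "true"
}
```

This only works when one collection can observe the state change; it does not help if every collection produces a single snapshot.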

@jeziellago
Author

Sure!
I have an HTTP endpoint with that behavior (something like HTTP long polling, maybe):

GET /status
[response]: Processing

---> wait some seconds and repeat

GET /status
[response]: Processing

---> wait some seconds and repeat

GET /status
[response]: Completed or Failed

// finish the job
// Each collect on `Flow<Message>` calls the `GET /status` endpoint.
fun getTransformationStatus(): Flow<Message> { ... }

@qwwdfsad
Collaborator

qwwdfsad commented Jun 8, 2021

Thanks!

Also, related to #1850

@jeziellago
Author

@qwwdfsad Do you think repeatUntil makes sense as a Flow operator, or will that behavior be folded into something related to #2092 (instead of an operator)?

@qwwdfsad
Collaborator

qwwdfsad commented Jun 9, 2021

It makes sense to consider them together because retryUntil is an essential replacement of retry for flows that have been materialized, so this operator has to exist in some form.
It also has somewhat similar semantics to takeUntil, so they should all be considered together in order to provide a consistent solution with recognizable semantics and a consistent naming scheme.

@jeziellago
Author

Thanks for clarifying 😁

@burnoo

burnoo commented Sep 9, 2021

@jeziellago I had a similar problem. I couldn't get your code to work though.

In your code you used singleOrNull, which is a terminal operator. In my opinion this operator shouldn't terminate the flow.

I did something similar in my project:

RetryMap Proposal

fun <T, R> Flow<T>.retryMap(
    map: suspend (T) -> R,
    predicate: suspend FlowCollector<R>.(item: T, result: R, attempt: Long) -> Boolean
): Flow<R> {
    
    tailrec suspend fun FlowCollector<R>.retryCollect(item: T, attempt: Long) {
        val result = map(item)
        if (predicate(item, result, attempt)) {
            retryCollect(item, attempt = attempt + 1)
        } else {
            emit(result)
        }
    }

    return flow {
        collect { item ->
            retryCollect(item, attempt = 1L)
        }
    }
}

Usage example

object Request

sealed class Result {
    object Success : Result()
    object Failure : Result()
    object Loading : Result()
}

suspend fun getResult(request: Request) : Result = TODO()

flowOf(Request)
    .retryMap(::getResult) { _, result, attempt ->
        result is Result.Failure && attempt <= 3
    }
    .onStart { emit(Result.Loading) }
    .collect { println(it) }

Advanced usage

Because the predicate has a FlowCollector receiver, it can emit items downstream, similar to retryWhen.
I'm using the above code in my open source project, so you can check the unit tests here: https://github.com/burnoo/compose-swr/blob/main/swr/src/test/kotlin/dev/burnoo/compose/swr/internal/flow/RetryMapTest.kt
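
As an illustration of emitting from the predicate, here is a hedged sketch. It uses a loop-based variant of retryMap (a restatement made for this example, not the exact code from the linked project), and the `flaky` call is a hypothetical stand-in that fails twice before succeeding.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

sealed class Result {
    object Success : Result()
    object Failure : Result()
    object Loading : Result()
}

// Loop-based variant of retryMap: re-runs `map` while `predicate` returns true.
fun <T, R> Flow<T>.retryMap(
    map: suspend (T) -> R,
    predicate: suspend FlowCollector<R>.(item: T, result: R, attempt: Long) -> Boolean
): Flow<R> = flow {
    val collector = this
    this@retryMap.collect { item ->
        var attempt = 1L
        while (true) {
            val result = map(item)
            // The predicate runs with the downstream collector as its receiver.
            if (!collector.predicate(item, result, attempt)) {
                collector.emit(result)
                break
            }
            attempt++
        }
    }
}

fun main() = runBlocking<Unit> {
    var calls = 0
    // Hypothetical flaky call: Failure on the first two calls, then Success.
    val flaky: suspend (String) -> Result = {
        calls++
        if (calls < 3) Result.Failure else Result.Success
    }
    val emitted = flowOf("request")
        .retryMap(flaky) { _, result, attempt ->
            val retry = result is Result.Failure && attempt <= 3
            if (retry) emit(Result.Loading) // surface each retry to collectors
            retry
        }
        .toList()
    println(emitted.size) // prints "3": Loading, Loading, Success
}
```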

@joffrey-bion
Contributor

joffrey-bion commented Nov 22, 2021

In your code you used singleOrNull, which is a terminal operator. In my opinion this operator shouldn't terminate the flow.

Yeah, the initial problem statement by @jeziellago seems strange to me too, because it looks like getTransformationStatus should instead be a suspend function rather than return a Flow (because it's expected to return only one element):

fun getTransformationStatus(): Flow<Message> { ... }

// should be
suspend fun getTransformationStatus(): Message { ... }

So it doesn't feel like a use case for a flow operator (maybe for a flow generator though)
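
A rough sketch of what such a flow generator might look like, assuming the status call is exposed as a suspend function. `pollStatus`, `fetch`, `maxAttempts`, and `intervalMillis` are hypothetical names chosen for this example, and the `Message` type is copied from the proposal.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

sealed class Message {
    object Processing : Message()
    object Completed : Message()
    object Failed : Message()
}

// Generator: calls `fetch` until it returns a non-Processing status
// or `maxAttempts` calls have been made, waiting between calls.
fun pollStatus(
    maxAttempts: Int,
    intervalMillis: Long,
    fetch: suspend () -> Message
): Flow<Message> = flow {
    for (attempt in 1..maxAttempts) {
        val status = fetch()
        emit(status) // every status is emitted, including Processing
        if (status !is Message.Processing) break
        if (attempt < maxAttempts) delay(intervalMillis)
    }
}

fun main() = runBlocking<Unit> {
    var calls = 0
    // Hypothetical fetch: Processing twice, then Completed.
    val statuses = pollStatus(maxAttempts = 5, intervalMillis = 10) {
        calls++
        if (calls < 3) Message.Processing else Message.Completed
    }.toList()
    println(statuses.size) // prints "3": Processing, Processing, Completed
}
```

Collectors that only care about the outcome can take the last element, while others can observe the intermediate Processing values.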

@Andrew0000

repeatUntil would be quite helpful and time-saving for those who migrate from Rx.
