Flow.chunked with specified time period #1302

Open
gmarchelos opened this issue Jun 27, 2019 · 44 comments

@gmarchelos

gmarchelos commented Jun 27, 2019

Currently, Flow's buffer operator only supports a capacity.
It would be useful to buffer elements within a specified time range:
flow.buffer(Duration.ofSeconds(5)).collect {...}

@qwwdfsad qwwdfsad added the flow label Jun 28, 2019
@qwwdfsad
Collaborator

Could you please explain your use case a bit? E.g., what kind of domain-specific problem would you like to solve with such an operator?

@qwwdfsad qwwdfsad reopened this Jun 28, 2019
@fvasco
Contributor

fvasco commented Jun 28, 2019

I need to batch live events to reduce synchronization costs and limit the refresh rate.

Currently, I have implemented this feature in my project: events are sent to a channel and batched at 10 Hz in an event bus.

@gmarchelos
Author

I have a flow of trades and I need to collect the trades within a specified timespan (many trades are likely to occur within 1-5 seconds) and then do some processing of the collected trades.
Processing trades one by one is costly for me, so I would like a buffer operator that collects events within a specified timespan.

@elizarov
Contributor

elizarov commented Jul 1, 2019

Thanks. This kind of operator would be called chunked in Kotlin lingo. Thanks for spelling out a use-case with a time-limit chunking. See #1290 for a size-limited use-case.

@elizarov elizarov changed the title Flow buffer operator that buffers elements within specified time range Flow.chunked with specified time period Jul 1, 2019
@PaulWoitaschek
Contributor

I'm in need of this too. My use case is as follows:

I implemented a barcode scanner which scans the camera input live. That means that I get barcode results every few milliseconds.
However, the barcode implementation is not perfect and sometimes detects a wrong barcode.
For that case, I want to consume the barcode events chunked in 2-second windows and pick the barcode that was detected most frequently.
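Once scan results are grouped into 2-second chunks, the "pick the most frequent barcode" step is a plain reduction over each chunk. A minimal sketch; `mostFrequent` is a hypothetical helper, meant to be applied per chunk (e.g. with `map`) after whichever time-based chunking operator ends up being used.

```kotlin
/**
 * Hypothetical helper: returns the value that occurs most often in one chunk
 * of scan results, or null for an empty chunk.
 */
fun <T> mostFrequent(chunk: List<T>): T? =
    chunk.groupingBy { it }      // group identical scan results
        .eachCount()             // count occurrences of each distinct value
        .maxByOrNull { it.value } // entry with the highest count
        ?.key
```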

@ps-feng

ps-feng commented Jul 16, 2020

@PaulWoitaschek I had a requirement for a similar (though not exactly the same) operator and ended up implementing my own (final implementation at the end), if it helps.

@circusmagnus

circusmagnus commented Nov 11, 2020

I have a need for a time-based chunked too. I have a stream of analytics events originating in a mobile app. I would like to chunk them in time-based windows, so that I can send them to the backend periodically without initiating too many network connections. Say, chunk events for a minute, send them over the network and repeat for the next package.

For this task I care for:

  • chunks that are not empty. No events = no chunks, despite time window getting closed.
  • not losing events
  • perhaps ability to limit max size of a chunk, to not blow up memory in some corner cases

I do not care for:

  • exact timing of chunks emissions.

I had a discussion earlier with @elizarov about size-based chunk operator:
#1558 (comment)

So a single operator for time- and size-based chunks is desired, making it possible to chunk by size, by time, or by both.

I would like to propose a base operator with three parameters:
public fun <T> Flow<T>.chunked(chunkDuration: Duration, minSize: Int, maxSize: Int): Flow<List<T>>

  • chunkDuration: For how long the accumulation of a single chunk shall happen. If set to 0, this could mean no time limit.

  • maxSize: Maximum size of a chunk. If a chunk reaches the maximum size, it should be emitted before the time limit is reached. In such a case, I think it would be intuitive for the "chunking timer" to be reset. Would default to the NO_MAXIMUM special value.

  • minSize: For my use case, I would not like to receive empty chunks. I would even like to specify a minimum size of about 10. If a chunk does not reach this size in the specified time, I am OK with its values not being emitted in the "current chunk" but added to the front of the next chunk. But perhaps in another use case it might be convenient to receive chunks, even empty ones, at regular intervals, so minSize = 0 could be specified in such a case. minSize could also be used when deciding what to do when the source flow completes: we need to decide whether to emit a "partial", not fully filled chunk as the last emission. In that case, the size of the last chunk could be checked against minSize. Would default to 1: I think the most common use case is collecting multiple values for some batch processing, where empty chunks are not desirable.

Important consideration is how exactly time-based chunking should be implemented. Should "chunking" start right at the moment of collection, or only after receiving first value? Should new "chunking window" start right after previous window, or only after receiving first value for a new chunk?

Based on my needs, if I want chunks to not be empty, that means I am OK with emissions not happening at regular intervals. In fact, I want chunking to gather as many values as possible in a time frame. So I would propose that if minSize > 0, chunking should start only after receiving the first value for a new chunk. That way, we won't be "chunking air". Only minSize = 0 would mean that we want regular chunk emissions no matter the size: chunking should start right at the moment of collection, and each new chunking window should follow right after the previous one.

The above may not be very intuitive, however. Maybe we should always start time-based chunking right after collection and let one chunking window follow another. Chunks not fulfilling minSize would just be added to the following chunks, and so on, until a subsequent chunk fulfills the minSize param.

As for the exact API shape, I think we could have one operator with an experimental (@ExperimentalTime) Duration param and another with just chunkDurationMs: Long, so as not to tie this operator's experimental status to kotlin.time.

I also thought that for purely size-based chunking it would still be convenient to have a separate operator without a Duration param. Otherwise, with chunked(chunkDuration: Duration, minSize: Int, maxSize: Int) it is IMHO too easy to set both duration and maxSize to 0 and end up with a runtime validation error. Also, this signature does not make it evident that Duration == 0 is possible if we want only size-based chunking.

I have also decided to omit a trailing lambda { chunk: List<T> -> userFunction(chunk) } specifying what to do with a chunk after its emission (which would avoid emitting a list if something more specific is needed downstream). I think that
flow.chunked(10.seconds).map { chunk -> ... } should be enough.

To sum up:

public fun <T> Flow<T>.chunked(maxSize: Int, minSize: Int = 1): Flow<List<T>>

public fun <T> Flow<T>.chunked(chunkDuration: Duration, minSize: Int = 1, maxSize: Int = NO_MAXIMUM): Flow<List<T>>

public fun <T> Flow<T>.chunked(chunkDurationMs: Long, minSize: Int = 1, maxSize: Int = NO_MAXIMUM): Flow<List<T>>

@pacher

pacher commented Nov 12, 2020

Please remember to consider the natural batching use case (#902) in the design.
How to express "emit chunk always immediately as soon as possible" in the proposed API?
Basically to buffer upstream emissions while collector is busy with the previous chunk and emit a chunk as soon as collector is ready to receive.

@circusmagnus

Good point @pacher

To express yet another use case in a single operator, I think something more expressive is needed: a way to specify precisely what needs to happen for a chunk to be emitted. How about:

chunk(emitWhen: ChunkConstraint)
where

sealed class ChunkConstraint {
   class BySize(val size: Int) : ChunkConstraint() 
   class ByTime(val timeLimit: Duration, val minSize: Int = 1, val maxSize: Int = NO_MAXIMUM) : ChunkConstraint()
   class UntilCollectorIsReady(val minSize: Int = 1, val maxSize: Int = NO_MAXIMUM): ChunkConstraint()
}

...expressing the three most popular use cases? Do you think that the last case, UntilCollectorIsReady, needs a max size customization option? Perhaps someone would like to have a time constraint there too?

One thing I regret is that a casual user will no longer be able to type chunk(10.seconds) and be done with it, like in RxJava.
But chunk(ByTime(10.seconds)) looks pretty good too.

@pacher

pacher commented Nov 13, 2020

I was thinking more along the lines of what @elizarov suggested in this comment: duration set to zero and size limit set to a very large value.
I think a duration of 0 better reflects the natural batching case, and there should be a special value for no time limit, similar to the suggested NO_MAXIMUM for size.
Then you can do chunk(10.seconds).
Max size does not make sense with 0 duration, but min size does and could be used. Upd.: After some thought, natural batching and size limits are kind of in contradiction.

P.S. I, personally, would add an optional parameter like batch=true to the existing buffer operator, or even introduce a new batch operator (example), because I am worried that fitting a single implementation of chunked to be used for natural batching might result in a less than optimal implementation of the latter.

@pacher

pacher commented Nov 13, 2020

Do you think that last case CollectorIsReady needs max size customization option? Perhaps someone would like to have a time constraint there too?

Not really.
Time constraints are a direct contradiction: which one is it? Should I emit as soon as possible, meaning now, or wait for a duration?
Restricting min size would be the same as using chunked with min size alone without any time limits.
Max size: when max size is reached we can't emit anyway, because the collector is busy. Perhaps there is a use case here for suspending upstream on max size. Otherwise, I can hardly imagine somebody implementing batch processing with an upper bound on the batch size. Maybe, but nothing comes to mind.

All in all, duration 0 for natural batching is a clear winner for API design in my opinion. ChunkConstraint is heavy and is not needed yet.

@circusmagnus

circusmagnus commented Nov 15, 2020

  • Duration equal to zero for natural batching is IMHO problematic, because our natural batching is not going to work like a standard flow operator. chunk(duration = 0, maxSize = 10) and chunk(duration = 10.seconds, maxSize = NO_MAXIMUM) both do not care about how fast downstream processing is. If the collector cannot keep up, they will suspend the producer, as most other operators do (one can change that by introducing a buffer operator upstream, which is a very good separation of concerns IMHO). Then why should chunk(duration = 0, maxSize = NO_MAXIMUM) suddenly, out of the blue, not suspend upstream? Why would chunk(duration = 0, maxSize = 100_000_000) wait for those 100_000_000 emissions, while chunk(duration = 0, maxSize = NO_MAXIMUM) emits as soon as the collector is ready (potentially much sooner than reaching a size of 100 million)?

Trying to fit natural batching into some magic combination of duration and maxSize of a chunk is IMHO not going to be intuitive at all.

  • MaxSize makes sense for natural batching - as a safety valve. It is better to suspend upstream than blow up on OOM IMHO. But then, such maxSize means kind of different thing, than in standard chunking. We are not going to wait for maxSize in order to emit. We are waiting for maxSize to effectively suspend upstream until collector is ready again.

That is why I tried to come up with a sealed class, clearly telegraphing the possible chunking behaviours. SizeLimit chunking does not need minSize or maxSize, just size. Natural batching needs maxSize (I think), but it means a different thing than in TimeLimit chunking (and should be named differently, I guess).

All in all I am seeing, that it is not easy to fit natural batching in here. Maybe instead of trying to do it, we could come up with something more general / abstract, similar to Flow.transform { ... }

chunk(nullableInterval) { ticker, downStream ->
   // can construct own buffer / store last value / whatever
   // can start/stop timer as needed
  // can emit when needed
   onNewValue { newValue -> ... }
   onTimerTick {  ...}
   onCollectorReady { ... }
} 

Kind of like a select expression. You could freely omit/start/stop the timer and use/ignore the onCollectorReady clause. And most importantly, emit whenever and whatever you wish.

Edit: Simplified signature. But boy, that is much more complicated than simple chunk(10.seconds). Still time based chunking could be implemented by:

fun <T> Flow<T>.timeChunk(interval: Duration) = chunk(interval) { ticker, downstream ->
   val acc = mutableListOf<T>()
   ticker.start()
   onNewValue { newValue -> acc.add(newValue) }
   onTimerTick { downstream.emit(acc.toList()); acc.clear() }
}

@pacher

pacher commented Nov 16, 2020

@circusmagnus I am waiting for clarification here before discussing it further.

@elizarov
Contributor

Before we delve into the design, we need to gather and discuss use cases.

@circusmagnus has great use-cases which call for maxSize and duration chunk constraints. I am not sure I understand the motivation for minSize, though. The whole minSize story opens too many questions (does time start counting only when minSize is filled, etc). Can you live without it?

@circusmagnus

circusmagnus commented Nov 24, 2020

Well, I think all three use cases:

  • I want to collect my emissions into fixed-size packages for efficient and predictable batch processing, like determining an average value. (Sub-case? Windowing and diffing between emissions, as spelled out by me in Flow.chunked operator with size limit #1290.)
  • I want to cache my emissions for some time and emit them as packages downstream to avoid doing something downstream too often - like initiating network connection.
  • I want to collect my emissions into larger packages in case downstream cannot keep up / for efficient and fast batch processing.

... can live without minSize. It had two motivations:

  • whether we want to have 0-sized chunks in time-based chunking or no empty chunks. I think no: no one said they want 0-sized chunks. Or we could just emit 0-sized chunks and let the user filter them out in a separate operator.
  • concatenate chunks if they are too small in time-based or natural-batching cases. But that can be done in separate operator (transform) after all.
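Concatenating too-small chunks in a separate operator, as suggested above, could look roughly like this. It is only a sketch: `mergeSmallChunks` and `demoMerge` are hypothetical names, and the trailing remainder is still emitted on completion so that no events are lost.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

/**
 * Hypothetical operator: concatenates consecutive chunks until at least [minSize]
 * elements have accumulated. On completion any remainder is emitted, so nothing is lost.
 */
fun <T> Flow<List<T>>.mergeSmallChunks(minSize: Int): Flow<List<T>> = flow {
    require(minSize > 0) { "minSize must be positive" }
    val pending = mutableListOf<T>()
    collect { chunk ->
        pending += chunk               // carry small (or empty) chunks over into the next one
        if (pending.size >= minSize) {
            emit(pending.toList())     // emit a copy, then start accumulating again
            pending.clear()
        }
    }
    if (pending.isNotEmpty()) emit(pending.toList())
}

// Usage: upstream chunks of sizes 1, 2, 0, 1, 3 merged with minSize = 3.
fun demoMerge(): List<List<Int>> = runBlocking {
    flowOf(listOf(1), listOf(2, 3), emptyList<Int>(), listOf(4), listOf(5, 6, 7))
        .mergeSmallChunks(minSize = 3)
        .toList()
}
```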

After a few days I think @pacher may be right. Let it be:
chunk(size = NO_CONSTRAINT, duration = 0)

with usage:
chunk() - means natural batching with default upper limit. Emit package ASAP but wait for at least one element. Hold a default buffer for elements (max size = 64). Suspend upstream, when buffer fills up.
buffer(1024).chunk() - means natural batching with the buffer enlarged to 1024 elements.
buffer(UNLIMITED).chunk() - means natural batching with no upper limit
chunk(10) - means emit in chunks of size 10
chunk(10.minutes) - emit every 10 minutes. Chunks will have at least one element or will not be emitted(?). Edit: or perhaps just emit 0-sized chunks. Let the user filter them out in separate operator.
chunk(10, 10.minutes) - emit when size 10 is reached OR 10 minutes have passed

...come to think of it, maybe we could somehow insert AND or OR between size and duration parameter for some more customization:
chunk(size = 10 AND timePassed = 10.minutes)
chunk(size = 10 OR timePassed = 10.minutes)

That would solve all minSize issues without an additional parameter.
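To make the AND/OR idea concrete, here is a hypothetical decision function that a chunking operator could evaluate for the currently open chunk. `Combine` and `shouldEmit` are illustrative names, not a proposed API; with AND, the size limit effectively plays the role of minSize.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes

/** Hypothetical: how the size and time constraints of a chunk are combined. */
enum class Combine { AND, OR }

/**
 * Returns true when a chunk holding [size] elements, open for [elapsed], should be emitted.
 * OR: whichever limit is hit first triggers the emission.
 * AND: the chunk must have reached [sizeLimit] and [timeLimit] must have passed.
 */
fun shouldEmit(
    size: Int,
    elapsed: Duration,
    sizeLimit: Int,
    timeLimit: Duration,
    combine: Combine,
): Boolean {
    val sizeReached = size >= sizeLimit
    val timeReached = elapsed >= timeLimit
    return when (combine) {
        Combine.OR -> sizeReached || timeReached
        Combine.AND -> sizeReached && timeReached
    }
}
```

For example, `chunk(size = 10 OR timePassed = 10.minutes)` would correspond to `shouldEmit(size, elapsed, 10, 10.minutes, Combine.OR)`.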

@circusmagnus

Discussion seems to have stalled. Perhaps we should take the issue to the coroutines slack channel? Or do something else to attract more public, gather use-cases and discuss design?

@NikkyAI

NikkyAI commented Dec 16, 2020

My use case is also batching (not sure if natural or not).

Specifically: emit if either the buffer reaches the size limit, or the time limit is reached and the buffer contains any elements (although I could also easily filter out empty lists in the flow later).

Crucially, the timer must be reset after emitting; otherwise the next chunk might be emitted too early because the previous timer fires.

Also, backpressure and such must be maintained, of course.
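The requirements above (emit on size or time, no empty chunks, timer reset after each emission, backpressure) can be sketched with a bounded Channel and `select`. This is a hypothetical operator, not a kotlinx.coroutines API; it assumes kotlinx.coroutines 1.7+ (for `onTimeout(Duration)` inside `select`) and Kotlin 1.9+ (for the stable `TimeSource.Monotonic`), and restricts elements to non-null types. The timer starts at the first element of each chunk, one of the two options discussed earlier in the thread.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlin.time.TimeSource

/**
 * Hypothetical operator: emits a chunk when it holds [maxSize] elements or when [interval]
 * has passed since its first element, whichever comes first. Empty chunks are never emitted.
 */
@OptIn(ExperimentalCoroutinesApi::class) // onTimeout inside select is experimental
fun <T : Any> Flow<T>.chunkedBySizeOrTime(maxSize: Int, interval: Duration): Flow<List<T>> = flow {
    require(maxSize > 0) { "maxSize must be positive" }
    coroutineScope {
        // Bounded buffer: when it is full, the upstream collector suspends (backpressure).
        val buffer = Channel<T>(maxSize)
        launch {
            this@chunkedBySizeOrTime.collect { buffer.send(it) }
            buffer.close()
        }
        while (true) {
            // No timer while the chunk is empty; stop once upstream has completed.
            val first = buffer.receiveCatching().getOrNull() ?: break
            val chunk = mutableListOf(first)
            // The timer starts at the first element, so it is effectively reset after every emission.
            val openedAt = TimeSource.Monotonic.markNow()
            while (chunk.size < maxSize) {
                val remaining = interval - openedAt.elapsedNow()
                if (!remaining.isPositive()) break
                // select is atomic: either an element is received or the timeout fires, so nothing is lost.
                val next = select<T?> {
                    buffer.onReceiveCatching { it.getOrNull() } // null once upstream is closed
                    onTimeout(remaining) { null }
                }
                if (next == null) break
                chunk.add(next)
            }
            emit(chunk)
        }
    }
}

// Usage: a fast producer is chunked by size; a slow one is chunked by time.
fun demoBySize(): List<List<Int>> = runBlocking {
    (1..10).asFlow().chunkedBySizeOrTime(maxSize = 3, interval = 1.seconds).toList()
}

fun demoByTime(): List<List<Int>> = runBlocking {
    flow { emit(1); emit(2); delay(300); emit(3) }
        .chunkedBySizeOrTime(maxSize = 10, interval = 100.milliseconds)
        .toList()
}
```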

@circusmagnus

circusmagnus commented Dec 23, 2020

Ok, I will have another go at this issue. @elizarov mentioned that there is no obvious case for minSize and that it complicates things. So, let's assume that the minimum chunk size is always one. That seems to fit all use cases:

  • Size based chunking: no need to specify both min and max size. Single parameter is enough
  • Time based chunking: No use cases for zero-sized chunks. While I would be happy to bump up minSize of my analytics chunks, I can do it pretty easily with other operator further downstream
  • Natural batching: Min size of 1 is pretty obvious here.

No minSize param then.

That leaves us with duration and size params:
fun <T> Flow<T>.chunked(interval: Duration, size: Int): Flow<List<T>>

How do we express our use cases with it?

  • Natural batching:
chunked(interval = 0, size = someLargishValue) // interval = 0: emit as soon as possible; size: how big a buffer you are willing to store
  • Time based chunking:
chunked(interval = x, size = maxAcceptableBuffer) // interval: your main consideration
  • Size based chunking: that one gets trickier, as we do not need duration at all:
chunked(interval = NO_INTERVAL, size = desiredBufferSize) // NO_INTERVAL: technically Int.MAX_VALUE or such

Since our 'size' parameter is either the maximum size for time-based chunking or just the desired size, we should try to emit immediately when it is reached, suspending upstream if need be.

Interval param is a little different. We cannot guarantee, that chunk will be emitted exactly after interval has passed. Since interval does not relate to size, we can, I think, safely assume, that it is ok to buffer subsequent elements after interval has passed, even if we cannot emit, due to busy downstream.

In other words reaching size limit - we do suspend upstream until emission happens. Chunk cannot grow bigger, than specified. Reaching time limit - we do not suspend upstream no matter whether we did emit or are still waiting for downstream to get ready. Our time limit may be breached due to busy downstream. We cannot prevent it.

That shapes our design into chunked(intervalConstraint OR sizeConstraint), consistently across all use cases.

So the proposal boils down to
fun <T> Flow<T>.chunked(interval: Duration, size: Int): Flow<List<T>>

Proposed impl (give or take - no sanity checks, etc):

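import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.whileSelect
import kotlin.time.Duration

// scopedFlow is internal to kotlinx.coroutines, so flow { coroutineScope { ... } } is used instead.
// offer/poll/receiveOrClosed/onReceiveOrClosed are replaced by trySend/tryReceive/receiveCatching/onReceiveCatching
// (kotlinx.coroutines 1.5+); T is non-null so that a null from tryReceive() means "buffer empty".
@OptIn(ExperimentalCoroutinesApi::class) // whileSelect and onTimeout are experimental
public fun <T : Any> Flow<T>.chunked(interval: Duration, size: Int): Flow<List<T>> = flow {
    coroutineScope {
        val buffer = Channel<T>(size)
        val emitSemaphore = Channel<Unit>()
        val collectSemaphore = Channel<Unit>()

        launch {
            this@chunked.collect { value ->
                val hasCapacity = buffer.trySend(value).isSuccess
                if (!hasCapacity) {
                    // Size limit reached: request an emission and wait until the buffer has been drained.
                    emitSemaphore.send(Unit)
                    collectSemaphore.receive()
                    buffer.send(value)
                }
            }
            emitSemaphore.close()
            buffer.close()
        }

        whileSelect {
            // Size limit reached or upstream completed: flush whatever is buffered.
            emitSemaphore.onReceiveCatching { result ->
                buffer.drain().takeIf { it.isNotEmpty() }?.let { emit(it) }
                val shouldCollectNextChunk = !result.isClosed
                if (shouldCollectNextChunk) collectSemaphore.send(Unit)
                else collectSemaphore.close()
                shouldCollectNextChunk
            }

            // Interval elapsed: wait for at least one element, then flush.
            onTimeout(interval) {
                // An empty list means upstream completed while waiting; do not emit it.
                buffer.awaitFirstAndDrain().takeIf { it.isNotEmpty() }?.let { emit(it) }
                true
            }
        }
    }
}

Helper functions:

private suspend fun <T : Any> ReceiveChannel<T>.awaitFirstAndDrain(): List<T> {
    val first = receiveCatching().getOrNull() ?: return emptyList()
    return drain(mutableListOf(first))
}

private tailrec fun <T : Any> ReceiveChannel<T>.drain(acc: MutableList<T> = mutableListOf()): List<T> {
    val item = tryReceive().getOrNull() ?: return acc
    acc.add(item)
    return drain(acc)
}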

Plus optimized, non-concurrent, impl for purely size-based chunking:

private fun <T> Flow<T>.chunkedSizeBased(maxSize: Int): Flow<List<T>> = flow {
    val buffer = mutableListOf<T>()
    collect { value ->
        buffer.add(value)
        if (buffer.size == maxSize) {
            emit(buffer.toList()) // emit a copy (drain() is only defined for channels), then reuse the list
            buffer.clear()
        }
    }
    if (buffer.isNotEmpty()) emit(buffer.toList())
}

@fluidsonic

I'm using my own rough implementation for this to batch database updates.

  • The following approach creates batch of at most 100k updates and doesn't wait longer than 2 seconds per batch.
  • The size limit prevents queries from becoming too large. I'm using MongoDB's bulk write feature and the server can have limits on how many updates are allowed per bulk write.
  • The time limit ensures that data in the database is reasonably up-to-date and that the amount of data lost when a server crashes is reasonably low. A lower time limit also helps spreading database load more evenly because big updates can take quite some time and resources.
upstreamFlowOfManyFrequentUpdates
   .chunked(sizeLimit = 100_000, timeLimit = 2.seconds)
   .collect { updates ->
      // write updates to DB
   }

@circusmagnus

I like the signature
chunked(sizeLimit: Int, timeLimit: Duration)
It makes it quite clear that the relation is sizeLimit OR timeLimit. Either of them could also have the value NO_LIMIT (= Int.MAX_VALUE). timeLimit could also take the form NATURAL_BATCHING (= 0).

@circusmagnus

circusmagnus commented Jan 3, 2021

private fun <T> Flow<T>.chunkedSizeBased(maxSize: Int): Flow<List<T>> = flow {
    val buffer = mutableListOf<T>()
    collect { value ->
        buffer.add(value)
        if (buffer.size == maxSize) {
            emit(buffer.toList())
            buffer.clear()
        }
    }
    if (buffer.isNotEmpty()) emit(buffer.toList())
}

Come to think of it, this is not a good idea. Chunking should always happen in a different coroutine than downstream collection. I think having your upstream blocked by a slow consumer (like one sending chunk data over a network) is not desirable even in purely size-based chunking. It is also more consistent this way.

@NikkyAI

NikkyAI commented Jan 3, 2021

Backpressure is a desirable property in some systems, at least for our use case.

@circusmagnus

I agree. We will have backpressure working all right in both cases. Main difference is that backpressure in concurrent solution will kick in a bit later.

  • In the concurrent solution (which we cannot avoid in time-based chunking), a new chunk may be accumulated at the same time as the previous chunk is being processed (written to a DB, sent over the network, etc.). However, when the new chunk hits the size limit, upstream gets suspended until this new chunk is accepted by downstream.
  • In the sequential solution (possible only for size-based chunking), we would simply not start accumulating a new chunk until the previous one is fully processed.

For one, I would not like to have my analytics events "production" suspended for a time of making a network request with previous chunk. I want my analytics events "production" suspended only, if chunk hits size limit.

For two, I think time- and size-based chunking should behave consistently / in same way, when it comes to backpressure due to size-limit constraint

@AWinterman

I just want to note that there's an impl here that solves at least 50% of the use case: #2193 (comment)

@adherencegoo

You can use built-in operators to achieve time-based chunking:

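import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.sample

// sample() is a @FlowPreview operator. It only emits if a new value arrived since the last tick
// (so no empty chunks), but it drops the latest value if upstream completes before the next tick,
// so a trailing partial chunk may not be emitted.
@OptIn(FlowPreview::class)
private class TimedChunkFlow<T>(sourceFlow: Flow<T>, periodMs: Long) {
    private val chunkLock = ReentrantLock()
    private var chunk = mutableListOf<T>()

    val resultFlow: Flow<List<T>> = flow {
        sourceFlow.collect {
            // the chunk is reused before it's collected by "sample()"
            val localChunk = chunkLock.withLock {
                chunk.add(it)
                chunk
            }
            emit(localChunk)
        }
    }.sample(periodMs).onEach {
        // start a fresh chunk once the current one has been sampled
        chunkLock.withLock {
            chunk = mutableListOf()
        }
    }
}

fun <T> Flow<T>.timedChunk(periodMs: Long): Flow<List<T>> = TimedChunkFlow(this, periodMs).resultFlow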

@ezeyniyev

Hi,
I did something different from transforming a flow into a chunked flow, but I think it could be helpful for this issue.

My implementation transforms ReceiveChannel to chunked flow that emits chunks limited by size or availability in time period https://github.com/ezeyniyev/kotlin-chunked-flow.git
Extension method fun <T> ReceiveChannel<T>.asChunkedFlow(chunkSize: Int, timeout: Long) : Flow<List<T>> has

  • chunkSize max items in chunk to emit
  • timeout max milliseconds to wait to fill chunk up to chunkSize items and emit that (non empty chunks)

for example

		channel.asChunkedFlow(4, 130).collect { items ->
			println("2: processing $items")
			delay(100)
		}

makes a receiver flow from that channel that emits

  • chunks of 4 elements immediately, if available
  • otherwise, once some element becomes available, it waits up to 130 ms to receive (up to 4) elements and emits everything that was received

You can connect many workers that will consume chunks from that channel.
for example

	launch {
		channel.asChunkedFlow(4, 130).collect { items ->
			println("1: processing $items")
			delay(100)
		}
	}
	launch {
		channel.asChunkedFlow(4, 130).collect { items ->
			println("2: processing $items")
			delay(100)
		}
	}
	launch {
		channel.asChunkedFlow(4, 130).collect { items ->
			println("3: processing $items")
			delay(100)
		}
	}

Of course, multiple producers can send to that channel.

@circusmagnus

Based on the discussion here and some of my own experiences, I have updated PR #2378.

Current API proposal and usage:
myFlow.chunked(ChunkingMethod.ByTime(intervalMs = TimeUnit.MINUTES.toMillis(15), maxSize = 1024))
myFlow.chunked(ChunkingMethod.BySize(size = 5))
myFlow.chunked(ChunkingMethod.Natural())
myFlow.chunked(ChunkingMethod.ByTimeOrSize(intervalMs = TimeUnit.MINUTES.toMillis(15), maxSize = 20))

/**
 * Groups emissions from this Flow into lists, according to the chosen ChunkingMethod. Time based implementations
 * collect upstream and emit to downstream in separate coroutines - concurrently, like Flow.buffer() operator.
 * Exact timing of emissions is not guaranteed, as it depends on collector coroutine availability.
 *
 * Size based chunking happens in a single coroutine and is purely sequential.
 *
 * Emissions always preserve order.
 *
 * It is possible to pass custom implementation of ChunkingMethod to chunked() operator.
 *
 * @param method Defines constraints on chunk size and time of its emission.
 */

@ExperimentalCoroutinesApi
public fun <T> Flow<T>.chunked(method: ChunkingMethod): Flow<List<T>> = with(method) { chunk() }

@ExperimentalCoroutinesApi
public interface ChunkingMethod {
    public fun <T> Flow<T>.chunk(): Flow<List<T>>

    public companion object {

        /**
         * Collects upstream and emits to downstream in separate coroutines - as soon as possible. If consumer keeps
         * up with the producer, it emits lists with single element.
         *
         * In case of slow consumer, it groups emissions into bigger lists. When consumer "speeds up", chunks
         * will get smaller.
         *
         * @param maxSize Maximum size of a single chunk. If reached, producer gets suspended until consumer "consumes"
         * a chunk. If maxSize is not specified, then chunk may grow indefinitely until the JVM runs out of memory.
         */
        @Suppress("FunctionName")
        public fun Natural(maxSize: Int = Int.MAX_VALUE): ChunkingMethod = NaturalChunking(maxSize)

        /**
         * Collects upstream into a buffer and emits its content as a list at every interval. When upstream completes
         * (or is empty), it will try to emit immediately what is left of a chunk, omitting the interval.
         *
         * @param intervalMs Interval between emissions in milliseconds. Every emission happens only after
         * interval passes, unless upstream Flow completes sooner.
         *
         * @param maxSize Maximum size of a single chunk. If reached, producer gets suspended until consumer "consumes"
         * a chunk. If maxSize is not specified, then chunk may grow indefinitely until the JVM runs out of memory.
         */
        @Suppress("FunctionName")
        public fun ByTime(intervalMs: Long, maxSize: Int = Int.MAX_VALUE): ChunkingMethod =
            TimeBased(intervalMs, maxSize)

        /**
         * Collects upstream into a buffer and emits its content as a list at every interval or when its buffer reaches
         * maximum size. When upstream completes (or is empty), it will try to emit immediately what is left of
         * a chunk, omitting the interval and maxSize constraints.
         *
         * @param intervalMs Interval between emissions in milliseconds. Every emission happens only after
         * interval passes, unless upstream Flow completes sooner or maximum size of a chunk is reached.
         *
         * @param maxSize Maximum size of a single chunk. If reached, it will try to emit a chunk, ignoring the
         * interval constraint. If so happens, time-to-next-chunk gets reset to the interval value.
         */
        @Suppress("FunctionName")
        public fun ByTimeOrSize(intervalMs: Long, maxSize: Int): ChunkingMethod = TimeOrSizeBased(intervalMs, maxSize)

        /**
         * Collects upstream into a buffer and emits its content as a list, when specified size is reached.
         * This implementation is purely sequential. If concurrent upstream collection and downstream emissions are
         * desired, one can use a buffer() operator after chunking.
         *
         * @param size Exact size of emitted chunks. Only the last emission may be smaller.
         */
        @Suppress("FunctionName")
        public fun BySize(size: Int): ChunkingMethod = SizeBased(size)
    }
}

Implementation details can be seen fully in the PR: #2378
Basically, the size-based flow is a purely sequential, simple impl, based on an ArrayList as an accumulator.
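Going by that description, a sequential size-based strategy could look roughly like the following. This is a sketch, not the code from PR #2378: the `ChunkingMethod` interface is re-declared in minimal form so the block compiles on its own, and `SizeBased` / `demoSizeBased` are illustrative names. Adding `.buffer()` after chunking, as the `BySize` KDoc suggests, lets upstream collection and downstream processing run concurrently.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Minimal re-declaration of the proposed interface so this sketch is self-contained.
interface ChunkingMethod {
    fun <T> Flow<T>.chunk(): Flow<List<T>>
}

fun <T> Flow<T>.chunked(method: ChunkingMethod): Flow<List<T>> = with(method) { chunk() }

// Hypothetical sequential strategy with an ArrayList as the accumulator.
class SizeBased(private val size: Int) : ChunkingMethod {
    init { require(size > 0) { "size must be positive" } }

    override fun <T> Flow<T>.chunk(): Flow<List<T>> = flow {
        var acc = ArrayList<T>(size)
        collect { value ->
            acc.add(value)
            if (acc.size == size) {
                emit(acc)
                acc = ArrayList(size) // fresh list; the emitted one is never mutated again
            }
        }
        if (acc.isNotEmpty()) emit(acc) // only the last chunk may be smaller
    }
}

// Usage: buffer() after chunking decouples the producer from a slow consumer.
fun demoSizeBased(): List<List<Int>> = runBlocking {
    flowOf(1, 2, 3, 4, 5, 6, 7).chunked(SizeBased(3)).buffer().toList()
}
```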

Time based methods use a Channel as an accumulator and some form of signaller (a Job or a Channel in case of TimeOrSize method), to eventually indicate, that it should be emptied before interval passes.
Emptying of an accumulator Channel is done via tryReceive in a loop - until accumulator channel gets empty / closed.

Natural Chunking uses just two concurrent coroutines - one sends to the channel, the other uses tryReceive in a loop (suspending while waiting for first element).
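The two-coroutine natural chunking described above can be sketched as follows. This illustrates the idea under stated assumptions and is not the PR's implementation: `naturalBatched` and its default `maxSize = 64` (borrowed from the earlier `chunk()` proposal in this thread) are hypothetical, and elements must be non-null so that `tryReceive().getOrNull()` returning null means the buffer is empty.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

/**
 * Hypothetical natural batching: upstream is collected in its own coroutine into a bounded
 * channel; downstream suspends for the first element, then drains whatever is already
 * buffered (at most [maxSize] elements) and emits it as one chunk.
 */
fun <T : Any> Flow<T>.naturalBatched(maxSize: Int = 64): Flow<List<T>> = flow {
    require(maxSize > 0) { "maxSize must be positive" }
    coroutineScope {
        // When the channel is full, the producer suspends: maxSize acts as a safety valve.
        val buffer = Channel<T>(maxSize)
        launch {
            this@naturalBatched.collect { buffer.send(it) }
            buffer.close()
        }
        // Iteration suspends until at least one element is available and ends when upstream completes.
        for (first in buffer) {
            val batch = mutableListOf(first)
            while (batch.size < maxSize) {
                val next = buffer.tryReceive().getOrNull() ?: break // nothing more buffered right now
                batch.add(next)
            }
            emit(batch)
        }
    }
}

fun demoNatural(): List<List<Int>> = runBlocking {
    flowOf(1, 2, 3, 4, 5).naturalBatched(maxSize = 2).toList()
}
```

How elements are split into batches depends on scheduling; order is preserved and no element is lost.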

@circusmagnus

@qwwdfsad @elizarov What do you think?

@joffrey-bion
Contributor

joffrey-bion commented May 19, 2021

@circusmagnus thanks a lot for this proposal and PR. I just have a couple of considerations regarding the public API.

  1. Is it necessary to provide ChunkingMethod as an interface here? Nothing at all is reused in chunked(ChunkingMethod) for a different implementation. Using a custom implementation of ChunkingMethod is completely equivalent to just creating a custom extension Flow<T>.something(): Flow<List<T>>.
    The only use case I can think of for using an actual interface is if we want to write a library that uses batches internally (using chunked(ChunkingMethod)), and still allow users of the library to provide a custom chunking method. But in this case, the lib could just take a function Flow<T>.() -> Flow<List<T>> as configuration.

  2. I believe simple chunked() usages will look quite heavy in usual code with the ChunkingMethod parameter. I liked the initial design with 2 simple parameters. I understand that we have a problem with ByTime VS ByTimeOrSize taking the same parameters but having different semantics for the maxSize parameter (in ByTime we only use maxSize for backpressure, while in ByTimeOrSize we also use it for emissions).

Maybe a simple solution to 2) would be to additionally define related high-level functions like chunkedBySize(), chunkedByTime(), etc. but that may be considered "operator explosion".

An alternative could be to simply define chunked() as taking 3 params, used to internally select a chunking method:

  • timeLimit - emits the whole current buffer as soon as this is reached (timer is reset after each actual emission). If the consumer is not ready, keep filling the buffer.
  • sizeLimit - maximum size of the chunks and also the size of the internal buffer. When this is reached, what happens depends on onFullBuffer.
  • onFullBuffer - if EMIT_OR_SUSPEND, emits the whole buffer when sizeLimit is reached, or suspends the producer if the consumer is not ready when this happens. If SUSPEND, reaching sizeLimit only applies backpressure to the producer but never triggers emissions.

@circusmagnus

It is true that the ChunkingMethod interface does not provide any useful functionality. The problem it was meant to solve is fitting all four chunking use cases into a single operator, and (I hope) helping the end user discover which chunking options are available. It was inspired by SharingStarted, an interface describing the sharing strategy for SharedFlow.

I like your idea of an onFullBuffer param very much, and going back to more lightweight params in general.
I have two dilemmas with them:

  • There is no easy way to fit purely size-based chunking into a three-param chunked(interval, maxSize, onFullBuffer). It would need a separate operator, no way around it IMHO. Otherwise you would have to write chunked(interval = Int.MAX_VALUE, maxSize = 10, onFullBuffer = EMIT), which is even more work than chunked(BySize(10)).
  • It would be a little hard to discover that chunked(interval = 0) is possible and results in the Natural Chunking use case.

But I would be equally happy with merging a PR with such a (simpler) API.

@joffrey-bion
Contributor

joffrey-bion commented May 21, 2021

You're right. I guess as soon as we switch to simple params, the dilemma will be around default values.

The defaults I was initially thinking about were:

  • timeLimit = Duration.INFINITY (alternative names: interval, period)
  • sizeLimit = Int.MAX_VALUE (alternative name: maxSize)
  • onFullBuffer = EMIT_OR_SUSPEND

So the use cases would look this way on the call site:

  • chunked(sizeLimit = 10) = pure size-based chunking with backpressure
  • chunked(sizeLimit = 10, timeLimit = Duration.seconds(2)) = size or time, with size-based backpressure
  • chunked(timeLimit = Duration.seconds(2)) = pure time-based chunking (but no backpressure, dangerous)
  • chunked(timeLimit = 0) = unbounded natural batching (no backpressure)
  • chunked(sizeLimit = 10, timeLimit = 0) = bounded natural batching (with backpressure)
  • chunked(sizeLimit = 10, timeLimit = Duration.seconds(2), onFullBuffer = SUSPEND) = pure time-based chunking with backpressure, feels pretty complex to express

I'm not a fan of the unbounded default for time-based chunking though... :/ Other defaults could be considered, but there may never be a sweet spot unfortunately.

All-in-all, I sort of agree that the interface (or potential sealed class) would make it more discoverable (I would never know about natural batching without this discussion by just seeing this API).
So maybe your current PR + some convenient overloads of chunked() would be the best compromise.

I'd like to hear the Kotlin team's opinion about this.

@circusmagnus

circusmagnus commented May 22, 2021

As I said, without a ChunkingMethod I would go with two or three operators:
Flow<T>.chunked(size: Int): Flow<List<T>> - simple size-based
Flow<T>.chunked(interval: Long/Duration, maxChunkSize: Int = Int.MAX_VALUE, onFullBuffer: OnFullBuffer = EMIT) - for all the other use cases

That way chunked(size = 10) and chunked(interval = 10.seconds) should elegantly cover the basic needs, with more optional customization for the time-based version. chunked(interval = 0.seconds) is a bit cryptic, but good enough too.

I am sticking to the original param names, as timeLimit sounds unobvious to me ("Chunked by timeLimit? timeLimit = 0 for Natural Batching?"). But maybe that's my poor English knowledge. :)

@circusmagnus

Half a year later... What about such a signature?

fun <T> Flow<T>.chunk(
    maxSize: Int,
    naturalBatching: Boolean = false,
    delayUntilNext: (suspend (previousChunk: List<T>?) -> Unit)? = null
): Flow<List<T>>

example usage:
flow.chunk(10) - simple size chunking
flow.chunk(512) { delay(5.minutes) } - accumulate values for 5 minutes with max size of 512
flow.chunk(512, naturalBatching = true) - natural batching variant with max size
flow.chunk(512) { previousChunk -> if (previousChunk?.size == 512) Unit else delay(5.seconds) } - speed up if buffer is getting full before chunk emission
flow.chunk(512) { semaphoreChannel.receive() } - emit chunk after signal from some external source

Pros:

  • one function covering wide array of chunking possibilites
  • chunk interval can be dynamically adjusted by the user in various ways
  • Not so much ceremony in usage

Cons:

  • no way of deciding whether to emit or suspend onBufferOverflow. But is it really needed? Perhaps emitting when full is a good default, and it is not worth bothering people with thinking too much about it? If the downstream cannot keep up, the upstream will get suspended anyway. If the downstream can process chunks fast enough to offload a full buffer, then why not? Or maybe suspending when full would be more intuitive behaviour?
  • Still not so simple, as
    flow.chunk(maxSize = 512, interval = 5.minutes) .

@gildor
Contributor

gildor commented Feb 18, 2022

Still not so simple, as flow.chunk(maxSize = 512, interval = 5.minutes)

I don't think it's a problem; it's probably worth adding an overload that uses the proposed function under the hood. kotlinx.coroutines already has such simplified APIs, which use more generic versions of an API to provide a more case-specific version of a function.
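A rough sketch of that layering, assuming the `chunk(maxSize, delayUntilNext)` shape proposed above with `naturalBatching` left out; both `chunk` functions are hypothetical, not an existing kotlinx.coroutines API. A Mutex guards the buffer shared by the collector (which emits when `maxSize` is reached) and a timer coroutine (which emits after each `delayUntilNext` call). In this sketch `previousChunk` is assumed to be the chunk emitted by the previous timer tick, or null if nothing was pending.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical generic operator: emits when maxSize is reached, and after each delayUntilNext call.
fun <T> Flow<T>.chunk(
    maxSize: Int,
    delayUntilNext: (suspend (previousChunk: List<T>?) -> Unit)? = null,
): Flow<List<T>> {
    require(maxSize > 0) { "maxSize must be positive" }
    val upstream = this
    return channelFlow {
        val mutex = Mutex()
        val buffer = mutableListOf<T>()

        // Emits and clears the pending elements; must be called while holding the mutex.
        suspend fun flushLocked(): List<T>? {
            if (buffer.isEmpty()) return null
            val chunk = buffer.toList()
            buffer.clear()
            send(chunk)
            return chunk
        }

        val timer = delayUntilNext?.let { waitForNext ->
            launch {
                var previous: List<T>? = null
                while (true) {
                    waitForNext(previous)
                    previous = mutex.withLock { flushLocked() }
                }
            }
        }

        upstream.collect { value ->
            mutex.withLock {
                buffer.add(value)
                if (buffer.size >= maxSize) flushLocked()
            }
        }

        // Cancel the timer while holding the lock, so it is never stopped mid-emission.
        mutex.withLock { timer?.cancel() }
        timer?.join()
        mutex.withLock { flushLocked() } // trailing partial chunk
    }
}

// Convenience overload in the spirit of the comment above: size-or-time chunking.
fun <T> Flow<T>.chunk(maxSize: Int, interval: Duration): Flow<List<T>> {
    val waitForInterval: suspend (List<T>?) -> Unit = { delay(interval) }
    return chunk(maxSize, waitForInterval)
}

// Usage
suspend fun sizeOnlyDemo(): List<List<Int>> = (1..5).asFlow().chunk(2).toList()

suspend fun sizeOrTimeDemo(): List<List<Int>> =
    flow { for (i in 1..5) { emit(i); delay(100) } }
        .chunk(maxSize = 2, interval = 250.milliseconds)
        .toList()
```

Without `delayUntilNext` the operator degrades to plain size-based chunking. Sending while holding the lock means a slow downstream also suspends the collector, which is one way to get backpressure.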

@cloudshiftchris

Adding a variant on the "size" use case: when the downstream consumer is an API call, it may have constraints on the set of data, such as:

  • total number of elements <= x
  • total weight of batch (length of each element) <= y

Both involve maintaining and checking state to determine if chunk should be emitted.

It would be helpful if chunked could use a "batch full" predicate, of which "size" is one possible value (and likely the most frequently used one); others are "weight" and "size or weight".

From existing Java implementations of this (closed source), "weight" can default to Long.MAX_VALUE and the weighing function can default to return 0. This assumes an implementation that tracks both, which may not be the case for a Flow-based variant.
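A sequential sketch of such a limit, tracking count and weight together as in the Java implementations described above. `chunkedByWeight` and its parameter names are hypothetical, and weights are assumed to be non-negative. The weight check happens before adding an element and the count check after, so a chunk never exceeds either limit unless a single element is heavier than `maxWeight` on its own.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList

// Hypothetical operator: emits a chunk before adding an element would exceed maxWeight,
// and as soon as a chunk holds maxCount elements. Weights are assumed to be non-negative.
fun <T> Flow<T>.chunkedByWeight(
    maxCount: Int = Int.MAX_VALUE,
    maxWeight: Long = Long.MAX_VALUE,
    weigh: (T) -> Long = { 0L },
): Flow<List<T>> {
    require(maxCount > 0) { "maxCount must be positive" }
    require(maxWeight >= 0) { "maxWeight must not be negative" }
    val upstream = this
    return flow {
        var chunk = mutableListOf<T>()
        var weight = 0L

        suspend fun flush() {
            emit(chunk)
            chunk = mutableListOf() // fresh list: the emitted one belongs to the collector
            weight = 0L
        }

        upstream.collect { value ->
            val w = weigh(value)
            // Written as a subtraction to avoid Long overflow when maxWeight is Long.MAX_VALUE.
            if (chunk.isNotEmpty() && w > maxWeight - weight) flush()
            chunk.add(value)
            weight += w
            if (chunk.size >= maxCount) flush()
        }
        if (chunk.isNotEmpty()) flush()
    }
}

// Usage: at most 3 strings and at most 5 characters per chunk.
suspend fun weightDemo(): List<List<String>> =
    listOf("aa", "bbb", "c", "dddd", "e").asFlow()
        .chunkedByWeight(maxCount = 3, maxWeight = 5) { it.length.toLong() }
        .toList()
```

With the default `weigh = { 0L }`, this reduces to plain size-based chunking; a "size or weight" limit is just both parameters set.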

@Kotlin Kotlin deleted a comment from hugo891 Oct 11, 2022
@Kotlin Kotlin deleted a comment from hugo891 Oct 11, 2022
@rasros

rasros commented Feb 3, 2023

I'm using this implementation which uses standard functions. I'm curious to know if there's anything to improve.

It is a bit wasteful, since it launches a coroutine for every added value, but the implementation is clean IMO since there's no polling.

fun <T> Flow<T>.chunked(maxSize: Int, interval: Duration) = channelFlow {

    val buffer = mutableListOf<T>()
    var flushJob: Job? = null

    collect { value ->
        
        flushJob?.cancelAndJoin()
        buffer.add(value)
        
        if (buffer.size >= maxSize) {
            send(buffer.toList())
            buffer.clear()
        } else {
            flushJob = launch {
                delay(interval)
                if (buffer.isNotEmpty()) {
                    send(buffer.toList())
                    buffer.clear()
                }
            }
        }
    }

    flushJob?.cancelAndJoin()
    
    if (buffer.isNotEmpty()) {
        send(buffer.toList())
        buffer.clear()
    }
}

EDIT: updated from circusmagnus comments.

@circusmagnus

  • I fear you may have a little shared-mutable-state problem with this impl. However, if you replace flushJob?.cancel() with flushJob?.cancelAndJoin(), it should be solved.
  • I am not sure if this impl. will be performant enough with a large number of event emissions. But maybe it is not a problem in your case.

Otherwise it indeed looks clean and simple. :)

@dovchinnikov
Contributor

We also need this in IJ

@yangwuan55

yangwuan55 commented Feb 24, 2023


fun <T> Flow<T>.bufferList(size:Int):Flow<List<T>> {
        return flow {
            val buffer = mutableListOf<T>()
            this@bufferList.collect {
                buffer.add(it)
                if (buffer.size == size) {
                    this.emit(buffer)
                    buffer.clear()
                }
            }
        }
    }

Can I imp for this code?

@Richie94

Richie94 commented Mar 30, 2023


fun <T> Flow<T>.bufferList(size:Int):Flow<List<T>> {
        return flow {
            val buffer = mutableListOf<T>()
            this@bufferList.collect {
                buffer.add(it)
                if (buffer.size == size) {
                    this.emit(buffer)
                    buffer.clear()
                }
            }
        }
    }

Can I imp for this code?

I guess that won't work, because it won't emit if the flow terminates while the buffer is not exactly full.
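Besides the missing final flush, the snippet emits the same mutable list that it clears right afterwards, so a collector that keeps the reference (for example via `toList()`) sees its contents change. A sequential sketch that fixes both, keeping the name `bufferList` from the snippet:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList

// Size-based chunking: emits full chunks of `size` elements, then the trailing partial chunk.
fun <T> Flow<T>.bufferList(size: Int): Flow<List<T>> {
    require(size > 0) { "size must be positive" }
    val upstream = this
    return flow {
        var buffer = mutableListOf<T>()
        upstream.collect { value ->
            buffer.add(value)
            if (buffer.size == size) {
                emit(buffer)
                // Start a new list instead of clearing: the emitted one now belongs to the collector.
                buffer = mutableListOf()
            }
        }
        // Flush the remainder when the upstream completes.
        if (buffer.isNotEmpty()) emit(buffer)
    }
}

// Usage
suspend fun bufferListDemo(): List<List<Int>> = (1..5).asFlow().bufferList(2).toList()
```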

@chrisfillmore

It would be helpful if chunked could use a "batch full" predicate, of which "size" is one possible value (and likely the most frequently used one); others are "weight" and "size or weight".

I just want to echo this. I'm currently using a chunked implementation to post logs to a service. I would like to meet these two requirements:

  • Logs should not be too frequent
  • Logs should not be too large

However, timeliness is not important in my use case. If emissions are buffered, or suspend, to satisfy the above requirements, that is fine.

@rocketraman
Contributor

Would love this type of operator in stdlib. My use case is chunking bluetooth advertisements on Android so that we can determine when a device hasn't been seen before, and when a device has been turned off or is out of range.

See JuulLabs/kable#521.

@SPC-code

Copied from #4069

Use case

I have several cases where I need to group flow events by time and then combine them into something. The simplest example is averaging. If I have real-time data, I frequently want to take all points in a given time window and return only the averaged value for this time frame (or a default value if no events happened in this time window). I have several such cases in VisionForge (visualization library) and Controls-kt (device data acquisition).

The Shape of the API

I would call it sample, but that name is already taken, so it could be something like chunkedByTime, to be aligned with the stdlib method for collections. So, it could look like this: fun <T> Flow<T>.chunkedByTime(duration: Duration): Flow<List<T>>.

To make the API more universal one could provide external signal trigger. So, the collection is triggered by obtaining signal from a flow or channel.

Another API consideration is usage of the API on numbers. For my purposes List<T> is OK, since it can be grouped later, which gives flexibility. But when we use a List of numbers, it impacts performance. So, one could provide an inline method that transforms an array of numbers internally in the flow evaluation loop, like fun <R> Flow<Double>.chunkedDoubleByTime(duration: Duration, collector: (DoubleArray) -> R): Flow<R>. Such an API is opinionated, so I am not sure it should be in the library.

Prior Art

sample and debounce have similar functionality, but they return only one value, not all of them. I think the implementation could share some code.
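For the averaging use case specifically, a sketch that avoids materializing a `List<Double>` per window by keeping a running sum and count. `averagedOver` and `emptyValue` are hypothetical names; it emits `emptyValue` for windows with no events, as described above. The windows come from a `delay`-based timer, so their boundaries are relative to collection start and may drift slightly, and the last partial window is dropped when the source completes.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical operator: emits the average of each time window, or emptyValue if it saw no events.
fun Flow<Double>.averagedOver(window: Duration, emptyValue: Double = Double.NaN): Flow<Double> {
    require(window.isPositive()) { "window must be positive" }
    val upstream = this
    return channelFlow {
        val mutex = Mutex()
        var sum = 0.0
        var count = 0
        val ticker = launch {
            while (true) {
                delay(window)
                // Emit and reset the running totals without racing the collector below.
                mutex.withLock {
                    send(if (count == 0) emptyValue else sum / count)
                    sum = 0.0
                    count = 0
                }
            }
        }
        upstream.collect { value ->
            mutex.withLock {
                sum += value
                count++
            }
        }
        // Stop ticking once the source completes (under the lock, so never mid-send).
        mutex.withLock { ticker.cancel() }
    }
}

// Usage: two values, then 500 ms of silence, averaged over 200 ms windows.
suspend fun averagedDemo(): List<Double> =
    flow {
        emit(1.0)
        emit(3.0)
        delay(500)
    }.averagedOver(200.milliseconds, emptyValue = 0.0).toList()
```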

@frankcoutinho

Hey guys from 2024!
Does anyone have a production-ready snippet for this?
I'm not smart enough to easily identify the corner cases when implementing this. So far I've seen:

Option A

@ExperimentalCoroutinesApi  
@ObsoleteCoroutinesApi  
fun <T> Flow<T>.bufferTimeout(size: Int, duration: Duration): Flow<List<T>> {
    require(size > 0) { "Window size should be greater than 0" }  
    require(duration.toMillis() > 0) { "Duration should be greater than 0" }

    return flow {  
        coroutineScope {  
            val events = ArrayList<T>(size)
            val tickerChannel = ticker(duration.toMillis())
            try {
                var hasTimedOut = false  
                val upstreamValues = produce { collect { send(it) } }  

                while (isActive) {  
                    val tickerJob = launch {  
                        tickerChannel.receive()
                        hasTimedOut = true  
                    }  

                    withTimeoutOrNull(10) { upstreamValues.receive() }  
                        ?.let { events.add(it) }

                    if (events.size == size || (hasTimedOut && events.isNotEmpty())) {  
                        emit(events.toList())  
                        events.clear()
                        tickerJob.cancel()
                        hasTimedOut = false
                    }
                }
            } finally {
                tickerChannel.cancel()
            }
        }
    }
}

Option B

@OptIn(FlowPreview::class)
private class TimedChunkFlow<T>(sourceFlow: Flow<T>, periodMs: Long) {
    private val chunkLock = ReentrantLock()
    private var chunk = mutableListOf<T>()

    val resultFlow = flow {
        sourceFlow.collect {
            // the chunk is reused before it's collected by "sample()"
            val localChunk = chunkLock.withLock {
                chunk.add(it)
                chunk
            }
            emit(localChunk)
        }
    }.sample(periodMs).onEach {
        chunkLock.withLock {
            chunk = mutableListOf()
        }
    }
}

Option C

fun <T> Flow<T>.chunked(maxSize: Int, interval: Duration) = channelFlow {

    val buffer = mutableListOf<T>()
    var flushJob: Job? = null

    collect { value ->
        
        flushJob?.cancelAndJoin()
        buffer.add(value)
        
        if (buffer.size >= maxSize) {
            send(buffer.toList())
            buffer.clear()
        } else {
            flushJob = launch {
                delay(interval)
                if (buffer.isNotEmpty()) {
                    send(buffer.toList())
                    buffer.clear()
                }
            }
        }
    }

    flushJob?.cancelAndJoin()
    
    if (buffer.isNotEmpty()) {
        send(buffer.toList())
        buffer.clear()
    }
}

I also checked the proposal https://github.com/Kotlin/kotlinx.coroutines/pull/2378/files
But since it uses internal functions, I'm unable to use it.
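One corner case in Option A: when `withTimeoutOrNull` cancels a suspended `receive`, the element can be taken from the channel but never delivered (the prompt-cancellation behavior documented for `ReceiveChannel.receive`). Also, once the upstream completes, `receive` throws `ClosedReceiveChannelException`, so the flow fails instead of emitting the last partial chunk, and the 10 ms timeout turns the loop into polling. A sketch that avoids these by racing the receive against the timeout in a `select`. It assumes a recent kotlinx.coroutines (opting in to the experimental `onTimeout` clause) and Kotlin 1.9+ for `TimeSource.Monotonic`. `chunkedWithin` is a hypothetical name, and here the time limit counts from the first element of each chunk rather than from a free-running ticker.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.TimeSource
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ChannelResult
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select

// Hypothetical operator: emits once maxSize elements are collected, or once `interval`
// has passed since the first element of the current chunk, whichever comes first.
@OptIn(ExperimentalCoroutinesApi::class) // onTimeout is experimental
fun <T> Flow<T>.chunkedWithin(maxSize: Int, interval: Duration): Flow<List<T>> {
    require(maxSize > 0) { "maxSize must be positive" }
    require(interval.isPositive()) { "interval must be positive" }
    val upstream = this
    return channelFlow {
        val elements = Channel<T>(capacity = maxSize)
        launch {
            upstream.collect { elements.send(it) }
            elements.close()
        }
        var upstreamDone = false
        while (!upstreamDone) {
            // Wait, without any timeout, for the first element of the next chunk.
            val first = elements.receiveCatching()
            if (first.isClosed) break
            val chunk = mutableListOf(first.getOrThrow())
            val deadline = TimeSource.Monotonic.markNow() + interval
            while (chunk.size < maxSize) {
                val remaining = -deadline.elapsedNow()
                if (!remaining.isPositive()) break
                // select commits to exactly one clause, so a timeout never swallows an element.
                val next: ChannelResult<T>? = select {
                    elements.onReceiveCatching { it }
                    onTimeout(remaining) { null }
                }
                if (next == null) break
                if (next.isClosed) {
                    upstreamDone = true
                    break
                }
                chunk.add(next.getOrThrow())
            }
            send(chunk)
        }
    }
}

// Usage: three quick values, a pause, then one more.
suspend fun chunkedWithinDemo(): List<List<Int>> =
    flow {
        emit(1); emit(2); emit(3)
        delay(300)
        emit(4)
    }.chunkedWithin(maxSize = 2, interval = 100.milliseconds).toList()
```

The bounded `elements` channel gives size-based backpressure; an upstream exception fails the forwarding coroutine, which cancels the whole `channelFlow` and rethrows to the collector.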
