diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index 6ab89cbc8eb..d216d146b3a 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -213,6 +213,46 @@ public final class arrow/fx/coroutines/Race3Kt { public static synthetic fun raceN$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; } +public final class arrow/fx/coroutines/RacingKt { + public static final fun racing (Larrow/core/raise/Raise;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function3;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun racing (Lkotlin/jvm/functions/Function3;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun racing$default (Larrow/core/raise/Raise;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function3;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static synthetic fun racing$default (Lkotlin/jvm/functions/Function3;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; +} + +public abstract interface class arrow/fx/coroutines/RacingScope { + public abstract fun race (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun raceOrFail (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public final class arrow/fx/coroutines/RacingScope$DefaultImpls { + public static synthetic fun race$default (Larrow/fx/coroutines/RacingScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static synthetic fun raceOrFail$default (Larrow/fx/coroutines/RacingScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; +} + +public abstract interface class arrow/fx/coroutines/RaiseRacingScope : arrow/core/raise/Raise { + public abstract fun race (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun raceOrFail (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun raceOrRaise (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun raceOrThrow (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public final class arrow/fx/coroutines/RaiseRacingScope$DefaultImpls { + public static fun bind (Larrow/fx/coroutines/RaiseRacingScope;Larrow/core/Either;)Ljava/lang/Object; + public static fun bind (Larrow/fx/coroutines/RaiseRacingScope;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object; + public static fun bind (Larrow/fx/coroutines/RaiseRacingScope;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun bindAll (Larrow/fx/coroutines/RaiseRacingScope;Ljava/lang/Iterable;)Ljava/util/List; + public static fun bindAll (Larrow/fx/coroutines/RaiseRacingScope;Ljava/util/Map;)Ljava/util/Map; + public static fun bindAll-1TN0_VU (Larrow/fx/coroutines/RaiseRacingScope;Ljava/util/Set;)Ljava/util/Set; + public static fun bindAll-vcjLgH4 (Larrow/fx/coroutines/RaiseRacingScope;Ljava/util/List;)Ljava/util/List; + public static fun invoke (Larrow/fx/coroutines/RaiseRacingScope;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object; + public static fun invoke (Larrow/fx/coroutines/RaiseRacingScope;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun race$default (Larrow/fx/coroutines/RaiseRacingScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static synthetic fun raceOrFail$default (Larrow/fx/coroutines/RaiseRacingScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static synthetic fun raceOrRaise$default (Larrow/fx/coroutines/RaiseRacingScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static synthetic fun raceOrThrow$default (Larrow/fx/coroutines/RaiseRacingScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; +} + public abstract interface annotation class arrow/fx/coroutines/ResourceDSL : java/lang/annotation/Annotation { } diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.klib.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.klib.api index e77804743fe..03ba83f9298 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.klib.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.klib.api @@ -18,6 +18,18 @@ open annotation class arrow.fx.coroutines/ScopeDSL : kotlin/Annotation { // arro constructor () // arrow.fx.coroutines/ScopeDSL.|(){}[0] } +abstract interface <#A: kotlin/Any?, #B: kotlin/Any?> arrow.fx.coroutines/RaiseRacingScope : arrow.core.raise/Raise<#A> { // arrow.fx.coroutines/RaiseRacingScope|null[0] + abstract suspend fun race(kotlin.coroutines/CoroutineContext = ..., kotlin.coroutines/SuspendFunction1, #B>) // arrow.fx.coroutines/RaiseRacingScope.race|race(kotlin.coroutines.CoroutineContext;kotlin.coroutines.SuspendFunction1,1:1>){}[0] + abstract suspend fun raceOrFail(kotlin.coroutines/CoroutineContext = ..., kotlin.coroutines/SuspendFunction1, #B>) // arrow.fx.coroutines/RaiseRacingScope.raceOrFail|raceOrFail(kotlin.coroutines.CoroutineContext;kotlin.coroutines.SuspendFunction1,1:1>){}[0] + abstract suspend fun raceOrRaise(kotlin.coroutines/CoroutineContext = ..., kotlin.coroutines/SuspendFunction1, #B>) // arrow.fx.coroutines/RaiseRacingScope.raceOrRaise|raceOrRaise(kotlin.coroutines.CoroutineContext;kotlin.coroutines.SuspendFunction1,1:1>){}[0] + abstract suspend fun raceOrThrow(kotlin.coroutines/CoroutineContext = ..., kotlin.coroutines/SuspendFunction1, #B>) // arrow.fx.coroutines/RaiseRacingScope.raceOrThrow|raceOrThrow(kotlin.coroutines.CoroutineContext;kotlin.coroutines.SuspendFunction1,1:1>){}[0] +} + +abstract interface <#A: kotlin/Any?> arrow.fx.coroutines/RacingScope { // arrow.fx.coroutines/RacingScope|null[0] + abstract suspend fun race(kotlin.coroutines/CoroutineContext = ..., kotlin.coroutines/SuspendFunction1) // arrow.fx.coroutines/RacingScope.race|race(kotlin.coroutines.CoroutineContext;kotlin.coroutines.SuspendFunction1){}[0] + abstract suspend fun raceOrFail(kotlin.coroutines/CoroutineContext = ..., kotlin.coroutines/SuspendFunction1) // arrow.fx.coroutines/RacingScope.raceOrFail|raceOrFail(kotlin.coroutines.CoroutineContext;kotlin.coroutines.SuspendFunction1){}[0] +} + abstract interface arrow.fx.coroutines/ResourceScope : arrow/AutoCloseScope { // arrow.fx.coroutines/ResourceScope|null[0] abstract suspend fun <#A1: kotlin/Any?> (kotlin.coroutines/SuspendFunction1).bind(): #A1 // arrow.fx.coroutines/ResourceScope.bind|bind@kotlin.coroutines.SuspendFunction1(){0§}[0] abstract suspend fun <#A1: kotlin/Any?> install(kotlin.coroutines/SuspendFunction1, kotlin.coroutines/SuspendFunction2<#A1, arrow.fx.coroutines/ExitCase, kotlin/Unit>): #A1 // arrow.fx.coroutines/ResourceScope.install|install(kotlin.coroutines.SuspendFunction1;kotlin.coroutines.SuspendFunction2<0:0,arrow.fx.coroutines.ExitCase,kotlin.Unit>){0§}[0] @@ -162,6 +174,7 @@ final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?, #C: kotlin/Any?> (kotlin.co final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?, #C: kotlin/Any?> (kotlin.collections/Iterable<#B>).arrow.fx.coroutines/parMapOrAccumulate(kotlin.coroutines/CoroutineContext = ..., kotlin/Function2<#A, #A, #A>, kotlin.coroutines/SuspendFunction2, #B, #C>): arrow.core/Either<#A, kotlin.collections/List<#C>> // arrow.fx.coroutines/parMapOrAccumulate|parMapOrAccumulate@kotlin.collections.Iterable<0:1>(kotlin.coroutines.CoroutineContext;kotlin.Function2<0:0,0:0,0:0>;kotlin.coroutines.SuspendFunction2,0:1,0:2>){0§;1§;2§}[0] final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?, #C: kotlin/Any?> (kotlin.collections/Iterable<#B>).arrow.fx.coroutines/parMapOrAccumulate(kotlin.coroutines/CoroutineContext = ..., kotlin/Int, kotlin.coroutines/SuspendFunction2, #B, #C>): arrow.core/Either, kotlin.collections/List<#C>> // arrow.fx.coroutines/parMapOrAccumulate|parMapOrAccumulate@kotlin.collections.Iterable<0:1>(kotlin.coroutines.CoroutineContext;kotlin.Int;kotlin.coroutines.SuspendFunction2,0:1,0:2>){0§;1§;2§}[0] final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?, #C: kotlin/Any?> (kotlin.collections/Iterable<#B>).arrow.fx.coroutines/parMapOrAccumulate(kotlin.coroutines/CoroutineContext = ..., kotlin/Int, kotlin/Function2<#A, #A, #A>, kotlin.coroutines/SuspendFunction2, #B, #C>): arrow.core/Either<#A, kotlin.collections/List<#C>> // arrow.fx.coroutines/parMapOrAccumulate|parMapOrAccumulate@kotlin.collections.Iterable<0:1>(kotlin.coroutines.CoroutineContext;kotlin.Int;kotlin.Function2<0:0,0:0,0:0>;kotlin.coroutines.SuspendFunction2,0:1,0:2>){0§;1§;2§}[0] +final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?> (arrow.core.raise/Raise<#A>).arrow.fx.coroutines/racing(kotlin.coroutines/SuspendFunction1<#A, kotlin/Unit>, kotlin.coroutines/SuspendFunction2? = ..., kotlin/Function1, kotlin/Unit>): #B // arrow.fx.coroutines/racing|racing@arrow.core.raise.Raise<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Unit>;kotlin.coroutines.SuspendFunction2?;kotlin.Function1,kotlin.Unit>){0§;1§}[0] final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlin.collections/Iterable<#A>).arrow.fx.coroutines/parMap(kotlin.coroutines/CoroutineContext = ..., kotlin.coroutines/SuspendFunction2): kotlin.collections/List<#B> // arrow.fx.coroutines/parMap|parMap@kotlin.collections.Iterable<0:0>(kotlin.coroutines.CoroutineContext;kotlin.coroutines.SuspendFunction2){0§;1§}[0] final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlin.collections/Iterable<#A>).arrow.fx.coroutines/parMap(kotlin.coroutines/CoroutineContext = ..., kotlin/Int, kotlin.coroutines/SuspendFunction2): kotlin.collections/List<#B> // arrow.fx.coroutines/parMap|parMap@kotlin.collections.Iterable<0:0>(kotlin.coroutines.CoroutineContext;kotlin.Int;kotlin.coroutines.SuspendFunction2){0§;1§}[0] final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlin.collections/Iterable<#A>).arrow.fx.coroutines/parMapNotNull(kotlin.coroutines/CoroutineContext = ..., kotlin.coroutines/SuspendFunction2): kotlin.collections/List<#B> // arrow.fx.coroutines/parMapNotNull|parMapNotNull@kotlin.collections.Iterable<0:0>(kotlin.coroutines.CoroutineContext;kotlin.coroutines.SuspendFunction2){0§;1§}[0] @@ -170,6 +183,7 @@ final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlin.coroutines/SuspendF final suspend fun <#A: kotlin/Any?> (kotlin.coroutines/SuspendFunction1).arrow.fx.coroutines/allocated(): kotlin/Pair<#A, kotlin.coroutines/SuspendFunction1> // arrow.fx.coroutines/allocated|allocated@kotlin.coroutines.SuspendFunction1(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines/CoroutineScope).arrow.fx.coroutines.await/awaitAll(kotlin.coroutines/SuspendFunction1): #A // arrow.fx.coroutines.await/awaitAll|awaitAll@kotlinx.coroutines.CoroutineScope(kotlin.coroutines.SuspendFunction1){0§}[0] final suspend fun <#A: kotlin/Any?> arrow.fx.coroutines.await/awaitAll(kotlin.coroutines/SuspendFunction1): #A // arrow.fx.coroutines.await/awaitAll|awaitAll(kotlin.coroutines.SuspendFunction1){0§}[0] +final suspend fun <#A: kotlin/Any?> arrow.fx.coroutines/racing(kotlin.coroutines/SuspendFunction2? = ..., kotlin/Function1, kotlin/Unit>): #A // arrow.fx.coroutines/racing|racing(kotlin.coroutines.SuspendFunction2?;kotlin.Function1,kotlin.Unit>){0§}[0] final suspend fun <#A: kotlin/Any?> arrow.fx.coroutines/resourceScope(kotlin.coroutines/SuspendFunction1): #A // arrow.fx.coroutines/resourceScope|resourceScope(kotlin.coroutines.SuspendFunction1){0§}[0] final suspend fun <#A: kotlin/AutoCloseable> (arrow.fx.coroutines/ResourceScope).arrow.fx.coroutines/autoCloseable(kotlinx.coroutines/CoroutineDispatcher = ..., kotlin.coroutines/SuspendFunction0<#A>): #A // arrow.fx.coroutines/autoCloseable|autoCloseable@arrow.fx.coroutines.ResourceScope(kotlinx.coroutines.CoroutineDispatcher;kotlin.coroutines.SuspendFunction0<0:0>){0§}[0] final suspend fun arrow.fx.coroutines/cancelAndCompose(kotlinx.coroutines/Deferred<*>, kotlinx.coroutines/Deferred<*>) // arrow.fx.coroutines/cancelAndCompose|cancelAndCompose(kotlinx.coroutines.Deferred<*>;kotlinx.coroutines.Deferred<*>){}[0] diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Racing.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Racing.kt new file mode 100644 index 00000000000..ce1c2ccf632 --- /dev/null +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Racing.kt @@ -0,0 +1,163 @@ +package arrow.fx.coroutines + +import arrow.atomic.Atomic +import arrow.atomic.update +import arrow.atomic.value +import arrow.core.identity +import arrow.core.nonFatalOrThrow +import arrow.core.prependTo +import arrow.core.raise.DelicateRaiseApi +import arrow.core.raise.Raise +import arrow.core.raise.RaiseCancellationException +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitCancellation +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.isActive +import kotlinx.coroutines.selects.SelectBuilder +import kotlinx.coroutines.selects.select +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.cancellation.CancellationException + +public interface RaiseRacingScope : Raise { + public suspend fun race( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend Raise.() -> A + ) + + public suspend fun raceOrRaise( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend Raise.() -> A + ) + + public suspend fun raceOrThrow( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend Raise.() -> A + ) + + public suspend fun raceOrFail( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend Raise.() -> A + ) +} + +public suspend fun Raise.racing( + dropped: suspend (Error) -> Unit, + handleException: (suspend (context: CoroutineContext, exception: Throwable) -> Unit)? = null, + block: RacingScope.() -> Unit, +): A { + TODO() +} + +/** + * A DSL that allows racing many `suspend` functions in parallel against each-other, + * it yields a final result of [A] based on the first function that yields a result. + * A racer can yield a result based on [RacingScope.race], or [RacingScope.raceOrFail]. + * + * [RacingScope.race] will call [handleException] in case of an exception, + * and then await **another successful result** but not cancel the race. Whilst [RacingScope.raceOrFail] will cancel the race, + * and rethrow the exception that occurred and thus cancel the race and all participating racers. + * + * ```kotlin + * suspend fun winner(): String = racing { + * race { delay(1000); "Winner" } + * race { throw RuntimeException("Loser") } + * } // Winner (logged RuntimeException) + * + * suspend fun winner(): String = racing { + * race { delay(1000); "loser" } + * raceOrFail { throw RuntimeException("Loser") } + * } // RuntimeException + * ``` + * + * **Important:** a racing program with no racers will hang forever. + * ```kotlin + * suspend fun never(): Nothing = racing { } + * ``` + * + * @param handleException handle any exception that occurred in [RacingScope.race]. + * @param block the body of the DSL that describes the racing logic + * @return the winning value of [A]. + */ +public suspend fun racing( + handleException: (suspend (context: CoroutineContext, exception: Throwable) -> Unit)? = null, + block: RacingScope.() -> Unit, +): A = coroutineScope { + val exceptionHandler = handleException ?: { _, _ -> } + select { + val scope = SelectRacingScope(this@select, this@coroutineScope, exceptionHandler) + block(scope) + require(scope.racers.get().isNotEmpty()) { "A racing program with no racers can never yield a result." } + } +} + +public interface RacingScope { + public suspend fun race( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> A + ) + + public suspend fun raceOrFail( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> A + ) +} + +private class SelectRacingScope( + private val select: SelectBuilder, + private val scope: CoroutineScope, + private val handleException: suspend (context: CoroutineContext, exception: Throwable) -> Unit +) : RacingScope { + val racers: Atomic>> = Atomic(emptyList()) + + override suspend fun raceOrFail( + context: CoroutineContext, + block: suspend CoroutineScope.() -> A + ) { + /* First we create a lazy racer, + * and we add it in front of the existing racers such that we maintain correct order. + * After we've successfully registered the racer, we check for race conditions, + * and 'start' racing. + */ + val racer = scope.async( + start = CoroutineStart.LAZY, + context = context, + block = block + ) + racers.update { racer prependTo it } + if (scope.isActive) { + require(racer.start()) { "Racer not started" } + return with(select) { + racer.onAwait.invoke(::identity) + } + } + } + + override suspend fun race(context: CoroutineContext, block: suspend CoroutineScope.() -> A) = + raceOrFail { + try { + block() + } catch (e: CancellationException) { + throw e + } catch (e: Throwable) { + handleException(currentCoroutineContext(), e.nonFatalOrThrow()) + awaitCancellation() + } + } +} + +private suspend fun defaultExceptionHandler(): CoroutineExceptionHandler = + currentCoroutineContext()[CoroutineExceptionHandler] ?: DefaultCoroutineExceptionHandler + +private object DefaultCoroutineExceptionHandler : CoroutineExceptionHandler { + override val key: CoroutineContext.Key = CoroutineExceptionHandler + + override fun handleException(context: CoroutineContext, exception: Throwable) { + if (exception !is CancellationException) exception.printStackTrace() + } +}