Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Native multithreaded execution #4201

Draft
wants to merge 52 commits into
base: series/3.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
caaac9b
Got the ball rolling on SN 0.5
djspiewak Nov 26, 2024
cc600ac
Fixed more weird 0.4 -> 0.5 things
djspiewak Dec 8, 2024
d7306c7
Got WSTP-related stuffs compiling on native
djspiewak Dec 8, 2024
426776b
Reimplemented pats of `AtomicIntegerFieldUpdater`; now it's all linking
djspiewak Dec 8, 2024
489fbb9
Use SleepSystem by default on native
durban Dec 15, 2024
62b8141
Reënable EpollSystem, implement interrupt for it, mark epoll_wait as …
durban Dec 20, 2024
7f0a559
More helpful TestTimeoutException
durban Dec 20, 2024
8ebf321
Merge branch 'series/3.x' into wip/multithreaded-wstp
djspiewak Dec 26, 2024
afba12f
Adjusted DWARF version to avoid issues with 0.5.6 on macOS
djspiewak Dec 27, 2024
3a88218
Started noodling with kqueue interrupts
djspiewak Dec 27, 2024
e01f25b
Rewrote most of the kqueue stuff to be simpler
djspiewak Dec 28, 2024
06b0a65
Forgot to bump the base version
djspiewak Dec 28, 2024
ca7c89d
Enabled concurrent `Ref` on native
djspiewak Dec 28, 2024
081bc56
Make kqueue compatible with parallel GC on SN
djspiewak Dec 28, 2024
b0caa11
Enabled higher iterations from `ContSpec` on native
djspiewak Dec 28, 2024
6e75e13
Made `Deferred` parallelism specs common across JVM and native
djspiewak Dec 28, 2024
65fd3e2
Shifted JVM `IO` functionality to share with native
djspiewak Dec 28, 2024
e818c50
Made `Dispatcher` functionality common across JVM and native
djspiewak Dec 28, 2024
dfc9a17
Enabled higher parallelism on native queue specs
djspiewak Dec 28, 2024
eab5150
Skip `Dispatcher` interruption spec for the time being
djspiewak Dec 28, 2024
22110b6
Generalized high precision native `nowMicros`
djspiewak Dec 28, 2024
f86ed07
Restored syscall-reducing optimization in kqueue implementation
djspiewak Dec 28, 2024
4ec0a9a
Swapped out LongMap for TrieMap for callbacks in kqueue
djspiewak Dec 28, 2024
47befdd
prePR
djspiewak Dec 28, 2024
5804694
Shifted JVM-specific `MapRef` support to share with native
djspiewak Dec 28, 2024
72c7098
A bit of yak shaving for scala 3 and unused warnings
djspiewak Dec 28, 2024
bb84b42
Factored non-JVM-specific highly concurrent `IO` specs out to be shar…
djspiewak Dec 28, 2024
010a820
Factored `kevent64` out into blocking and non-blocking variants (also…
djspiewak Dec 28, 2024
0b545af
Merge branch 'series/3.x' into wip/multithreaded-wstp
djspiewak Dec 28, 2024
f4bc66a
EpollSystem: fix stackallocs; use TrieMap
durban Dec 31, 2024
ca750c9
Fix `nativeRunner` setting
armanbilge Jan 2, 2025
affc37c
Formatting
armanbilge Jan 2, 2025
77379c2
Use `sizeof` with `stackalloc`
armanbilge Jan 2, 2025
48d92dd
mulithreadify `IOApp`
armanbilge Jan 2, 2025
4dcb142
Delete single-thread native runtimes
armanbilge Jan 2, 2025
d5568bd
Regenerate workflow
armanbilge Jan 2, 2025
76e2df6
organize imports
armanbilge Jan 2, 2025
7859974
Share `SleepSystem`
armanbilge Jan 2, 2025
833ce4a
Fix bincompat
armanbilge Jan 2, 2025
cefddb2
Fix Scala 3 compile
armanbilge Jan 2, 2025
767c465
`EpollSystem` fixups
armanbilge Jan 2, 2025
b0e4ccf
use constant val defns in `LocalQueueConstants`
armanbilge Jan 2, 2025
4856714
Restore native+macos+JDK21 jobs to matrix
armanbilge Jan 2, 2025
651be36
Tweak filter
armanbilge Jan 2, 2025
53abc4d
`stackalloc` considered harmful
djspiewak Jan 3, 2025
22271cc
Split `epoll_wait` into blocking and non-blocking variants
djspiewak Jan 3, 2025
81ace6e
Restore `encodeKevent`
armanbilge Jan 3, 2025
024fe66
Restore kqueue event buffering
armanbilge Jan 5, 2025
c723bbc
Avoid `stackalloc` and `sizeof`
armanbilge Jan 5, 2025
09bc339
Formatting
armanbilge Jan 5, 2025
704c914
Fix `epoll_wait` invocation
armanbilge Jan 5, 2025
2eee55d
Fix more `stackalloc`s / `sizeof`s
armanbilge Jan 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,37 @@ jobs:
ci: ciChrome
- ci: ciNative
java: temurin@11
os: ubuntu-latest
- ci: ciNative
java: temurin@11
os: windows-latest
- ci: ciNative
java: temurin@11
os: macos-14
- ci: ciNative
java: temurin@17
os: ubuntu-latest
- ci: ciNative
java: temurin@17
os: windows-latest
- ci: ciNative
java: temurin@21
os: ubuntu-latest
- ci: ciNative
java: temurin@21
os: windows-latest
- ci: ciNative
java: temurin@21
os: macos-14
- ci: ciNative
java: graalvm@21
os: ubuntu-latest
- ci: ciNative
java: graalvm@21
os: windows-latest
- ci: ciNative
java: graalvm@21
os: macos-14
- os: windows-latest
ci: ciNative
- os: macos-14
Expand Down Expand Up @@ -610,5 +635,5 @@ jobs:
- name: Submit Dependencies
uses: scalacenter/sbt-dependency-submission@v2
with:
modules-ignore: cats-effect-benchmarks_3 cats-effect-benchmarks_2.12 cats-effect-benchmarks_2.13 cats-effect_3 cats-effect_2.12 cats-effect_2.13 cats-effect-example_sjs1_3 cats-effect-example_sjs1_2.12 cats-effect-example_sjs1_2.13 rootjs_3 rootjs_2.12 rootjs_2.13 ioapptestsnative_3 ioapptestsnative_2.12 ioapptestsnative_2.13 cats-effect-graalvm-example_3 cats-effect-graalvm-example_2.12 cats-effect-graalvm-example_2.13 cats-effect-tests_sjs1_3 cats-effect-tests_sjs1_2.12 cats-effect-tests_sjs1_2.13 rootjvm_3 rootjvm_2.12 rootjvm_2.13 rootnative_3 rootnative_2.12 rootnative_2.13 cats-effect-example_native0.4_3 cats-effect-example_native0.4_2.12 cats-effect-example_native0.4_2.13 cats-effect-example_3 cats-effect-example_2.12 cats-effect-example_2.13 cats-effect-tests_3 cats-effect-tests_2.12 cats-effect-tests_2.13 ioapptestsjvm_3 ioapptestsjvm_2.12 ioapptestsjvm_2.13 ioapptestsjs_3 ioapptestsjs_2.12 ioapptestsjs_2.13 cats-effect-tests_native0.4_3 cats-effect-tests_native0.4_2.12 cats-effect-tests_native0.4_2.13
modules-ignore: cats-effect-benchmarks_3 cats-effect-benchmarks_2.12 cats-effect-benchmarks_2.13 cats-effect_3 cats-effect_2.12 cats-effect_2.13 cats-effect-example_sjs1_3 cats-effect-example_sjs1_2.12 cats-effect-example_sjs1_2.13 rootjs_3 rootjs_2.12 rootjs_2.13 ioapptestsnative_3 ioapptestsnative_2.12 ioapptestsnative_2.13 cats-effect-graalvm-example_3 cats-effect-graalvm-example_2.12 cats-effect-graalvm-example_2.13 cats-effect-tests_sjs1_3 cats-effect-tests_sjs1_2.12 cats-effect-tests_sjs1_2.13 rootjvm_3 rootjvm_2.12 rootjvm_2.13 rootnative_3 rootnative_2.12 rootnative_2.13 cats-effect-example_native0.5_3 cats-effect-example_native0.5_2.12 cats-effect-example_native0.5_2.13 cats-effect-example_3 cats-effect-example_2.12 cats-effect-example_2.13 cats-effect-tests_3 cats-effect-tests_2.12 cats-effect-tests_2.13 ioapptestsjvm_3 ioapptestsjvm_2.12 ioapptestsjvm_2.13 ioapptestsjs_3 ioapptestsjs_2.12 ioapptestsjs_2.13 cats-effect-tests_native0.5_3 cats-effect-tests_native0.5_2.12 cats-effect-tests_native0.5_2.13
configs-ignore: test scala-tool scala-doc-tool test-internal
36 changes: 22 additions & 14 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.openqa.selenium.firefox.{FirefoxOptions, FirefoxProfile}
import org.scalajs.jsenv.nodejs.NodeJSEnv
import org.scalajs.jsenv.selenium.SeleniumJSEnv
import sbtcrossproject.CrossProject
import scala.scalanative.build._

import JSEnv._

Expand All @@ -41,7 +42,7 @@ ThisBuild / git.gitUncommittedChanges := {
}
}

ThisBuild / tlBaseVersion := "3.6"
ThisBuild / tlBaseVersion := "3.7"
ThisBuild / tlUntaggedAreSnapshots := false

ThisBuild / organization := "org.typelevel"
Expand Down Expand Up @@ -254,10 +255,11 @@ ThisBuild / githubWorkflowBuildMatrixExclusions := {
val nativeJavaAndOSFilters = {
val ci = CI.Native.command

val javaFilters =
(ThisBuild / githubWorkflowJavaVersions).value.filterNot(Set(ScalaNativeJava)).map {
java => MatrixExclude(Map("ci" -> ci, "java" -> java.render))
}
val javaFilters = for {
java <- (ThisBuild / githubWorkflowJavaVersions).value.filterNot(Set(ScalaNativeJava))
os <- (ThisBuild / githubWorkflowOSes).value
if !(os == MacOS && java == LatestJava)
} yield MatrixExclude(Map("ci" -> ci, "java" -> java.render, "os" -> os))

javaFilters ++ Seq(
MatrixExclude(Map("os" -> Windows, "ci" -> ci)),
Expand Down Expand Up @@ -300,12 +302,12 @@ ThisBuild / apiURL := Some(url("https://typelevel.org/cats-effect/api/3.x/"))

ThisBuild / autoAPIMappings := true

val CatsVersion = "2.11.0"
val CatsMtlVersion = "1.3.1"
val Specs2Version = "4.20.5"
val ScalaCheckVersion = "1.17.1"
val DisciplineVersion = "1.4.0"
val CoopVersion = "1.2.0"
val CatsVersion = "2.12.0"
val CatsMtlVersion = "1.5.0"
val Specs2Version = "4.20.9"
val ScalaCheckVersion = "1.18.1"
val DisciplineVersion = "1.5.0"
val CoopVersion = "1.3.0"

val MacrotaskExecutorVersion = "1.1.1"

Expand Down Expand Up @@ -406,7 +408,7 @@ lazy val kernel = crossProject(JSPlatform, JVMPlatform, NativePlatform)
libraryDependencies += "org.scala-js" %%% "scala-js-macrotask-executor" % MacrotaskExecutorVersion % Test
)
.nativeSettings(
libraryDependencies += "io.github.cquiroz" %%% "scala-java-time" % "2.5.0"
libraryDependencies += "io.github.cquiroz" %%% "scala-java-time" % "2.6.0"
)

/**
Expand Down Expand Up @@ -940,7 +942,13 @@ lazy val tests: CrossProject = crossProject(JSPlatform, JVMPlatform, NativePlatf
Test / javaOptions += "-Dcats.effect.trackFiberContext=true"
)
.nativeSettings(
Compile / mainClass := Some("catseffect.examples.NativeRunner")
Compile / mainClass := Some("catseffect.examples.NativeRunner"),
nativeConfig ~= { c => // TODO: remove this when it seems to work
c.withSourceLevelDebuggingConfig(_.enableAll) // enable generation of debug information
.withOptimize(false) // disable Scala Native optimizer
.withMode(Mode.debug) // compile using LLVM without optimizations
.withCompileOptions(c.compileOptions ++ Seq("-gdwarf-4"))
}
)

def configureIOAppTests(p: Project): Project =
Expand All @@ -951,7 +959,7 @@ def configureIOAppTests(p: Project): Project =
buildInfoPackage := "cats.effect",
buildInfoKeys ++= Seq(
"jsRunner" -> (tests.js / Compile / fastOptJS / artifactPath).value,
"nativeRunner" -> (tests.native / Compile / nativeLink / artifactPath).value
"nativeRunner" -> (tests.native / Compile / crossTarget).value / (tests.native / Compile / moduleName).value
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ import scala.util.control.NonFatal
private[effect] final class BatchingMacrotaskExecutor(
batchSize: Int,
reportFailure0: Throwable => Unit
) extends ExecutionContextExecutor
with FiberExecutor {
) extends ExecutionContextExecutor {

private[this] val queueMicrotask: js.Function1[js.Function0[Any], Any] =
if (js.typeOf(js.Dynamic.global.queueMicrotask) == "function")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package cats.effect
package unsafe

import scala.concurrent.ExecutionContext
import scala.scalajs.{js, LinkingInfo}

private[effect] sealed abstract class FiberMonitor extends FiberMonitorShared {

/**
Expand All @@ -41,8 +44,8 @@ private[effect] sealed abstract class FiberMonitor extends FiberMonitorShared {

private final class FiberMonitorImpl(
// A reference to the compute pool of the `IORuntime` in which this suspended fiber bag
// operates. `null` if the compute pool of the `IORuntime` is not a `FiberExecutor`.
private[this] val compute: FiberExecutor
// operates. `null` if the compute pool of the `IORuntime` is not a `BatchingMacrotaskExecutor`.
private[this] val compute: BatchingMacrotaskExecutor
) extends FiberMonitor {
private[this] val bag = new WeakBag[IOFiber[_]]()

Expand Down Expand Up @@ -92,4 +95,26 @@ private final class NoOpFiberMonitor extends FiberMonitor {
def liveFiberSnapshot(print: String => Unit): Unit = ()
}

private[effect] object FiberMonitor extends FiberMonitorPlatform
private[effect] object FiberMonitor {
def apply(compute: ExecutionContext): FiberMonitor = {
if (LinkingInfo.developmentMode && weakRefsAvailable) {
if (compute.isInstanceOf[BatchingMacrotaskExecutor]) {
val bmec = compute.asInstanceOf[BatchingMacrotaskExecutor]
new FiberMonitorImpl(bmec)
} else {
new FiberMonitorImpl(null)
}
} else {
new NoOpFiberMonitor()
}
}

private[this] final val Undefined = "undefined"

/**
* Feature-tests for all the required, well, features :)
*/
private[unsafe] def weakRefsAvailable: Boolean =
js.typeOf(js.Dynamic.global.WeakRef) != Undefined &&
js.typeOf(js.Dynamic.global.FinalizationRegistry) != Undefined
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ import java.time.temporal.ChronoField
import java.util.concurrent.{Executors, ScheduledExecutorService}

private[unsafe] abstract class SchedulerCompanionPlatform { this: Scheduler.type =>
def createDefaultScheduler(): (Scheduler, () => Unit) = {

def createDefaultScheduler(): (Scheduler, () => Unit) =
createDefaultScheduler("io-scheduler")

def createDefaultScheduler(threadPrefix: String): (Scheduler, () => Unit) = {
val scheduler = Executors.newSingleThreadScheduledExecutor { r =>
val t = new Thread(r)
t.setName("io-scheduler")
t.setName(threadPrefix)
t.setDaemon(true)
t.setPriority(Thread.MAX_PRIORITY)
t
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.control.NonFatal

import java.time.Instant
import java.time.temporal.ChronoField
import java.util.Comparator
import java.util.concurrent.{ConcurrentSkipListSet, ThreadLocalRandom}
import java.util.concurrent.atomic.{
Expand Down Expand Up @@ -78,7 +76,8 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
reportFailure0: Throwable => Unit
) extends ExecutionContextExecutor
with Scheduler
with UnsealedPollingContext[P] {
with UnsealedPollingContext[P]
with WorkStealingThreadPoolPlatform[P] {

import TracingConstants._
import WorkStealingThreadPoolConstants._
Expand Down Expand Up @@ -628,11 +627,6 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](

override def nowMillis(): Long = System.currentTimeMillis()

override def nowMicros(): Long = {
val now = Instant.now()
now.getEpochSecond() * 1000000 + now.getLong(ChronoField.MICRO_OF_SECOND)
}

/**
* Tries to call the current worker's `sleep`, but falls back to `sleepExternal` if needed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

import java.lang.management.ManagementFactory
import java.util.concurrent.{Executors, ScheduledThreadPoolExecutor}
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

import javax.management.ObjectName
Expand Down Expand Up @@ -246,19 +246,8 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type
(ExecutionContext.fromExecutor(executor, reportFailure), { () => executor.shutdown() })
}

def createDefaultScheduler(threadPrefix: String = "io-scheduler"): (Scheduler, () => Unit) = {
val scheduler = new ScheduledThreadPoolExecutor(
1,
{ r =>
val t = new Thread(r)
t.setName(threadPrefix)
t.setDaemon(true)
t.setPriority(Thread.MAX_PRIORITY)
t
})
scheduler.setRemoveOnCancelPolicy(true)
(Scheduler.fromScheduledExecutor(scheduler), { () => scheduler.shutdown() })
}
def createDefaultScheduler(threadPrefix: String = "io-scheduler"): (Scheduler, () => Unit) =
Scheduler.createDefaultScheduler(threadPrefix)

def createDefaultPollingSystem(): PollingSystem = SelectorSystem()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@
package cats.effect
package unsafe

/**
* An introspectable executor that runs fibers. Useful for fiber dumps.
*/
private[unsafe] trait FiberExecutor {
def liveTraces(): Map[IOFiber[_], Trace]
import java.time.Instant
import java.time.temporal.ChronoField

trait WorkStealingThreadPoolPlatform[P <: AnyRef] extends Scheduler {
this: WorkStealingThreadPool[P] =>

override def nowMicros(): Long = {
val now = Instant.now()
now.getEpochSecond() * 1000000 + now.getLong(ChronoField.MICRO_OF_SECOND)
}
}
Loading
Loading