diff --git a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala index cc32a0f21d..a4ffd734ae 100644 --- a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala +++ b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala @@ -72,6 +72,8 @@ abstract class PollingSystem { def closePoller(poller: Poller): Unit /** + * Blocks the thread until an event is polled, the timeout expires, or interrupted. + * * @param poller * the thread-local [[Poller]] used to poll events. * @@ -79,18 +81,27 @@ abstract class PollingSystem { * the maximum duration for which to block, where `nanos == -1` indicates to block * indefinitely. * - * @param reportFailure - * callback that handles any failures that occur during polling. + * @return + * whether any ready events were polled and should be handled with [[processReadyEvents]]. + * If result is incomplete, then [[poll]] should be called again after + * [[processReadyEvents]]. + */ + def poll(poller: Poller, nanos: Long): PollResult + + /** + * Processes ready events e.g. collects their results and resumes the corresponding tasks. + * + * @param poller + * the thread-local [[Poller]] with ready events * * @return - * whether any events were polled. e.g. if the method returned due to timeout, this should - * be `false`. + * whether any of the ready events caused tasks to be rescheduled on the runtime */ - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean + def processReadyEvents(poller: Poller): Boolean /** * @return - * whether poll should be called again (i.e., there are more events to be polled) + * whether [[poll]] should be called again (i.e., there are more events to be polled) */ def needsPoll(poller: Poller): Boolean @@ -137,3 +148,23 @@ object PollingSystem { type Poller = P } } + +sealed abstract class PollResult +object PollResult { + + /** + * Polled all of the available ready events. + */ + case object Complete extends PollResult + + /** + * Polled some, but not all, of the available ready events. Poll should be called again to + * reap additional ready events. + */ + case object Incomplete extends PollResult + + /** + * The poll was interrupted or timed out before any events became ready. + */ + case object Interrupted extends PollResult +} diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala index 3fe46d8982..d921759670 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala @@ -41,7 +41,7 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS def closePoller(poller: Poller): Unit = poller.selector.close() - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = { + def poll(poller: Poller, nanos: Long): PollResult = { val millis = if (nanos >= 0) nanos / 1000000 else -1 val selector = poller.selector @@ -49,52 +49,59 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS else if (millis > 0) selector.select(millis) else selector.select() - if (selector.isOpen()) { // closing selector interrupts select - var polled = false - - val ready = selector.selectedKeys().iterator() - while (ready.hasNext()) { - val key = ready.next() - ready.remove() - - var readyOps = 0 - var error: Throwable = null - try { - readyOps = key.readyOps() - // reset interest in triggered ops - key.interestOps(key.interestOps() & ~readyOps) - } catch { - case ex if NonFatal(ex) => - error = ex - readyOps = -1 // interest all waiters - } + // closing selector interrupts select + if (selector.isOpen() && !selector.selectedKeys().isEmpty()) + PollResult.Complete + else + PollResult.Interrupted + } - val value = if (error ne null) Left(error) else Right(readyOps) - - val callbacks = key.attachment().asInstanceOf[Callbacks] - val iter = callbacks.iterator() - while (iter.hasNext()) { - val node = iter.next() - - if ((node.interest & readyOps) != 0) { // drop this node and execute callback - node.remove() - val cb = node.callback - if (cb != null) { - cb(value) - polled = true - if (error ne null) poller.countSucceededOperation(readyOps) - else poller.countErroredOperation(node.interest) - } else { - poller.countCanceledOperation(node.interest) - } + def processReadyEvents(poller: Poller): Boolean = { + val selector = poller.selector + var fibersRescheduled = false + + val ready = selector.selectedKeys().iterator() + while (ready.hasNext()) { + val key = ready.next() + ready.remove() + + var readyOps = 0 + var error: Throwable = null + try { + readyOps = key.readyOps() + // reset interest in triggered ops + key.interestOps(key.interestOps() & ~readyOps) + } catch { + case ex if NonFatal(ex) => + error = ex + readyOps = -1 // interest all waiters + } + + val value = if (error ne null) Left(error) else Right(readyOps) + + val callbacks = key.attachment().asInstanceOf[Callbacks] + val iter = callbacks.iterator() + while (iter.hasNext()) { + val node = iter.next() + + if ((node.interest & readyOps) != 0) { // drop this node and execute callback + node.remove() + val cb = node.callback + if (cb != null) { + cb(value) + fibersRescheduled = true + if (error ne null) poller.countSucceededOperation(readyOps) + else poller.countErroredOperation(node.interest) + } else { + poller.countCanceledOperation(node.interest) } } - - () } - polled - } else false + () + } + + fibersRescheduled } def needsPoll(poller: Poller): Boolean = diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala b/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala index 6caa6ba00e..4a49ffce22 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -34,16 +34,18 @@ object SleepSystem extends PollingSystem { def closePoller(Poller: Poller): Unit = () - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = { + def poll(poller: Poller, nanos: Long): PollResult = { if (nanos < 0) LockSupport.park() else if (nanos > 0) LockSupport.parkNanos(nanos) else () - false + PollResult.Interrupted } + def processReadyEvents(poller: Poller): Boolean = false + def needsPoll(poller: Poller): Boolean = false def interrupt(targetThread: Thread, targetPoller: Poller): Unit = diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 261f02093c..4fbf6c79b5 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -310,7 +310,6 @@ private[effect] final class WorkerThread[P <: AnyRef]( val self = this random = ThreadLocalRandom.current() val rnd = random - val reportFailure = pool.reportFailure(_) /* * A counter (modulo `ExternalQueueTicks`) which represents the @@ -422,22 +421,34 @@ private[effect] final class WorkerThread[P <: AnyRef]( } } + @tailrec + def drainReadyEvents(result: PollResult, acc: Boolean): Boolean = + if (result ne PollResult.Interrupted) { + val tasksScheduled = system.processReadyEvents(_poller) | acc + if (result eq PollResult.Complete) tasksScheduled + else drainReadyEvents(system.poll(_poller, 0), tasksScheduled) + } else { + acc + } + // returns true if polled event, false if unparked def parkLoop(): Boolean = { while (!done.get()) { // Park the thread until further notice. val start = System.nanoTime() metrics.incrementPolledCount() - val polled = system.poll(_poller, -1, reportFailure) + val pollResult = system.poll(_poller, -1) now = System.nanoTime() // update now metrics.addIdleTime(now - start) // the only way we can be interrupted here is if it happened *externally* (probably sbt) if (isInterrupted()) { pool.shutdown() - } else if (polled) { + } else if (pollResult ne PollResult.Interrupted) { if (parked.getAndSet(false)) pool.doneSleeping() + // TODO, if no tasks scheduled could fastpath back to park? + val _ = drainReadyEvents(pollResult, false) return true } else if (!parked.get()) { // Spurious wakeup check. return false @@ -464,7 +475,7 @@ private[effect] final class WorkerThread[P <: AnyRef]( if (nanos > 0L) { val start = now metrics.incrementPolledCount() - val polled = system.poll(_poller, nanos, reportFailure) + val pollResult = system.poll(_poller, nanos) // we already parked and time passed, so update time again // it doesn't matter if we timed out or were awakened, the update is free-ish now = System.nanoTime() @@ -475,11 +486,15 @@ private[effect] final class WorkerThread[P <: AnyRef]( false // we know `done` is `true` } else { // no matter why we woke up, there may be timers or events ready + val polled = pollResult ne PollResult.Interrupted if (polled || (triggerTime - now <= 0)) { // we timed out or polled an event if (parked.getAndSet(false)) { pool.doneSleeping() } + if (polled) { // TODO, if no tasks scheduled and no timers could fastpath back to park? + val _ = drainReadyEvents(pollResult, false) + } true } else { // we were either awakened spuriously or intentionally if (parked.get()) // awakened spuriously, re-check next sleeper @@ -579,7 +594,9 @@ private[effect] final class WorkerThread[P <: AnyRef]( sleepers.packIfNeeded() // give the polling system a chance to discover events metrics.incrementPolledCount() - system.poll(_poller, 0, reportFailure) + if (system.needsPoll(_poller)) { + val _ = drainReadyEvents(system.poll(_poller, 0), false) + } // Obtain a fiber or batch of fibers from the external queue. val element = external.poll(rnd) diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala index ae242ae85b..e33664e645 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -23,14 +23,13 @@ import cats.syntax.all._ import org.typelevel.scalaccompat.annotation._ -import scala.annotation.tailrec import scala.scalanative.annotation.alwaysinline import scala.scalanative.libc.errno._ import scala.scalanative.meta.LinktimeInfo import scala.scalanative.posix.errno._ import scala.scalanative.posix.string._ import scala.scalanative.posix.unistd -import scala.scalanative.runtime._ +import scala.scalanative.runtime.{Array => _, _} import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ @@ -60,9 +59,12 @@ object EpollSystem extends PollingSystem { def closePoller(poller: Poller): Unit = poller.close() - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = + def poll(poller: Poller, nanos: Long): PollResult = poller.poll(nanos) + def processReadyEvents(poller: Poller): Boolean = + poller.processReadyEvents() + def needsPoll(poller: Poller): Boolean = poller.needsPoll() def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () @@ -182,44 +184,40 @@ object EpollSystem extends PollingSystem { private[this] val handles: Set[PollHandle] = Collections.newSetFromMap(new IdentityHashMap) + private[this] val eventsArray = new Array[Byte](sizeof[epoll_event].toInt * MaxEvents) + @inline private[this] def events = eventsArray.atUnsafe(0).asInstanceOf[Ptr[epoll_event]] + private[this] var readyEventCount: Int = 0 + private[EpollSystem] def close(): Unit = if (unistd.close(epfd) != 0) throw new IOException(fromCString(strerror(errno))) - private[EpollSystem] def poll(timeout: Long): Boolean = { - - val events = stackalloc[epoll_event](MaxEvents.toULong) - var polled = false - - @tailrec - def processEvents(timeout: Int): Unit = { - - val triggeredEvents = epoll_wait(epfd, events, MaxEvents, timeout) - - if (triggeredEvents >= 0) { - polled = true - - var i = 0 - while (i < triggeredEvents) { - val event = events + i.toLong - val handle = fromPtr(event.data) - handle.notify(event.events.toInt) - i += 1 - } - } else if (errno != EINTR) { // spurious wake-up by signal - throw new IOException(fromCString(strerror(errno))) - } - - if (triggeredEvents >= MaxEvents) - processEvents(0) // drain the ready list - else - () - } + private[EpollSystem] def poll(timeout: Long): PollResult = { val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt - processEvents(timeoutMillis) + val rtn = epoll_wait(epfd, events, MaxEvents, timeoutMillis) + if (rtn >= 0) { + readyEventCount = rtn + if (rtn > 0) { + if (rtn < MaxEvents) PollResult.Complete else PollResult.Incomplete + } else PollResult.Interrupted + } else if (errno == EINTR) { // spurious wake-up by signal + PollResult.Interrupted + } else { + throw new IOException(fromCString(strerror(errno))) + } + } - polled + private[EpollSystem] def processReadyEvents(): Boolean = { + var i = 0 + while (i < readyEventCount) { + val event = events + i.toLong + val handle = fromPtr(event.data) + handle.notify(event.events.toInt) + i += 1 + } + readyEventCount = 0 + true } private[EpollSystem] def needsPoll(): Boolean = !handles.isEmpty() diff --git a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala index 17f0f2e5ef..87b8e62422 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala @@ -17,6 +17,7 @@ package cats.effect package unsafe +import scala.annotation.tailrec import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} import scala.concurrent.duration._ import scala.scalanative.libc.errno._ @@ -127,9 +128,15 @@ private[effect] final class EventLoopExecutorScheduler[P]( * the Scala Native global `ExecutionContext` which is currently hard-coded into every * test framework, including MUnit, specs2, and Weaver. */ - if (system.needsPoll(poller) || timeout != -1) - system.poll(poller, timeout, reportFailure) - else () + if (system.needsPoll(poller) || timeout != -1) { + @tailrec def loop(result: PollResult): Unit = + if (result ne PollResult.Interrupted) { + system.processReadyEvents(poller) + if (result eq PollResult.Incomplete) loop(system.poll(poller, 0)) + } + + loop(system.poll(poller, timeout)) + } continue = !executeQueue.isEmpty() || !sleepQueue.isEmpty() || system.needsPoll(poller) } diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala index e884432b4e..5b835f590c 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -23,7 +23,6 @@ import cats.syntax.all._ import org.typelevel.scalaccompat.annotation._ -import scala.annotation.tailrec import scala.collection.mutable.LongMap import scala.scalanative.libc.errno._ import scala.scalanative.posix.errno._ @@ -59,9 +58,12 @@ object KqueueSystem extends PollingSystem { def closePoller(poller: Poller): Unit = poller.close() - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = + def poll(poller: Poller, nanos: Long): PollResult = poller.poll(nanos) + def processReadyEvents(poller: Poller): Boolean = + poller.processReadyEvents() + def needsPoll(poller: Poller): Boolean = poller.needsPoll() @@ -138,10 +140,11 @@ object KqueueSystem extends PollingSystem { final class Poller private[KqueueSystem] (kqfd: Int) { - private[this] val changelistArray = new Array[Byte](sizeof[kevent64_s].toInt * MaxEvents) - @inline private[this] def changelist = - changelistArray.atUnsafe(0).asInstanceOf[Ptr[kevent64_s]] + private[this] val buffer = new Array[Byte](sizeof[kevent64_s].toInt * MaxEvents) + @inline private[this] def eventlist = + buffer.atUnsafe(0).asInstanceOf[Ptr[kevent64_s]] private[this] var changeCount = 0 + private[this] var readyEventCount = 0 private[this] val callbacks = new LongMap[Either[Throwable, Unit] => Unit]() @@ -151,11 +154,11 @@ object KqueueSystem extends PollingSystem { flags: CUnsignedShort, cb: Either[Throwable, Unit] => Unit ): Unit = { - val change = changelist + changeCount.toLong + val event = eventlist + changeCount.toLong - change.ident = ident.toULong - change.filter = filter - change.flags = (flags.toInt | EV_ONESHOT).toUShort + event.ident = ident.toULong + event.filter = filter + event.flags = (flags.toInt | EV_ONESHOT).toUShort callbacks.update(encodeKevent(ident, filter), cb) @@ -171,54 +174,7 @@ object KqueueSystem extends PollingSystem { if (unistd.close(kqfd) != 0) throw new IOException(fromCString(strerror(errno))) - private[KqueueSystem] def poll(timeout: Long): Boolean = { - - val eventlist = stackalloc[kevent64_s](MaxEvents.toULong) - var polled = false - - @tailrec - def processEvents(timeout: Ptr[timespec], changeCount: Int, flags: Int): Unit = { - - val triggeredEvents = - kevent64( - kqfd, - changelist, - changeCount, - eventlist, - MaxEvents, - flags.toUInt, - timeout - ) - - if (triggeredEvents >= 0) { - polled = true - - var i = 0 - var event = eventlist - while (i < triggeredEvents) { - val kevent = encodeKevent(event.ident.toInt, event.filter) - val cb = callbacks.getOrNull(kevent) - callbacks -= kevent - - if (cb ne null) - cb( - if ((event.flags.toLong & EV_ERROR) != 0) - Left(new IOException(fromCString(strerror(event.data.toInt)))) - else Either.unit - ) - - i += 1 - event += 1 - } - } else if (errno != EINTR) { // spurious wake-up by signal - throw new IOException(fromCString(strerror(errno))) - } - - if (triggeredEvents >= MaxEvents) - processEvents(null, 0, KEVENT_FLAG_IMMEDIATE) // drain the ready list - else - () - } + private[KqueueSystem] def poll(timeout: Long): PollResult = { val timeoutSpec = if (timeout <= 0) null @@ -231,13 +187,53 @@ object KqueueSystem extends PollingSystem { val flags = if (timeout == 0) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE - processEvents(timeoutSpec, changeCount, flags) + val rtn = kevent64( + kqfd, + eventlist, + changeCount, + eventlist, + MaxEvents, + flags.toUInt, + timeoutSpec + ) changeCount = 0 - polled + if (rtn >= 0) { + readyEventCount = rtn + if (rtn > 0) { + if (rtn < MaxEvents) PollResult.Complete else PollResult.Incomplete + } else PollResult.Interrupted + } else if (errno == EINTR) { // spurious wake-up by signal + PollResult.Interrupted + } else { + throw new IOException(fromCString(strerror(errno))) + } + } + + private[KqueueSystem] def processReadyEvents(): Boolean = { + var i = 0 + var event = eventlist + while (i < readyEventCount) { + val kevent = encodeKevent(event.ident.toInt, event.filter) + val cb = callbacks.getOrNull(kevent) + callbacks -= kevent + + if (cb ne null) + cb( + if ((event.flags.toLong & EV_ERROR) != 0) + Left(new IOException(fromCString(strerror(event.data.toInt)))) + else Either.unit + ) + + i += 1 + event += 1 + } + + readyEventCount = 0 + true } - def needsPoll(): Boolean = changeCount > 0 || callbacks.nonEmpty + private[KqueueSystem] def needsPoll(): Boolean = changeCount > 0 || callbacks.nonEmpty } @nowarn212 diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala index 125ee1ce9f..8f87e9a186 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala @@ -37,14 +37,15 @@ abstract class PollingExecutorScheduler(pollEvery: Int) def makeApi(ctx: PollingContext[Poller]): Api = outer def makePoller(): Poller = outer def closePoller(poller: Poller): Unit = () - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = { + def poll(poller: Poller, nanos: Long): PollResult = { needsPoll = if (nanos == -1) poller.poll(Duration.Inf) else poller.poll(nanos.nanos) - true + PollResult.Complete } + def processReadyEvents(poller: Poller): Boolean = true def needsPoll(poller: Poller) = needsPoll def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () def metrics(poller: Poller): PollerMetrics = PollerMetrics.noop diff --git a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala index 198bc640de..a01eed4bd9 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -32,12 +32,14 @@ object SleepSystem extends PollingSystem { def closePoller(poller: Poller): Unit = () - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = { + def poll(poller: Poller, nanos: Long): PollResult = { if (nanos > 0) Thread.sleep(nanos / 1000000, (nanos % 1000000).toInt) - false + PollResult.Interrupted } + def processReadyEvents(poller: Poller): Boolean = false + def needsPoll(poller: Poller): Boolean = false def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () diff --git a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala index 0ad5289f2a..fd29fac4d4 100644 --- a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala +++ b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala @@ -20,6 +20,7 @@ import cats.effect.std.Semaphore import cats.effect.unsafe.{ IORuntime, IORuntimeConfig, + PollResult, PollingContext, PollingSystem, SleepSystem, @@ -486,46 +487,54 @@ trait IOPlatformSpecification extends DetectPlatform { self: BaseSpec with Scala ok } - "wake parked thread for polled events" in { + trait DummyPoller { + def poll: IO[Unit] + } - trait DummyPoller { - def poll: IO[Unit] - } + object DummySystem extends PollingSystem { + type Api = DummyPoller + type Poller = AtomicReference[List[Either[Throwable, Unit] => Unit]] - object DummySystem extends PollingSystem { - type Api = DummyPoller - type Poller = AtomicReference[List[Either[Throwable, Unit] => Unit]] + def close() = () - def close() = () + def makePoller() = new AtomicReference(List.empty[Either[Throwable, Unit] => Unit]) + def needsPoll(poller: Poller) = poller.get.nonEmpty + def closePoller(poller: Poller) = () + def metrics(poller: Poller): PollerMetrics = PollerMetrics.noop - def makePoller() = new AtomicReference(List.empty[Either[Throwable, Unit] => Unit]) - def needsPoll(poller: Poller) = poller.get.nonEmpty - def closePoller(poller: Poller) = () - def metrics(poller: Poller): PollerMetrics = PollerMetrics.noop + def interrupt(targetThread: Thread, targetPoller: Poller) = + SleepSystem.interrupt(targetThread, SleepSystem.makePoller()) - def interrupt(targetThread: Thread, targetPoller: Poller) = - SleepSystem.interrupt(targetThread, SleepSystem.makePoller()) + def poll(poller: Poller, nanos: Long) = { + poller.get() match { + case Nil => + SleepSystem.poll(SleepSystem.makePoller(), nanos) + case _ => PollResult.Complete + } + } - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit) = { - poller.getAndSet(Nil) match { - case Nil => - SleepSystem.poll(SleepSystem.makePoller(), nanos, reportFailure) - case cbs => - cbs.foreach(_.apply(Right(()))) - true - } + def processReadyEvents(poller: Poller) = { + poller.getAndSet(Nil) match { + case Nil => + false + case cbs => + cbs.foreach(_.apply(Right(()))) + true } + } - def makeApi(ctx: PollingContext[Poller]): DummySystem.Api = - new DummyPoller { - def poll = IO.async_[Unit] { cb => - ctx.accessPoller { poller => - poller.getAndUpdate(cb :: _) - () - } + def makeApi(ctx: PollingContext[Poller]): DummySystem.Api = + new DummyPoller { + def poll = IO.async_[Unit] { cb => + ctx.accessPoller { poller => + poller.getAndUpdate(cb :: _) + () } } - } + } + } + + "wake parked thread for polled events" in { val (pool, poller, shutdown) = IORuntime.createWorkStealingComputeThreadPool( threads = 2, @@ -548,45 +557,6 @@ trait IOPlatformSpecification extends DetectPlatform { self: BaseSpec with Scala "poll punctually on a single-thread runtime with concurrent sleepers" in { - trait DummyPoller { - def poll: IO[Unit] - } - - object DummySystem extends PollingSystem { - type Api = DummyPoller - type Poller = AtomicReference[List[Either[Throwable, Unit] => Unit]] - - def close() = () - - def makePoller() = new AtomicReference(List.empty[Either[Throwable, Unit] => Unit]) - def needsPoll(poller: Poller) = poller.get.nonEmpty - def closePoller(poller: Poller) = () - def metrics(poller: Poller): PollerMetrics = PollerMetrics.noop - - def interrupt(targetThread: Thread, targetPoller: Poller) = - SleepSystem.interrupt(targetThread, SleepSystem.makePoller()) - - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit) = { - poller.getAndSet(Nil) match { - case Nil => - SleepSystem.poll(SleepSystem.makePoller(), nanos, reportFailure) - case cbs => - cbs.foreach(_.apply(Right(()))) - true - } - } - - def makeApi(ctx: PollingContext[Poller]): DummySystem.Api = - new DummyPoller { - def poll = IO.async_[Unit] { cb => - ctx.accessPoller { poller => - poller.getAndUpdate(cb :: _) - () - } - } - } - } - val (pool, poller, shutdown) = IORuntime.createWorkStealingComputeThreadPool( threads = 1, pollingSystem = DummySystem)