Merge pull request #4194 from biochimia/local-queue-docs
chore: update LocalQueue docs/comments
djspiewak authored Dec 10, 2024
2 parents 4afa4e3 + 54c3a58 commit a4d1702
Showing 2 changed files with 38 additions and 40 deletions.
76 changes: 37 additions & 39 deletions core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala
@@ -67,11 +67,10 @@ import java.util.concurrent.ThreadLocalRandom
* and therefore always has the most recent value, making memory synchronization unnecessary.
*
* ''Plain'' loads of `volatile` or [[java.util.concurrent.atomic.AtomicInteger]] are
- * unfortunately unavailable on Java 8. These however are simulated with a specialized subclass
- * of [[java.util.concurrent.atomic.AtomicInteger]] which additionally keeps a non-volatile
+ * unfortunately unavailable on Java 8. These however are simulated by keeping a non-volatile
* field which the owner [[WorkerThread]] reads in order to avoid ''acquire'' memory barriers
- * which are unavoidable when reading a `volatile` field or using
- * [[java.util.concurrent.atomic.AtomicInteger#get]].
+ * (unavoidable when reading a `volatile` field or using
+ * [[java.util.concurrent.atomic.AtomicInteger#get]]).
*
* 2. Loads with ''acquire'' memory semantics are achieved on the JVM either by a direct load of
* a `volatile` field or by using [[java.util.concurrent.atomic.AtomicInteger#get]].
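To make the memory-ordering discussion above concrete, here is a minimal, self-contained sketch of the pattern (hypothetical names, not the actual cats-effect implementation): an atomic value that stealer threads read with acquire semantics, mirrored by a plain field that only the owner thread touches.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Sketch of the "plain load" pattern: other threads observe the index through
// the AtomicInteger, while the owner thread reads its own non-volatile mirror
// and therefore pays no acquire barrier on the hot path.
final class OwnerLocalIndex {
  private val published = new AtomicInteger(0) // visible to stealing threads
  private var plain: Int = 0                   // owner-only, no memory barriers

  // Called only by the owner thread.
  def ownerSet(value: Int): Unit = {
    plain = value
    published.lazySet(value) // ordered ("release"-style) publish for other threads
  }

  def ownerGet(): Int = plain             // plain load, owner thread only
  def stealerGet(): Int = published.get() // volatile ("acquire") load
}
```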
@@ -296,39 +295,29 @@ private final class LocalQueue extends LocalQueuePadding {

/**
* Enqueues a batch of fibers to the local queue as a single operation without the enqueue
- * overhead for each fiber. It a fiber from the batch to be directly executed without first
- * enqueueing it on the local queue.
+ * overhead for each fiber. It returns a fiber from the batch to be directly executed without
+ * first enqueueing it on the local queue.
*
* @note
- * Can '''only''' be correctly called by the owner [[WorkerThread]] when this queue is
- * '''empty'''.
+ * Can '''only''' be correctly called by the owner [[WorkerThread]], and only when this
+ * queue has spare capacity to hold the batch. If called without enough spare capacity the
+ * method will spin forever.
*
* @note
* By convention, each batch of fibers contains exactly
- * `LocalQueueConstants.OverflowBatchSize` number of fibers.
+ * `LocalQueueConstants.SpilloverBatchSize` number of fibers.
*
* @note
* The references inside the batch are not nulled out. It is important to never reference
* the batch after this usage, so that it can be garbage collected, and ultimately, the
* referenced fibers.
*
* @note
- * In an ideal world, this method would involve only a single publishing of the `tail` of
- * the queue to carry out the enqueueing of the whole batch of fibers. However, there must
- * exist some synchronization with other threads, especially in situations where there are
- * many more worker threads compared to the number of processors. In particular, there is
- * one very unlucky interleaving where another worker thread can begin a steal operation
- * from this queue, while this queue is filled to capacity. In that situation, the other
- * worker thread would reserve half of the fibers in this queue, to transfer them to its own
- * local queue. While that stealing operation is in place, this queue effectively operates
- * with half of its capacity for the purposes of enqueueing new fibers. Should the stealing
- * thread be preempted while the stealing operation is still underway, and the worker thread
- * which owns this local queue executes '''every''' other fiber and tries enqueueing a
- * batch, doing so without synchronization can end up overwriting the stolen fibers, simply
- * because the size of the batch is larger than half of the queue. However, in normal
- * operation with a properly sized thread pool, this pathological interleaving should never
- * occur, and is also the reason why this operation has the same performance impact as the
- * ideal non-synchronized version of this method.
+ * `LocalQueue.drainBatch` ensures there is "real" capacity to hold a batch, however
+ * actually enqueueing the batch may require waiting for an ongoing steal operation to
+ * finish, to be able to use that capacity without overwriting stolen fibers. If necessary,
+ * this method will spin until an ongoing steal operation concludes and releases the stolen
+ * capacity.
*
* @param batch
* the batch of fibers to be enqueued on this local queue
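As a rough illustration of the control flow this scaladoc describes, here is a heavily simplified sketch (placeholder types, no real head/tail arithmetic, not the actual LocalQueue code): check for usable capacity, transfer the batch, and hand back one fiber for direct execution, spinning while an ongoing steal still holds capacity.

```scala
// `Fiber` is a placeholder type parameter; `usableCapacity` stands in for the
// real queue's `capacity - (tail - steal)` computation, in which slots held by
// an in-progress steal still count as occupied.
final class BatchEnqueueSketch[Fiber](capacity: Int) {
  private val buffer = scala.collection.mutable.Queue.empty[Fiber]

  private def usableCapacity: Int = capacity - buffer.size

  def enqueueBatch(batch: Array[Fiber]): Fiber = {
    while (true) {
      if (usableCapacity >= batch.length) {
        // Enough room: enqueue all but the first fiber and return the first
        // one to be executed directly, bypassing the queue.
        batch.iterator.drop(1).foreach(f => buffer.enqueue(f))
        return batch(0)
      }
      // Not enough usable capacity: an ongoing steal is holding slots, so spin
      // until it completes and releases them.
      Thread.onSpinWait()
    }
    throw new IllegalStateException("unreachable")
  }
}
```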
@@ -346,9 +335,7 @@ private final class LocalQueue extends LocalQueuePadding {
val hd = Head.updater.get(this)
val steal = msb(hd)

- // Check the current occupancy of the queue. In the one pathological case
- // described in the scaladoc for this class, this number will be equal to
- // `LocalQueueCapacity`.
+ // Check the usable capacity of the queue.
val len = unsignedShortSubtraction(tl, steal)
if (len <= LocalQueueCapacityMinusBatch) {
// It is safe to transfer the fibers from the batch to the queue.
@@ -371,7 +358,7 @@ private final class LocalQueue extends LocalQueuePadding {
val newTl = unsignedShortAddition(tl, SpilloverBatchSize - 1)
Tail.updater.lazySet(this, newTl)
tail = newTl
- // Return a fiber to be directly executed, withouth enqueueing it first
+ // Return a fiber to be directly executed, without enqueueing it first
// on the local queue. This does sacrifice some fairness, because the
// returned fiber might not be at the head of the local queue, but it is
// nevertheless an optimization to reduce contention on the local queue
@@ -380,6 +367,9 @@ private final class LocalQueue extends LocalQueuePadding {
// operations.
return fiber
}

+ // Not enough usable capacity, which means there is an ongoing steal
+ // operation. Spin until it completes.
}

// Technically this is unreachable code. The only way to break out of the
@@ -450,6 +440,12 @@ private final class LocalQueue extends LocalQueuePadding {
buffer(idx) = null
return fiber
}

// The "steal" tag was concurrently updated either because of a new
// incoming stealer, or because an ongoing steal is now complete. In the
// case of a new stealer, the real head will have moved, as well.
//
// In both cases, try again.
}

// Technically this is unreachable code. The only way to break out of the
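The retry comments above describe the usual compare-and-swap loop shape. A minimal generic sketch of that shape (illustrative only, not the LocalQueue code itself):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec

// Read the current value, compute the desired update, and publish it with
// compareAndSet; if another thread (for example a stealer) changed the value
// in the meantime, the CAS fails and the loop starts over with a fresh read.
final class CasRetry(state: AtomicInteger) {
  @tailrec
  def update(f: Int => Int): Int = {
    val current = state.get()
    val next = f(current)
    if (state.compareAndSet(current, next)) next
    else update(f) // lost the race: re-read and try again
  }
}
```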
@@ -480,9 +476,10 @@ private final class LocalQueue extends LocalQueuePadding {
* the stealing [[WorkerThread]].
*
* @param dst
- * the destination local queue where the stole fibers will end up
+ * the destination local queue where the stolen fibers will end up
* @param dstWorker
- * a reference to the owner worker thread, used for setting the active fiber reference
+ * a reference to the worker thread that owns the destination queue, used for setting the
+ * active fiber reference
* @return
* a reference to the first fiber to be executed by the stealing [[WorkerThread]], or `null`
* if the stealing was unsuccessful
@@ -495,19 +492,20 @@ private final class LocalQueue extends LocalQueuePadding {
// A load of the head of the destination queue using `acquire` semantics.
val dstHd = Head.updater.get(dst)

- // Before a steal is attempted, make sure that the destination queue is not
- // being stolen from. It can be argued that an attempt to steal fewer fibers
- // can be made here, but it is simpler to give up completely.
+ // Before a steal is attempted, make sure that the destination queue has
+ // capacity for the stolen fibers. This is done only as a sanity check;
+ // WorkStealingThreadPool only steals fibers into an empty queue.
val dstSteal = msb(dstHd)
if (unsignedShortSubtraction(dstTl, dstSteal) > HalfLocalQueueCapacity) {
return null
}

// A CAS loop on the head of the queue (since it is a FIFO queue). The loop
- // can break out of the whole method only when it has successfully moved
- // the head by `size / 2` positions, securing the fibers to transfer in the
- // process, the local queue is empty, or there is a `WorkerThread` already
- // stealing from this queue.
+ // can break out of the whole method only when:
+ // - it has successfully moved the head by `size / 2` positions, securing
+ // the fibers to transfer in the process;
+ // - the local queue is empty;
+ // - or there is another `WorkerThread` already stealing from this queue.
while (true) {
// A load of the head of the local queue using `acquire` semantics.
var hd = Head.updater.get(this)
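The `msb` and `unsignedShortSubtraction` helpers used above operate on a `head` integer that packs two unsigned 16-bit values: the "steal" tag in the high bits and the "real" head in the low bits. A small self-contained sketch of that encoding (the helper names mirror the diff; the exact implementations here are assumptions):

```scala
object HeadEncodingSketch {
  def msb(head: Int): Int = head >>> 16   // "steal" tag (upper 16 bits)
  def lsb(head: Int): Int = head & 0xffff // "real" head (lower 16 bits)
  def pack(steal: Int, real: Int): Int = (steal << 16) | (real & 0xffff)

  // Wrap-around-safe difference of two 16-bit indices; for example
  // `tail - steal` yields the number of slots currently unavailable.
  def unsignedShortSubtraction(x: Int, y: Int): Int = (x - y) & 0xffff

  def main(args: Array[String]): Unit = {
    val hd = pack(steal = 13, real = 17)
    println((msb(hd), lsb(hd)))                  // (13,17)
    println(unsignedShortSubtraction(3, 0xfffe)) // 5: the index wrapped around
  }
}
```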
@@ -518,7 +516,7 @@ private final class LocalQueue extends LocalQueuePadding {
// Check for the presence of a `WorkerThread` which is already stealing
// from this queue.
if (steal != real) {
- // There is a `WorkerThread` stealing from this queue. Give up.
+ // There is another `WorkerThread` stealing from this queue. Give up.
return null
}

2 changes: 1 addition & 1 deletion core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala
@@ -48,7 +48,7 @@ import WorkStealingThreadPool._

/**
* Work-stealing thread pool which manages a pool of [[WorkerThread]] s for the specific purpose
- * of executing [[java.lang.Runnable]] instancess with work-stealing scheduling semantics.
+ * of executing [[java.lang.Runnable]] instances with work-stealing scheduling semantics.
*
* The thread pool starts with `threadCount` worker threads in the active state, looking to find
* fibers to execute in their own local work stealing queues, or externally scheduled work
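For context, application code normally reaches this pool through an `IORuntime`. A minimal usage sketch, assuming the default global runtime (whose compute pool is the work-stealing pool on the JVM):

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // default IORuntime

object WorkStealingDemo {
  def main(args: Array[String]): Unit = {
    // Each IO below runs as a fiber scheduled onto a WorkerThread's LocalQueue.
    val squares = IO.parTraverseN(4)(List.range(0, 16))(i => IO(i * i))
    println(squares.unsafeRunSync())
  }
}
```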
