Skip to content

Commit

Permalink
shift list to native atomic types
Browse files Browse the repository at this point in the history
  • Loading branch information
alphadose committed Aug 5, 2022
1 parent 5718598 commit 3fd4465
Showing 1 changed file with 29 additions and 26 deletions.
55 changes: 29 additions & 26 deletions list.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package zenq
import (
"sync"
"sync/atomic"
"unsafe"
)

// global memory pool for storing and leasing node objects
var (
nodePool = sync.Pool{New: func() any { return unsafe.Pointer(new(node)) }}
nodePool = sync.Pool{New: func() any { return new(node) }}
nodeGet = nodePool.Get
nodePut = nodePool.Put
)
Expand All @@ -17,42 +16,45 @@ var (
// theory -> https://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf
// pseudocode -> https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type List struct {
head unsafe.Pointer
tail unsafe.Pointer
head atomic.Pointer[node]
tail atomic.Pointer[node]
}

// NewList returns a new list
func NewList() List {
n := nodeGet().(unsafe.Pointer)
(*node)(n).next, (*node)(n).value = nil, nil
return List{head: n, tail: n}
n := nodeGet().(*node)
n.value = nil
n.next.Store(nil)
var ptr atomic.Pointer[node]
ptr.Store(n)
return List{head: ptr, tail: ptr}
}

// a single node in the linked list
type node struct {
next unsafe.Pointer
next atomic.Pointer[node]
value *Selection
}

// Enqueue inserts a value into the list
func (l *List) Enqueue(value *Selection) {
var (
n = nodeGet().(unsafe.Pointer)
tail, next unsafe.Pointer
n = nodeGet().(*node)
tail, next *node
)
(*node)(n).next, (*node)(n).value = nil, value
n.value = value
for {
tail = atomic.LoadPointer(&l.tail)
next = atomic.LoadPointer(&(*node)(tail).next)
if tail == atomic.LoadPointer(&l.tail) { // are tail and next consistent?
tail = l.tail.Load()
next = tail.next.Load()
if tail == l.tail.Load() { // are tail and next consistent?
if next == nil {
if atomic.CompareAndSwapPointer(&(*node)(tail).next, next, n) {
atomic.CompareAndSwapPointer(&l.tail, tail, n) // Enqueue is done. try to swing tail to the inserted node
if tail.next.CompareAndSwap(next, n) {
l.tail.CompareAndSwap(tail, n) // Enqueue is done. try to swing tail to the inserted node
return
}
} else { // tail was not pointing to the last node
// try to swing Tail to the next node
atomic.CompareAndSwapPointer(&l.tail, tail, next)
l.tail.CompareAndSwap(tail, next)
}
}
}
Expand All @@ -61,24 +63,25 @@ func (l *List) Enqueue(value *Selection) {
// Dequeue removes and returns the value at the head of the queue to the memory pool
// It returns nil if the list is empty
func (l *List) Dequeue() (value *Selection) {
var head, tail, next unsafe.Pointer
var head, tail, next *node
for {
head = atomic.LoadPointer(&l.head)
tail = atomic.LoadPointer(&l.tail)
next = atomic.LoadPointer(&(*node)(head).next)
if head == atomic.LoadPointer(&l.head) { // are head, tail, and next consistent?
head = l.head.Load()
tail = l.tail.Load()
next = head.next.Load()
if head == l.head.Load() { // are head, tail, and next consistent?
if head == tail { // is list empty or tail falling behind?
if next == nil { // is list empty?
return nil
}
// tail is falling behind. try to advance it
atomic.CompareAndSwapPointer(&l.tail, tail, next)
l.tail.CompareAndSwap(tail, next)
} else {
// read value before CAS_node otherwise another dequeue might free the next node
value = (*node)(next).value
if atomic.CompareAndSwapPointer(&l.head, head, next) {
value = next.value
if l.head.CompareAndSwap(head, next) {
// sysFreeOS(unsafe.Pointer(head), nodeSize)
(*node)(head).next, (*node)(head).value = nil, nil
head.value = nil
head.next.Store(nil)
nodePut(head)
return // Dequeue is done. return
}
Expand Down

0 comments on commit 3fd4465

Please sign in to comment.