Skip to content

Commit

Permalink
shift zenq core to native atomic types
Browse files Browse the repository at this point in the history
  • Loading branch information
alphadose committed Aug 5, 2022
1 parent 3cefe22 commit 23116b0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 38 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Benchmarks to support the above claims [here](#benchmarks)

## Installation

You need Golang [1.18.x](https://go.dev/dl/) or above since this package uses generics
You need Golang [1.19.x](https://go.dev/dl/) or above

```bash
$ go get github.com/alphadose/zenq/v2
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/alphadose/zenq/v2

go 1.18
go 1.19
67 changes: 31 additions & 36 deletions zenq.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// Known Limitations:-
//
// 1. Max queue_size = 2^16
// 2. The queue_size is a power of 2, in case a different size is provided then queue_size is rounded up to the next greater power of 2
// 2. The queue_size is a power of 2, in case a different size is provided then queue_size is rounded up to the next greater power of 2 upto a max of 2^16

// Suggestions:-
//
Expand All @@ -23,8 +23,6 @@ import (
"github.com/alphadose/zenq/v2/constants"
)

const maxQueueSize uint32 = 1 << 16

// ZenQ global state enums
const (
// Both reads and writes are possible
Expand Down Expand Up @@ -54,7 +52,7 @@ const (
type (
// a single slot in the queue
slot[T any] struct {
state uint32
atomic.Uint32
writeParker *ThreadParker[T]
item T
}
Expand All @@ -74,10 +72,10 @@ type (
}

// container for the selection events among multiple queues
selectFactory struct {
selectionState uint32
selectFactory[T any] struct {
selectionState atomic.Uint32
auxThread unsafe.Pointer
backlog unsafe.Pointer
backlog atomic.Pointer[T]
waitList List
}

Expand All @@ -86,20 +84,20 @@ type (
// The padding members 0 to 4 below are here to ensure each item is on a separate cache line.
// This prevents false sharing and hence improves performance.
_p0 cacheLinePadding
writerIndex uint32
_p1 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
readerIndex uint32
_p2 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
writerIndex atomic.Uint32
_p1 [constants.CacheLinePadSize - unsafe.Sizeof(atomic.Uint32{})]byte
readerIndex atomic.Uint32
_p2 [constants.CacheLinePadSize - unsafe.Sizeof(atomic.Uint32{})]byte
metaQ
_p3 [constants.CacheLinePadSize - unsafe.Sizeof(metaQ{})]byte
selectFactory
_p4 [constants.CacheLinePadSize - unsafe.Sizeof(selectFactory{})]byte
selectFactory[T]
_p4 [constants.CacheLinePadSize - unsafe.Sizeof(selectFactory[T]{})]byte
}
)

// returns the next greater power of 2 relative to val
func nextGreaterPowerOf2(val uint32) uint32 {
return 1 << uint32(math.Min(math.Ceil(Fastlog2(math.Max(float64(val), 1))), 31))
return 1 << uint32(math.Min(math.Ceil(Fastlog2(math.Max(float64(val), 1))), 16))
}

// New returns a new queue given its payload type passed as a generic parameter
Expand All @@ -109,9 +107,6 @@ func New[T any](size uint32) *ZenQ[T] {
contents = make([]slot[T], queueSize, queueSize)
parkPool = sync.Pool{New: func() any { return new(parkSpot[T]) }}
)
if queueSize > maxQueueSize {
throw("maximum size of queue can be 2^16")
}
for idx := uint32(0); idx < queueSize; idx++ {
n := parkPool.Get().(*parkSpot[T])
n.threadPtr, n.next = nil, nil
Expand All @@ -125,7 +120,7 @@ func New[T any](size uint32) *ZenQ[T] {
free: parkPool.Put,
indexMask: uint16(queueSize - 1),
},
selectFactory: selectFactory{waitList: NewList()},
selectFactory: selectFactory[T]{waitList: NewList()},
}
go zenq.selectSender()
// allow the above auxillary thread to manifest
Expand Down Expand Up @@ -166,11 +161,11 @@ direct_send:
goto direct_send
}

slot := (*slot[T])(unsafe.Pointer(uintptr(self.strideLength)*(uintptr(self.indexMask)&uintptr(atomic.AddUint32(&self.writerIndex, 1))) + uintptr(self.contents)))
slot := (*slot[T])(unsafe.Pointer(uintptr(self.strideLength)*(uintptr(self.indexMask)&uintptr(self.writerIndex.Add(1))) + uintptr(self.contents)))

// CAS -> change slot_state to busy if slot_state == empty
for !atomic.CompareAndSwapUint32(&slot.state, SlotEmpty, SlotBusy) {
switch atomic.LoadUint32(&slot.state) {
for !slot.CompareAndSwap(SlotEmpty, SlotBusy) {
switch slot.Load() {
case SlotBusy:
wait()
case SlotCommitted:
Expand All @@ -186,17 +181,17 @@ direct_send:
}
}
slot.item = value
atomic.StoreUint32(&slot.state, SlotCommitted)
slot.Store(SlotCommitted)
return
}

// Read reads a value from the queue, you can once read once per object
func (self *ZenQ[T]) Read() (data T, queueOpen bool) {
slot := (*slot[T])(unsafe.Pointer(uintptr(self.strideLength)*(uintptr(self.indexMask)&uintptr(atomic.AddUint32(&self.readerIndex, 1))) + uintptr(self.contents)))
slot := (*slot[T])(unsafe.Pointer(uintptr(self.strideLength)*(uintptr(self.indexMask)&uintptr(self.readerIndex.Add(1))) + uintptr(self.contents)))

// CAS -> change slot_state to busy if slot_state == committed
for !atomic.CompareAndSwapUint32(&slot.state, SlotCommitted, SlotBusy) {
switch atomic.LoadUint32(&slot.state) {
for !slot.CompareAndSwap(SlotCommitted, SlotBusy) {
switch slot.Load() {
case SlotBusy:
wait()
case SlotEmpty:
Expand All @@ -210,12 +205,12 @@ func (self *ZenQ[T]) Read() (data T, queueOpen bool) {
mcall(gosched_m)
} else {
// queue is closed, decrement the reader index by 1
atomic.AddUint32(&self.readerIndex, math.MaxUint32)
self.readerIndex.Add(math.MaxUint32)
queueOpen = false
return
}
case SlotClosed:
if atomic.CompareAndSwapUint32(&slot.state, SlotClosed, SlotEmpty) {
if slot.CompareAndSwap(SlotClosed, SlotEmpty) {
Store8(&self.globalState, StateFullyClosed)
}
queueOpen = false
Expand All @@ -225,7 +220,7 @@ func (self *ZenQ[T]) Read() (data T, queueOpen bool) {
}
}
data, queueOpen = slot.item, true
atomic.StoreUint32(&slot.state, SlotEmpty)
slot.Store(SlotEmpty)
return
}

Expand All @@ -242,11 +237,11 @@ func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool) {
return
}
Store8(&self.globalState, StateClosedForWrites)
slot := (*slot[T])(unsafe.Pointer(uintptr(self.strideLength)*(uintptr(self.indexMask)&uintptr(atomic.AddUint32(&self.writerIndex, 1))) + uintptr(self.contents)))
slot := (*slot[T])(unsafe.Pointer(uintptr(self.strideLength)*(uintptr(self.indexMask)&uintptr(self.writerIndex.Add(1))) + uintptr(self.contents)))

// CAS -> change slot_state to busy if slot_state == empty
for !atomic.CompareAndSwapUint32(&slot.state, SlotEmpty, SlotBusy) {
switch atomic.LoadUint32(&slot.state) {
for !slot.CompareAndSwap(SlotEmpty, SlotBusy) {
switch slot.Load() {
case SlotBusy, SlotCommitted:
mcall(gosched_m)
case SlotEmpty:
Expand All @@ -256,7 +251,7 @@ func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool) {
}
}
// Closing commit
atomic.StoreUint32(&slot.state, SlotClosed)
slot.Store(SlotClosed)
return
}

Expand All @@ -270,15 +265,15 @@ func (self *ZenQ[T]) CloseAsync() {

// ReadFromBackLog tries to read a data from backlog if available
func (self *ZenQ[T]) ReadFromBackLog() (data any, ok bool) {
if d := atomic.SwapPointer(&self.backlog, nil); d != nil {
if d := self.backlog.Swap(nil); d != nil {
data, ok = *((*T)(d)), true
}
return
}

// Signal is the mechanism by which a selector notifies this ZenQ's auxillary thread to contest for the selection
func (self *ZenQ[T]) Signal() uint8 {
if !atomic.CompareAndSwapUint32(&self.selectionState, SelectionOpen, SelectionRunning) {
if !self.selectionState.CompareAndSwap(SelectionOpen, SelectionRunning) {
return 0
} else {
safe_ready(self.auxThread)
Expand Down Expand Up @@ -374,8 +369,8 @@ func (self *ZenQ[T]) selectSender() {
// saves a lot of cpu time
if readState && queueOpen {
var i T = data
atomic.StorePointer(&self.backlog, unsafe.Pointer(&i))
self.backlog.Store(&i)
}
atomic.StoreUint32(&self.selectionState, SelectionOpen)
self.selectionState.Store(SelectionOpen)
}
}

0 comments on commit 23116b0

Please sign in to comment.