Skip to content

Commit

Permalink
sync: implement RWMutex using futexes
Browse files Browse the repository at this point in the history
Somewhat surprisingly, this results in smaller code than the old code
with the cooperative (tasks) scheduler. Probably because the new RWMutex
is also simpler.
  • Loading branch information
aykevl committed Dec 6, 2024
1 parent 3718514 commit d67bbf7
Showing 1 changed file with 64 additions and 99 deletions.
163 changes: 64 additions & 99 deletions src/sync/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,131 +7,96 @@ import (

type Mutex = task.Mutex

type RWMutex struct {
// waitingWriters are all of the tasks waiting for write locks.
waitingWriters task.Stack

// waitingReaders are all of the tasks waiting for a read lock.
waitingReaders task.Stack
//go:linkname runtimePanic runtime.runtimePanic
func runtimePanic(msg string)

// state is the current state of the RWMutex.
// Iff the mutex is completely unlocked, it contains rwMutexStateUnlocked (aka 0).
// Iff the mutex is write-locked, it contains rwMutexStateWLocked.
// While the mutex is read-locked, it contains the current number of readers.
state uint32
type RWMutex struct {
// Reader count, with the number of readers that currently have read-locked
// this mutex.
// The value can be in two states: one where 0 means no readers and another
// where -rwMutexMaxReaders means no readers. A base of 0 is normal
// uncontended operation, a base of -rwMutexMaxReaders means a writer has
// the lock or is trying to get the lock. In the second case, readers should
// wait until the reader count becomes non-negative again to give the writer
// a chance to obtain the lock.
readers task.Futex

// Writer futex, normally 0. If there is a writer waiting until all readers
// have unlocked, this value is 1. It will be changed to a 2 (and get a
// wake) when the last reader unlocks.
writer task.Futex

// Writer lock. Held between Lock() and Unlock().
writerLock Mutex
}

const (
rwMutexStateUnlocked = uint32(0)
rwMutexStateWLocked = ^uint32(0)
rwMutexMaxReaders = rwMutexStateWLocked - 1
)
const rwMutexMaxReaders = 1 << 30

func (rw *RWMutex) Lock() {
if rw.state == 0 {
// The mutex is completely unlocked.
// Lock without waiting.
rw.state = rwMutexStateWLocked
// Exclusive lock for writers.
rw.writerLock.Lock()

// Flag that we need to be awakened after the last read-lock unlocks.
rw.writer.Store(1)

// Signal to readers that they can't lock this mutex anymore.
n := uint32(rwMutexMaxReaders)
waiting := rw.readers.Add(-n)
if int32(waiting) == -rwMutexMaxReaders {
// All readers were already unlocked, so we don't need to wait for them.
rw.writer.Store(0)
return
}

// Wait for the lock to be released.
rw.waitingWriters.Push(task.Current())
task.Pause()
// There is at least one reader.
// Wait until all readers are unlocked. The last reader to unlock will set
// rw.writer to 2 and awaken us.
for rw.writer.Load() == 1 {
rw.writer.Wait(1)
}
rw.writer.Store(0)
}

func (rw *RWMutex) Unlock() {
switch rw.state {
case rwMutexStateWLocked:
// This is correct.

case rwMutexStateUnlocked:
// The mutex is already unlocked.
panic("sync: unlock of unlocked RWMutex")

default:
// The mutex is read-locked instead of write-locked.
panic("sync: write-unlock of read-locked RWMutex")
// Signal that new readers can lock this mutex.
waiting := rw.readers.Add(rwMutexMaxReaders)
if waiting != 0 {
// Awaken all waiting readers.
rw.readers.WakeAll()
}

switch {
case rw.maybeUnblockReaders():
// Switched over to read mode.

case rw.maybeUnblockWriter():
// Transferred to another writer.

default:
// Nothing is waiting for the lock.
rw.state = rwMutexStateUnlocked
}
// Done with this lock (next writer can try to get a lock).
rw.writerLock.Unlock()
}

func (rw *RWMutex) RLock() {
if rw.state == rwMutexStateWLocked {
// Wait for the write lock to be released.
rw.waitingReaders.Push(task.Current())
task.Pause()
return
}
// Add us as a reader.
newVal := rw.readers.Add(1)

if rw.state == rwMutexMaxReaders {
panic("sync: too many readers on RWMutex")
// Wait until the RWMutex is available for readers.
for int32(newVal) <= 0 {
rw.readers.Wait(newVal)
newVal = rw.readers.Load()
}

// Increase the reader count.
rw.state++
}

func (rw *RWMutex) RUnlock() {
switch rw.state {
case rwMutexStateUnlocked:
// The mutex is already unlocked.
panic("sync: unlock of unlocked RWMutex")

case rwMutexStateWLocked:
// The mutex is write-locked instead of read-locked.
panic("sync: read-unlock of write-locked RWMutex")
}

rw.state--
// Remove us as a reader.
one := uint32(1)
readers := int32(rw.readers.Add(-one))

if rw.state == rwMutexStateUnlocked {
// This was the last reader.
// Try to unblock a writer.
rw.maybeUnblockWriter()
// Check whether RUnlock was called too often.
if readers == -1 || readers == (-rwMutexMaxReaders)-1 {
runtimePanic("sync: RUnlock of unlocked RWMutex")
}
}

func (rw *RWMutex) maybeUnblockReaders() bool {
var n uint32
for {
t := rw.waitingReaders.Pop()
if t == nil {
break
if readers == -rwMutexMaxReaders {
// This was the last read lock. Check whether we need to wake up a write
// lock.
if rw.writer.CompareAndSwap(1, 2) {
rw.writer.Wake()
}

n++
scheduleTask(t)
}
if n == 0 {
return false
}

rw.state = n
return true
}

func (rw *RWMutex) maybeUnblockWriter() bool {
t := rw.waitingWriters.Pop()
if t == nil {
return false
}

rw.state = rwMutexStateWLocked
scheduleTask(t)

return true
}

type Locker interface {
Expand Down

0 comments on commit d67bbf7

Please sign in to comment.