From 0a39dc926537b4e4cc181743c8cbc3e0e5686909 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Thu, 24 Oct 2024 10:26:17 +0200 Subject: [PATCH] WIP map every goroutine to a new OS thread --- compileopts/config.go | 1 + compileopts/options.go | 2 +- compileopts/options_test.go | 2 +- compileopts/target.go | 5 +- src/internal/task/futex_linux.c | 8 + src/internal/task/futex_linux.go | 8 + src/internal/task/linux.go | 9 + src/internal/task/semaphore.go | 32 ++ src/internal/task/task_threads.c | 109 +++++++ src/internal/task/task_threads.go | 260 +++++++++++++++ src/runtime/chan.go | 2 + src/runtime/chan2.go | 513 ++++++++++++++++++++++++++++++ src/runtime/gc_stack_raw.go | 2 +- src/runtime/gc_stack_threads.go | 25 ++ src/runtime/scheduler_threads.go | 121 +++++++ 15 files changed, 1095 insertions(+), 4 deletions(-) create mode 100644 src/internal/task/linux.go create mode 100644 src/internal/task/semaphore.go create mode 100644 src/internal/task/task_threads.c create mode 100644 src/internal/task/task_threads.go create mode 100644 src/runtime/chan2.go create mode 100644 src/runtime/gc_stack_threads.go create mode 100644 src/runtime/scheduler_threads.go diff --git a/compileopts/config.go b/compileopts/config.go index 44d3b005cc..912dbbaaac 100644 --- a/compileopts/config.go +++ b/compileopts/config.go @@ -340,6 +340,7 @@ func (c *Config) CFlags(libclang bool) []string { "-nostdlibinc", "-isystem", filepath.Join(path, "include"), "-isystem", filepath.Join(root, "lib", "musl", "arch", arch), + "-isystem", filepath.Join(root, "lib", "musl", "arch", "generic"), "-isystem", filepath.Join(root, "lib", "musl", "include"), ) case "wasi-libc": diff --git a/compileopts/options.go b/compileopts/options.go index b83f6f63ba..e879a84862 100644 --- a/compileopts/options.go +++ b/compileopts/options.go @@ -10,7 +10,7 @@ import ( var ( validBuildModeOptions = []string{"default", "c-shared"} validGCOptions = []string{"none", "leaking", "conservative", "custom", "precise"} - validSchedulerOptions = []string{"none", "tasks", "asyncify"} + validSchedulerOptions = []string{"none", "tasks", "asyncify", "threads"} validSerialOptions = []string{"none", "uart", "usb", "rtt"} validPrintSizeOptions = []string{"none", "short", "full"} validPanicStrategyOptions = []string{"print", "trap"} diff --git a/compileopts/options_test.go b/compileopts/options_test.go index 23ffec465f..bd6e4c04ea 100644 --- a/compileopts/options_test.go +++ b/compileopts/options_test.go @@ -10,7 +10,7 @@ import ( func TestVerifyOptions(t *testing.T) { expectedGCError := errors.New(`invalid gc option 'incorrect': valid values are none, leaking, conservative, custom, precise`) - expectedSchedulerError := errors.New(`invalid scheduler option 'incorrect': valid values are none, tasks, asyncify`) + expectedSchedulerError := errors.New(`invalid scheduler option 'incorrect': valid values are none, tasks, asyncify, threads`) expectedPrintSizeError := errors.New(`invalid size option 'incorrect': valid values are none, short, full`) expectedPanicStrategyError := errors.New(`invalid panic option 'incorrect': valid values are print, trap`) diff --git a/compileopts/target.go b/compileopts/target.go index 96732e2337..ba40f56fb7 100644 --- a/compileopts/target.go +++ b/compileopts/target.go @@ -247,7 +247,6 @@ func defaultTarget(options *Options) (*TargetSpec, error) { GOARCH: options.GOARCH, BuildTags: []string{options.GOOS, options.GOARCH}, GC: "precise", - Scheduler: "tasks", Linker: "cc", DefaultStackSize: 1024 * 64, // 64kB GDB: []string{"gdb"}, @@ -378,6 +377,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) { platformVersion = "11.0.0" // first macosx platform with arm64 support } llvmvendor = "apple" + spec.Scheduler = "tasks" spec.Linker = "ld.lld" spec.Libc = "darwin-libSystem" // Use macosx* instead of darwin, otherwise darwin/arm64 will refer to @@ -394,6 +394,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) { "src/runtime/runtime_unix.c", "src/runtime/signal.c") case "linux": + spec.Scheduler = "threads" spec.Linker = "ld.lld" spec.RTLib = "compiler-rt" spec.Libc = "musl" @@ -414,9 +415,11 @@ func defaultTarget(options *Options) (*TargetSpec, error) { } spec.ExtraFiles = append(spec.ExtraFiles, "src/internal/task/futex_linux.c", + "src/internal/task/task_threads.c", "src/runtime/runtime_unix.c", "src/runtime/signal.c") case "windows": + spec.Scheduler = "tasks" spec.Linker = "ld.lld" spec.Libc = "mingw-w64" // Note: using a medium code model, low image base and no ASLR diff --git a/src/internal/task/futex_linux.c b/src/internal/task/futex_linux.c index 4a084d3e58..1db7a25e01 100644 --- a/src/internal/task/futex_linux.c +++ b/src/internal/task/futex_linux.c @@ -5,6 +5,7 @@ #include #include +#include #include #define FUTEX_WAIT 0 @@ -15,6 +16,13 @@ void tinygo_futex_wait(uint32_t *addr, uint32_t cmp) { syscall(SYS_futex, addr, FUTEX_WAIT|FUTEX_PRIVATE, cmp, NULL, NULL, 0); } +void tinygo_futex_wait_timeout(uint32_t *addr, uint32_t cmp, uint64_t timeout) { + struct timespec ts = {0}; + ts.tv_sec = timeout / 1000000000; + ts.tv_nsec = timeout % 1000000000; + syscall(SYS_futex, addr, FUTEX_WAIT|FUTEX_PRIVATE, cmp, &ts, NULL, 0); +} + void tinygo_futex_wake(uint32_t *addr, uint32_t num) { syscall(SYS_futex, addr, FUTEX_WAKE|FUTEX_PRIVATE, num, NULL, NULL, 0); } diff --git a/src/internal/task/futex_linux.go b/src/internal/task/futex_linux.go index 44709e4e96..1966af2a92 100644 --- a/src/internal/task/futex_linux.go +++ b/src/internal/task/futex_linux.go @@ -31,6 +31,11 @@ func (f *Futex) Wait(cmp uint32) bool { return false } +// Like Wait, but times out after the number of nanoseconds in timeout. +func (f *Futex) WaitUntil(cmp uint32, timeout uint64) { + tinygo_futex_wait_timeout((*uint32)(unsafe.Pointer(&f.Uint32)), cmp, timeout) +} + // Wake a single waiter. func (f *Futex) Wake() { tinygo_futex_wake((*uint32)(unsafe.Pointer(&f.Uint32)), 1) @@ -45,5 +50,8 @@ func (f *Futex) WakeAll() { //export tinygo_futex_wait func tinygo_futex_wait(addr *uint32, cmp uint32) +//export tinygo_futex_wait_timeout +func tinygo_futex_wait_timeout(addr *uint32, cmp uint32, timeout uint64) + //export tinygo_futex_wake func tinygo_futex_wake(addr *uint32, num uint32) diff --git a/src/internal/task/linux.go b/src/internal/task/linux.go new file mode 100644 index 0000000000..7d28f708c4 --- /dev/null +++ b/src/internal/task/linux.go @@ -0,0 +1,9 @@ +//go:build linux && !baremetal + +package task + +import "unsafe" + +// Musl uses a pointer (or unsigned long for C++) so unsafe.Pointer should be +// fine. +type threadID unsafe.Pointer diff --git a/src/internal/task/semaphore.go b/src/internal/task/semaphore.go new file mode 100644 index 0000000000..914f09bc5e --- /dev/null +++ b/src/internal/task/semaphore.go @@ -0,0 +1,32 @@ +package task + +// Barebones semaphore implementation. +// The main limitation is that if there are multiple waiters, a single Post() +// call won't do anything. Only when Post() has been called to awaken all +// waiters will the waiters proceed. +// This limitation is not a problem when there will only be a single waiter. +type Semaphore struct { + futex Futex +} + +// Post (unlock) the semaphore, incrementing the value in the semaphore. +func (s *Semaphore) Post() { + newValue := s.futex.Add(1) + if newValue == 0 { + s.futex.WakeAll() + } +} + +// Wait (lock) the semaphore, decrementing the value in the semaphore. +func (s *Semaphore) Wait() { + delta := int32(-1) + value := s.futex.Add(uint32(delta)) + for { + if int32(value) >= 0 { + // Semaphore unlocked! + return + } + s.futex.Wait(value) + value = s.futex.Load() + } +} diff --git a/src/internal/task/task_threads.c b/src/internal/task/task_threads.c new file mode 100644 index 0000000000..38c69e501d --- /dev/null +++ b/src/internal/task/task_threads.c @@ -0,0 +1,109 @@ +//go:build none + +#define _GNU_SOURCE +#include +#include +#include +#include +#include + +// BDWGC also uses SIGRTMIN+6 on Linux, which seems like a reasonable choice. +#ifdef __linux__ +#define taskPauseSignal (SIGRTMIN + 6) +#endif + +// Pointer to the current task.Task structure. +// Ideally the entire task.Task structure would be a thread-local variable but +// this also works. +static __thread void *current_task; + +struct state_pass { + void *(*start)(void*); + void *args; + void *task; + sem_t startlock; +}; + +// Handle the GC pause in Go. +void tinygo_task_gc_pause(int sig); + +// Initialize the main thread. +void tinygo_task_init(void *mainTask, pthread_t *thread, void *context) { + // Make sure the current task pointer is set correctly for the main + // goroutine as well. + current_task = mainTask; + + // Store the thread ID of the main thread. + *thread = pthread_self(); + + // Register the "GC pause" signal for the entire process. + // Using pthread_kill, we can still send the signal to a specific thread. + struct sigaction act = { 0 }; + act.sa_flags = SA_SIGINFO; + act.sa_handler = &tinygo_task_gc_pause; + sigaction(taskPauseSignal, &act, NULL); +} + +void tinygo_task_exited(void*); + +// Helper to start a goroutine while also storing the 'task' structure. +static void* start_wrapper(void *arg) { + struct state_pass *state = arg; + void *(*start)(void*) = state->start; + void *args = state->args; + current_task = state->task; + + // Notify the caller that the thread has successfully started and + // initialized. + sem_post(&state->startlock); + + // Run the goroutine function. + start(args); + + // Notify the Go side this thread will exit. + tinygo_task_exited(current_task); + + return NULL; +}; + +// Start a new goroutine in an OS thread. +int tinygo_task_start(uintptr_t fn, void *args, void *task, pthread_t *thread, uint64_t id, void *context) { + // Sanity check. Should get optimized away. + if (sizeof(pthread_t) != sizeof(void*)) { + __builtin_trap(); + } + + struct state_pass state = { + .start = (void*)fn, + .args = args, + .task = task, + }; + sem_init(&state.startlock, 0, 0); + int result = pthread_create(thread, NULL, &start_wrapper, &state); + + // Wait until the thread has been crated and read all state_pass variables. + sem_wait(&state.startlock); + + return result; +} + +// Return the current task (for task.Current()). +void* tinygo_task_current(void) { + return current_task; +} + +// Obtain the highest address of the stack. +uintptr_t tinygo_task_stacktop(void) { + pthread_attr_t attr; + pthread_getattr_np(pthread_self(), &attr); + void *stackbase; + size_t stacksize; + pthread_attr_getstack(&attr, &stackbase, &stacksize); + pthread_attr_destroy(&attr); + return (uintptr_t)stackbase + (uintptr_t)stacksize; +} + +// Send a signal to cause the task to pause for the GC mark phase. +void tinygo_task_send_gc_signal(pthread_t thread) { + pthread_kill(thread, taskPauseSignal); +} diff --git a/src/internal/task/task_threads.go b/src/internal/task/task_threads.go new file mode 100644 index 0000000000..8ecc2090fb --- /dev/null +++ b/src/internal/task/task_threads.go @@ -0,0 +1,260 @@ +//go:build scheduler.threads + +package task + +import ( + "sync/atomic" + "unsafe" +) + +// If true, print verbose debug logs. +const verbose = false + +// Scheduler-specific state. +type state struct { + // Goroutine ID. The number here is not really significant and after a while + // it could wrap around. But it is useful for debugging. + id uint64 + + // Thread ID, pthread_t or similar (typically implemented as a pointer). + thread threadID + + // Next task in the activeTasks queue. + QueueNext *Task + + // Semaphore to pause/resume the thread atomically. + pauseSem Semaphore + + // Semaphore used for stack scanning. + // We can't reuse pauseSem here since the thread might have been paused for + // other reasons (for example, because it was waiting on a channel). + gcSem Semaphore +} + +// Goroutine counter, starting at 0 for the main goroutine. +var goroutineID uint64 + +var mainTask Task + +// Queue of tasks (see QueueNext) that currently exist in the program. +var activeTasks = &mainTask +var activeTaskLock PMutex + +func OnSystemStack() bool { + runtimePanic("todo: task.OnSystemStack") + return false +} + +// Initialize the main goroutine state. Must be called by the runtime on +// startup, before starting any other goroutines. +func Init() { + tinygo_task_init(&mainTask, &mainTask.state.thread) +} + +// Return the task struct for the current thread. +func Current() *Task { + t := (*Task)(tinygo_task_current()) + if t == nil { + runtimePanic("unknown current task") + } + return t +} + +// Pause pauses the current task, until it is resumed by another task. +// It is possible that another task has called Resume() on the task before it +// hits Pause(), in which case the task won't be paused but continues +// immediately. +func Pause() { + // Wait until resumed + t := Current() + if verbose { + println("*** pause: ", t.state.id) + } + t.state.pauseSem.Wait() +} + +// Resume the given task. +// It is legal to resume a task before it gets paused, it means that the next +// call to Pause() won't pause but will continue immediately. This happens in +// practice sometimes in channel operations, where the Resume() might get called +// between the channel unlock and the call to Pause(). +func (t *Task) Resume() { + if verbose { + println("*** resume: ", t.state.id) + } + // Increment the semaphore counter. + // If the task is currently paused in Wait(), it will resume. + // If the task is not yet paused, the next call to Wait() will continue + // immediately. + t.state.pauseSem.Post() +} + +// Start a new OS thread. +func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) { + t := &Task{} + t.state.id = atomic.AddUint64(&goroutineID, 1) + if verbose { + println("*** start: ", t.state.id, "from", Current().state.id) + } + + // Start the new thread, and add it to the list of threads. + // Do this with a lock so that only started threads are part of the queue + // and the stop-the-world GC won't see threads that haven't started yet or + // are not fully started yet. + activeTaskLock.Lock() + errCode := tinygo_task_start(fn, args, t, &t.state.thread, t.state.id) + if errCode != 0 { + runtimePanic("could not start thread") + } + t.state.QueueNext = activeTasks + activeTasks = t + activeTaskLock.Unlock() +} + +//export tinygo_task_exited +func taskExited(t *Task) { + if verbose { + println("*** exit:", t.state.id) + } + + // Remove from the queue. + // TODO: this can be made more efficient by using a doubly linked list. + activeTaskLock.Lock() + found := false + for q := &activeTasks; *q != nil; q = &(*q).state.QueueNext { + if *q == t { + *q = t.state.QueueNext + found = true + break + } + } + activeTaskLock.Unlock() + + // Sanity check. + if !found { + runtimePanic("taskExited failed") + } +} + +// Futex to wait on until all tasks have finished scanning the stack. +// This is basically a sync.WaitGroup. +var scanDoneFutex Futex + +// GC scan phase. Because we need to stop the world while scanning, this kinda +// needs to be done in the tasks package. +func GCScan() { + current := Current() + + // Don't allow new goroutines to be started while pausing/resuming threads + // in the stop-the-world phase. + activeTaskLock.Lock() + + // Pause all other threads. + numOtherThreads := uint32(0) + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + numOtherThreads++ + tinygo_task_send_gc_signal(t.state.thread) + } + } + + // Store the number of threads to wait for in the futex. + // This is the equivalent of doing an initial wg.Add(numOtherThreads). + scanDoneFutex.Store(numOtherThreads) + + // Scan the current stack, and all current registers. + scanCurrentStack() + + // Wake each paused thread for the first time so it will scan the stack. + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + t.state.gcSem.Post() + } + } + + // Wait until all threads have finished scanning their stack. + // This is the equivalent of wg.Wait() + for { + val := scanDoneFutex.Load() + if val == 0 { + break + } + scanDoneFutex.Wait(val) + } + + // Scan all globals (implemented in the runtime). + gcScanGlobals() + + // Wake each paused thread for the second time, so they will resume normal + // operation. + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + t.state.gcSem.Post() + } + } + + // Allow goroutines to start and exit again. + activeTaskLock.Unlock() +} + +// Scan globals, implemented in the runtime package. +func gcScanGlobals() + +var stackScanLock PMutex + +//export tinygo_task_gc_pause +func tingyo_task_gc_pause() { + // Wait until we get the signal to start scanning the stack. + Current().state.gcSem.Wait() + + // Scan the thread stack. + // Only scan a single thread stack at a time, because the GC marking phase + // doesn't support parallelism. + // TODO: it may be possible to call markRoots directly (without saving + // registers) since we are in a signal handler that already saved a bunch of + // registers. This is an optimization left for a future time. + stackScanLock.Lock() + scanCurrentStack() + stackScanLock.Unlock() + + // Equivalent of wg.Done(): subtract one from the futex and if the result is + // 0 (meaning we were the last in the waitgroup), wake the waiting thread. + n := uint32(1) + if scanDoneFutex.Add(-n) == 0 { + scanDoneFutex.Wake() + } + + // Wait until we get the signal we can resume normally (after the mark phase + // has finished). + Current().state.gcSem.Wait() +} + +//go:export tinygo_scanCurrentStack +func scanCurrentStack() + +// Return the highest address of the current stack. +// +//export tinygo_task_stacktop +func StackTop() uintptr + +//go:linkname runtimePanic runtime.runtimePanic +func runtimePanic(msg string) + +// Using //go:linkname instead of //export so that we don't tell the compiler +// that the 't' parameter won't escape (because it will). +// +//go:linkname tinygo_task_init tinygo_task_init +func tinygo_task_init(t *Task, thread *threadID) + +// Here same as for tinygo_task_init. +// +//go:linkname tinygo_task_start tinygo_task_start +func tinygo_task_start(fn uintptr, args unsafe.Pointer, t *Task, thread *threadID, id uint64) int32 + +// Pause the thread by sending it a signal. +// +//export tinygo_task_send_gc_signal +func tinygo_task_send_gc_signal(threadID) + +//export tinygo_task_current +func tinygo_task_current() unsafe.Pointer diff --git a/src/runtime/chan.go b/src/runtime/chan.go index 1f0d7ced8d..885ba48cd4 100644 --- a/src/runtime/chan.go +++ b/src/runtime/chan.go @@ -1,3 +1,5 @@ +//go:build !scheduler.threads + package runtime // This file implements the 'chan' type and send/receive/select operations. diff --git a/src/runtime/chan2.go b/src/runtime/chan2.go new file mode 100644 index 0000000000..f2180ddb0c --- /dev/null +++ b/src/runtime/chan2.go @@ -0,0 +1,513 @@ +//go:build scheduler.threads + +package runtime + +// This implementation of channels supports multiple threads running at the same +// time. +// Every channel has a list of senders and a list of receivers, and possibly a +// queue. There is no 'channel state', the state is inferred from the available +// senders/receivers and values in the buffer. +// +// - A sender will first try to send the value to a waiting receiver if there is +// one, but only if there is nothing in the queue (to keep the values flowing +// in the correct order). If it can't, it will add the value in the queue and +// possibly wait as a sender if there's no space available. +// - A receiver will first try to read a value from the queue, but if there is +// none it will try to read from a sender in the list. It will block if it +// can't proceed. +// +// State is kept in various ways: +// +// - The sender value is stored in the sender 'channelBlockedList', which is +// really a queue entry. This works for both senders and select operations: a +// select operation has a separate value to send for each case. +// - The receiver value is stored inside Task.Ptr. This works for receivers, and +// importantly also works for select which has a single buffer for every +// receive operation. +// - The `Task.Data` value stores how the channel operation proceeded. For +// normal send/receive operations, it starts at chanWaiting and then is +// changed to chanOperationOk or chanOperationClosed depending on whether the +// send/receive proceeded normally or because it was closed. For a select +// operation, it also stores the 'case' index in the upper bits (zero for +// non-select operations) so that the select operation knows which case did +// proceed. +// The value is at the same time also a way that goroutines can be the first +// (and only) goroutine to 'take' a channel operation using an atomic CAS +// operation to change it from 'waiting' to any other value. This is important +// for the select statement because multiple goroutines could try to let +// different channels in the select statement proceed at the same time. By +// using Task.Data, only a single channel operation in the select statement +// can proceed. +// - It is possible for the channel queues to contain already-processed senders +// or receivers. This can happen when the select statement managed to proceed +// but the goroutine doing the select has not yet cleaned up the stale queue +// entries before returning. This should therefore only happen for a short +// period. + +import ( + "sync/atomic" + "unsafe" + + "internal/task" +) + +type channel struct { + closed bool + selectLocked bool + elementSize uintptr + bufCap uintptr // 'cap' + bufLen uintptr // 'len' + bufHead uintptr + bufTail uintptr + senders chanQueue + receivers chanQueue + lock task.PMutex + buf unsafe.Pointer +} + +const ( + chanWaiting = iota // waiting for a send/receive operation to continue + chanOperationOk // successfully sent or received (not closed) + chanOperationClosed // channel was closed, the value has been zeroed + chanOperationMask = 0b11 +) + +type chanQueue struct { + first *channelBlockedList +} + +// Pus the next channel operation to the queue. All appropriate fields must have +// been initialized already. +// This function must be called with the channel lock held. +func (q *chanQueue) push(node *channelBlockedList) { + node.next = q.first + q.first = node +} + +// Pop the next waiting channel from the queue. Channels that are no longer +// waiting (for example, when they're part of a select operation) will be +// skipped. +// This function must be called with the channel lock held. +func (q *chanQueue) pop(chanOp uint64) *channelBlockedList { + for { + if q.first == nil { + return nil + } + + // Pop next from the queue. + popped := q.first + q.first = q.first.next + + // The new value for the 'data' field will be a combination of the + // channel operation and the select index. (The select index is 0 for + // non-select channel operations). + newDataValue := chanOp | uint64(popped.index<<2) + + // Try to be the first to proceed with this goroutine. + swapped := atomic.CompareAndSwapUint64(&popped.task.Data, 0, newDataValue) + if swapped { + return popped + } + } +} + +// Remove the given to-be-removed node from the queue if it is part of the +// queue. If there are multiple, only one will be removed. +func (q *chanQueue) remove(remove *channelBlockedList) { + n := &q.first + for *n != nil { + if *n == remove { + *n = (*n).next + return + } + n = &((*n).next) + } +} + +type channelBlockedList struct { + next *channelBlockedList + task *task.Task + index uintptr // select index, 0 for non-select operation + value unsafe.Pointer // if this is a sender, this is the value to send +} + +type chanSelectState struct { + ch *channel + value unsafe.Pointer +} + +func chanMake(elementSize uintptr, bufSize uintptr) *channel { + return &channel{ + elementSize: elementSize, + bufCap: bufSize, + buf: alloc(elementSize*bufSize, nil), + } +} + +// Return the number of entries in this chan, called from the len builtin. +// A nil chan is defined as having length 0. +func chanLen(c *channel) int { + if c == nil { + return 0 + } + return int(c.bufLen) +} + +// Return the capacity of this chan, called from the cap builtin. +// A nil chan is defined as having capacity 0. +func chanCap(c *channel) int { + if c == nil { + return 0 + } + return int(c.bufCap) +} + +// Push the value to the channel buffer array, for a send operation. +// This function may only be called when the channel is locked and it is known +// there is space available in the buffer. +func (ch *channel) bufferPush(value unsafe.Pointer) { + elemAddr := unsafe.Add(ch.buf, ch.bufHead*ch.elementSize) + ch.bufLen++ + ch.bufHead++ + if ch.bufHead == ch.bufCap { + ch.bufHead = 0 + } + + memcpy(elemAddr, value, ch.elementSize) +} + +// Pop a value from the channel buffer and store it in the 'value' pointer, for +// a receive operation. +// This function may only be called when the channel is locked and it is known +// there is at least one value available in the buffer. +func (ch *channel) bufferPop(value unsafe.Pointer) { + elemAddr := unsafe.Add(ch.buf, ch.bufTail*ch.elementSize) + ch.bufLen-- + ch.bufTail++ + if ch.bufTail == ch.bufCap { + ch.bufTail = 0 + } + + memcpy(value, elemAddr, ch.elementSize) + + // Zero the value to allow the GC to collect it. + memzero(elemAddr, ch.elementSize) +} + +// Try to proceed with this send operation without blocking, and return whether +// the send succeeded. The lock must be held when calling this function. +func (ch *channel) trySend(value unsafe.Pointer) bool { + // To make sure we send values in the correct order, we can only send + // directly to a receiver when there are no values in the buffer. + + // Do not allow sending on a closed channel. + if ch.closed { + ch.lock.Unlock() + runtimePanic("send on closed channel") + } + + // There is no value in the buffer and we have a receiver available. Copy + // the value directly into the receiver. + if ch.bufLen == 0 { + if receiver := ch.receivers.pop(chanOperationOk); receiver != nil { + memcpy(receiver.task.Ptr, value, ch.elementSize) + receiver.task.Resume() + return true + } + } + + // If there is space in the buffer (if this is a buffered channel), we can + // store the value in the buffer and continue. + if ch.bufLen < ch.bufCap { + ch.bufferPush(value) + return true + } + return false +} + +func chanSend(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) { + if ch == nil { + // A nil channel blocks forever. Do not schedule this goroutine again. + deadlock() + } + + ch.lock.Lock() + + // See whether we can proceed immediately, and if so, return early. + if ch.trySend(value) { + ch.lock.Unlock() + return + } + + // Can't proceed. Add us to the list of senders and wait until we're awoken. + t := task.Current() + t.Data = chanWaiting + blockedlist.task = t + blockedlist.index = 0 + blockedlist.value = value + ch.senders.push(blockedlist) + ch.lock.Unlock() + + // Wait until this goroutine is resumed. + // It might be resumed after Unlock() and before Pause(). In that case, + // because we use semaphores, the Pause() will continue immediately. + task.Pause() + + // Check whether the sent happened normally (not because the channel was + // closed while sending). + if t.Data == chanOperationClosed { + // Oops, this channel was closed while sending! + runtimePanic("send on closed channel") + } +} + +// Try to proceed with this receive operation without blocking, and return +// whether the receive operation succeeded. The lock must be held when calling +// this function. +func (ch *channel) tryRecv(value unsafe.Pointer) (received, ok bool) { + // To make sure we keep the values in the channel in the correct order, we + // first have to read values from the buffer before we can look at the + // senders. + + // If there is a value available in the buffer, we can pull it out and + // proceed immediately. + if ch.bufLen > 0 { + ch.bufferPop(value) + + // Check for the next sender available and push it to the buffer. + if sender := ch.senders.pop(chanOperationOk); sender != nil { + ch.bufferPush(sender.value) + sender.task.Resume() + } + + return true, true + } + + if ch.closed { + // Channel is closed, so proceed immediately. + memzero(value, ch.elementSize) + return true, false + } + + // If there is a sender, we can proceed with the channel operation + // immediately. + if sender := ch.senders.pop(chanOperationOk); sender != nil { + memcpy(value, sender.value, ch.elementSize) + sender.task.Resume() + return true, true + } + + return false, false +} + +func chanRecv(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) bool { + if ch == nil { + // A nil channel blocks forever. Do not schedule this goroutine again. + deadlock() + } + + ch.lock.Lock() + + if received, ok := ch.tryRecv(value); received { + ch.lock.Unlock() + return ok + } + + // We can't proceed, so we add ourselfs to the list of receivers and wait + // until we're awoken. + t := task.Current() + t.Ptr = value + t.Data = chanWaiting + blockedlist.task = t + blockedlist.index = 0 + ch.receivers.push(blockedlist) + ch.lock.Unlock() + + // Wait until the goroutine is resumed. + task.Pause() + + // Return whether the receive happened from a closed channel. + return t.Data != chanOperationClosed +} + +// chanClose closes the given channel. If this channel has a receiver or is +// empty, it closes the channel. Else, it panics. +func chanClose(ch *channel) { + if ch == nil { + // Not allowed by the language spec. + runtimePanic("close of nil channel") + } + + ch.lock.Lock() + + if ch.closed { + // Not allowed by the language spec. + ch.lock.Unlock() + runtimePanic("close of closed channel") + } + + // Proceed all receiving operations that are blocked. + for { + receiver := ch.receivers.pop(chanOperationClosed) + if receiver == nil { + // Processed all receivers. + break + } + + // Zero the value that the receiver is getting. + memzero(receiver.task.Ptr, ch.elementSize) + + // Wake up the receiving goroutine. + receiver.task.Resume() + } + + // Let all senders panic. + for { + sender := ch.senders.pop(chanOperationClosed) + if sender == nil { + break // processed all senders + } + + // Wake up the sender. + sender.task.Resume() + } + + ch.closed = true + + ch.lock.Unlock() +} + +// We currently use a global select lock to avoid deadlocks while locking each +// individual channel in the select. Without this global lock, two select +// operations that have a different order of the same channels could end up in a +// deadlock. This global lock is inefficient, but gets the job done. +// +// If this becomes a performance issue, we can see how the Go runtime does this. +// I think it does this by sorting all states by channel address and then +// locking them in that order to avoid this deadlock. +var chanSelectLock task.PMutex + +// Lock all channels (taking care to skip duplicate channels). +func lockAllStates(states []chanSelectState) { + for _, state := range states { + if state.ch != nil && !state.ch.selectLocked { + state.ch.lock.Lock() + state.ch.selectLocked = true + } + } +} + +// Unlock all channels (taking care to skip duplicate channels). +func unlockAllStates(states []chanSelectState) { + for _, state := range states { + if state.ch != nil && state.ch.selectLocked { + state.ch.lock.Unlock() + state.ch.selectLocked = false + } + } +} + +func doChanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelBlockedList) (uintptr, bool) { + // Lock everything. + chanSelectLock.Lock() + lockAllStates(states) + + const selectNoIndex = ^uintptr(0) + selectIndex := selectNoIndex + selectOk := true + + // Iterate over each state, and see if it can proceed. + // TODO: start from a random index. + for i, state := range states { + if state.ch == nil { + // A nil channel blocks forever, so it won't take part of the select + // operation. + continue + } + + if state.value == nil { // chan receive + if received, ok := state.ch.tryRecv(recvbuf); received { + selectIndex = uintptr(i) + selectOk = ok + break + } + } else { // chan send + if state.ch.trySend(state.value) { + selectIndex = uintptr(i) + break + } + } + } + + // If this select can immediately proceed, or is a non-blocking select, + // return early. + blocking := len(ops) != 0 + if selectIndex != selectNoIndex || !blocking { + unlockAllStates(states) + chanSelectLock.Unlock() + return selectIndex, selectOk + } + + // The select is blocking and no channel operation can proceed, so things + // become more complicated. + // We add ourselves as a sender/receiver to every channel, and wait for the + // first one to complete. Only one will successfully complete, because + // senders and receivers use a compare-and-exchange atomic operation on + // t.Data so that only one will be able to "take" this select operation. + t := task.Current() + t.Ptr = recvbuf + t.Data = chanWaiting + for i, state := range states { + if state.ch == nil { + continue + } + op := &ops[i] + op.task = t + op.index = uintptr(i) + if state.value == nil { // chan receive + state.ch.receivers.push(op) + } else { // chan send + op.value = state.value + state.ch.senders.push(op) + } + } + + // Now we wait until one of the send/receive operations can proceed. + unlockAllStates(states) + chanSelectLock.Unlock() + task.Pause() + + // Resumed, so one channel operation must have progressed. + + // Make sure all channel ops are removed from the senders/receivers + // queue before we return and the memory of them becomes invalid. + chanSelectLock.Lock() + lockAllStates(states) + for i, state := range states { + if state.ch == nil { + continue + } + op := &ops[i] + if state.value == nil { + state.ch.receivers.remove(op) + } else { + state.ch.senders.remove(op) + } + } + unlockAllStates(states) + chanSelectLock.Unlock() + + // Pull the return values out of t.Data (which contains two bitfields). + selectIndex = uintptr(t.Data) >> 2 + selectOk = t.Data&chanOperationMask != chanOperationClosed + + return selectIndex, selectOk +} + +func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelBlockedList) (uintptr, bool) { + return doChanSelect(recvbuf, states, ops) +} + +func tryChanSelect(recvbuf unsafe.Pointer, states []chanSelectState) (uintptr, bool) { + return doChanSelect(recvbuf, states, nil) +} diff --git a/src/runtime/gc_stack_raw.go b/src/runtime/gc_stack_raw.go index d55522a9f6..bdc3154fa5 100644 --- a/src/runtime/gc_stack_raw.go +++ b/src/runtime/gc_stack_raw.go @@ -1,4 +1,4 @@ -//go:build (gc.conservative || gc.precise) && !tinygo.wasm +//go:build (gc.conservative || gc.precise) && !tinygo.wasm && !scheduler.threads package runtime diff --git a/src/runtime/gc_stack_threads.go b/src/runtime/gc_stack_threads.go new file mode 100644 index 0000000000..9c77fa0c7b --- /dev/null +++ b/src/runtime/gc_stack_threads.go @@ -0,0 +1,25 @@ +//go:build scheduler.threads + +package runtime + +import "internal/task" + +func gcMarkReachable() { + task.GCScan() +} + +// Scan globals inside the stop-the-world phase. Called from the STW +// implementation in the internal/task package. +// +//go:linkname gcScanGlobals internal/task.gcScanGlobals +func gcScanGlobals() { + findGlobals(markRoots) +} + +// Function called from assembly with all registers pushed, to actually scan the +// stack. +// +//go:export tinygo_scanstack +func scanstack(sp uintptr) { + markRoots(sp, task.StackTop()) +} diff --git a/src/runtime/scheduler_threads.go b/src/runtime/scheduler_threads.go new file mode 100644 index 0000000000..713410d874 --- /dev/null +++ b/src/runtime/scheduler_threads.go @@ -0,0 +1,121 @@ +//go:build scheduler.threads + +package runtime + +import "internal/task" + +const hasScheduler = false // not using the cooperative scheduler + +var ( + timerQueueLock task.PMutex + timerQueueStarted bool + timerFutex task.Futex +) + +// Because we just use OS threads, we don't need to do anything special here. We +// can just initialize everything and run main.main on the main thread. +func run() { + initHeap() + task.Init() + initAll() + callMain() +} + +// Pause the current task for a given time. +// +//go:linkname sleep time.Sleep +func sleep(duration int64) { + if duration <= 0 { + return + } + + sleepTicks(nanosecondsToTicks(duration)) +} + +func deadlock() { + // TODO: exit the thread via pthread_exit. + task.Pause() +} + +func scheduleTask(t *task.Task) { + t.Resume() +} + +func Gosched() { + // Each goroutine runs in a thread, so there's not much we can do here. + // There is sched_yield but it's only really intended for realtime + // operation, so is probably best not to use. +} + +// Separate goroutine (thread) that runs timer callbacks when they expire. +func timerRunner() { + for { + timerQueueLock.Lock() + + if timerQueue == nil { + // No timer in the queue, so wait until one becomes available. + val := timerFutex.Load() + timerQueueLock.Unlock() + timerFutex.Wait(val) + continue + } + + now := ticks() + if now < timerQueue.whenTicks() { + // There is a timer in the queue, but we need to wait until it + // expires. + // Using a futex, so that the wait is exited early when adding a new + // (sooner-to-expire) timer. + val := timerFutex.Load() + timerQueueLock.Unlock() + timeout := ticksToNanoseconds(timerQueue.whenTicks() - now) + timerFutex.WaitUntil(val, uint64(timeout)) + continue + } + + // Pop timer from queue. + tn := timerQueue + timerQueue = tn.next + tn.next = nil + + timerQueueLock.Unlock() + + // Run the callback stored in this timer node. + delay := ticksToNanoseconds(now - tn.whenTicks()) + tn.callback(tn, delay) + } +} + +func addTimer(tim *timerNode) { + timerQueueLock.Lock() + + if !timerQueueStarted { + timerQueueStarted = true + go timerRunner() + } + + timerQueueAdd(tim) + + timerFutex.Add(1) + timerFutex.Wake() + + timerQueueLock.Unlock() +} + +func removeTimer(tim *timer) bool { + timerQueueLock.Lock() + removed := timerQueueRemove(tim) + timerQueueLock.Unlock() + return removed +} + +func schedulerRunQueue() *task.Queue { + // This function is not actually used, it is only called when hasScheduler + // is true. So we can just return nil here. + return nil +} + +func runqueueForGC() *task.Queue { + // There is only a runqueue when using the cooperative scheduler. + return nil +}