diff --git a/GNUmakefile b/GNUmakefile index fd2d282c4b..dfca934327 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -934,6 +934,7 @@ endif @cp -rp lib/musl/src/malloc build/release/tinygo/lib/musl/src @cp -rp lib/musl/src/mman build/release/tinygo/lib/musl/src @cp -rp lib/musl/src/math build/release/tinygo/lib/musl/src + @cp -rp lib/musl/src/misc build/release/tinygo/lib/musl/src @cp -rp lib/musl/src/multibyte build/release/tinygo/lib/musl/src @cp -rp lib/musl/src/signal build/release/tinygo/lib/musl/src @cp -rp lib/musl/src/stdio build/release/tinygo/lib/musl/src diff --git a/builder/musl.go b/builder/musl.go index ecae118e47..9dc44f144b 100644 --- a/builder/musl.go +++ b/builder/musl.go @@ -127,6 +127,7 @@ var libMusl = Library{ "malloc/mallocng/*.c", "mman/*.c", "math/*.c", + "misc/*.c", "multibyte/*.c", "signal/" + arch + "/*.s", "signal/*.c", diff --git a/compileopts/options.go b/compileopts/options.go index b83f6f63ba..e879a84862 100644 --- a/compileopts/options.go +++ b/compileopts/options.go @@ -10,7 +10,7 @@ import ( var ( validBuildModeOptions = []string{"default", "c-shared"} validGCOptions = []string{"none", "leaking", "conservative", "custom", "precise"} - validSchedulerOptions = []string{"none", "tasks", "asyncify"} + validSchedulerOptions = []string{"none", "tasks", "asyncify", "threads"} validSerialOptions = []string{"none", "uart", "usb", "rtt"} validPrintSizeOptions = []string{"none", "short", "full"} validPanicStrategyOptions = []string{"print", "trap"} diff --git a/compileopts/target.go b/compileopts/target.go index 96732e2337..8b53946b32 100644 --- a/compileopts/target.go +++ b/compileopts/target.go @@ -414,6 +414,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) { } spec.ExtraFiles = append(spec.ExtraFiles, "src/internal/task/futex_linux.c", + "src/internal/task/task_threads.c", "src/runtime/runtime_unix.c", "src/runtime/signal.c") case "windows": diff --git a/src/internal/task/linux.go b/src/internal/task/linux.go new file mode 
100644
index 0000000000..7d28f708c4
--- /dev/null
+++ b/src/internal/task/linux.go
@@ -0,0 +1,9 @@
+//go:build linux && !baremetal
+
+package task
+
+import "unsafe"
+
+// threadID mirrors the C pthread_t. Musl uses a pointer (or unsigned long for
+// C++) so unsafe.Pointer should be fine.
+type threadID unsafe.Pointer
diff --git a/src/internal/task/semaphore.go b/src/internal/task/semaphore.go
new file mode 100644
index 0000000000..914f09bc5e
--- /dev/null
+++ b/src/internal/task/semaphore.go
@@ -0,0 +1,32 @@
+package task
+
+// Barebones semaphore implementation.
+// The main limitation is that if there are multiple waiters, a single Post()
+// call won't do anything. Only when Post() has been called to awaken all
+// waiters will the waiters proceed.
+// This limitation is not a problem when there will only be a single waiter.
+type Semaphore struct {
+	futex Futex // counter, interpreted as int32: negative while Wait() calls are blocked
+}
+
+// Post (unlock) the semaphore, incrementing the value in the semaphore.
+func (s *Semaphore) Post() {
+	newValue := s.futex.Add(1)
+	if newValue == 0 { // every outstanding Wait() now has a matching Post()
+		s.futex.WakeAll()
+	}
+}
+
+// Wait (lock) the semaphore, decrementing the value in the semaphore.
+func (s *Semaphore) Wait() {
+	delta := int32(-1)
+	value := s.futex.Add(uint32(delta)) // adding 0xffffffff wraps around, i.e. subtracts one
+	for {
+		if int32(value) >= 0 {
+			// Semaphore unlocked!
+			return
+		}
+		s.futex.Wait(value) // NOTE(review): presumably sleeps only while the futex still equals value - confirm in Futex
+		value = s.futex.Load()
+	}
+}
diff --git a/src/internal/task/task_threads.c b/src/internal/task/task_threads.c
new file mode 100644
index 0000000000..05dd086d8e
--- /dev/null
+++ b/src/internal/task/task_threads.c
@@ -0,0 +1,106 @@
+//go:build none
+
+#define _GNU_SOURCE
+#include <pthread.h>
+#include <semaphore.h>
+#include <signal.h>
+#include <stdint.h>
+#include <stdio.h>
+
+// BDWGC also uses SIGRTMIN+6 on Linux, which seems like a reasonable choice.
+#ifdef __linux__
+#define taskPauseSignal (SIGRTMIN + 6)
+#endif
+
+// Pointer to the current task.Task structure.
+// Ideally the entire task.Task structure would be a thread-local variable but
+// this also works.
+static __thread void *current_task;
+
+// Start-up parameters handed from tinygo_task_start to start_wrapper. The
+// struct lives on the stack of the creating thread, so the new thread has to
+// copy what it needs before it posts startlock.
+struct state_pass {
+    void *(*start)(void*);
+    void *args;
+    void *task;
+    sem_t startlock;
+};
+
+// Handle the GC pause in Go.
+void tinygo_task_gc_pause(int sig);
+
+// Initialize threads from the C side.
+// mainTask is stored as the current task of the calling thread; context is
+// not used.
+void tinygo_task_init(void *mainTask, void *context) {
+    // Make sure the current task pointer is set correctly for the main
+    // goroutine as well.
+    current_task = mainTask;
+
+    // Register the "GC pause" signal for the entire process.
+    // Using pthread_kill, we can still send the signal to a specific thread.
+    // The handler has the one-argument sa_handler form, so SA_SIGINFO (which
+    // selects the three-argument sa_sigaction member instead) must not be set.
+    struct sigaction act = { 0 };
+    act.sa_handler = &tinygo_task_gc_pause;
+    sigaction(taskPauseSignal, &act, NULL);
+}
+
+// Implemented on the Go side (exported as tinygo_task_exited).
+void tinygo_task_exited(void*);
+
+// Helper to start a goroutine while also storing the 'task' structure.
+// arg points to the struct state_pass of the creating thread.
+static void* start_wrapper(void *arg) {
+    struct state_pass *state = arg;
+    void *(*start)(void*) = state->start;
+    void *args = state->args;
+    current_task = state->task;
+
+    // Notify the caller that the thread has successfully started and
+    // initialized. 'state' must not be used after this point: the creating
+    // thread returns (and its stack frame disappears) once it is woken.
+    sem_post(&state->startlock);
+
+    // Run the goroutine function.
+    start(args);
+
+    // Notify the Go side this thread will exit.
+    tinygo_task_exited(current_task);
+
+    return NULL;
+}
+
+// Start a new goroutine in an OS thread.
+// Returns the result of pthread_create: 0 on success, an error number
+// otherwise. On success *thread holds the new thread ID and the new thread
+// has already stored 'task' as its current task. The id and context
+// parameters are not used.
+int tinygo_task_start(uintptr_t fn, void *args, void *task, pthread_t *thread, uint64_t id, void *context) {
+    // Sanity check. Should get optimized away.
+    if (sizeof(pthread_t) != sizeof(void*)) {
+        __builtin_trap();
+    }
+
+    struct state_pass state = {
+        .start = (void*)fn,
+        .args = args,
+        .task = task,
+    };
+    sem_init(&state.startlock, 0, 0);
+    int result = pthread_create(thread, NULL, &start_wrapper, &state);
+
+    // Wait until the thread has been created and read all state_pass variables.
+    // When pthread_create failed there is no thread that will ever post the
+    // semaphore, so waiting would block forever.
+    if (result == 0) {
+        sem_wait(&state.startlock);
+    }
+    sem_destroy(&state.startlock);
+
+    return result;
+}
+
+// Return the current task (for task.Current()).
+void* tinygo_task_current(void) {
+    return current_task;
+}
+
+// Obtain the highest address of the stack.
+// The stack base and size of the calling thread are queried through
+// pthread_getattr_np/pthread_attr_getstack and added together.
+// NOTE(review): the return values of both calls are ignored, so attr,
+// stackbase and stacksize are read uninitialized if either call fails.
+uintptr_t tinygo_task_stacktop(void) {
+    pthread_attr_t attr;
+    pthread_getattr_np(pthread_self(), &attr);
+    void *stackbase;
+    size_t stacksize;
+    pthread_attr_getstack(&attr, &stackbase, &stacksize);
+    pthread_attr_destroy(&attr);
+    return (uintptr_t)stackbase + (uintptr_t)stacksize;
+}
+
+// Send a signal to cause the task to pause for the GC mark phase.
+// NOTE(review): the result of pthread_kill is not checked.
+void tinygo_task_send_gc_signal(pthread_t thread) {
+    pthread_kill(thread, taskPauseSignal);
+}
diff --git a/src/internal/task/task_threads.go b/src/internal/task/task_threads.go
new file mode 100644
index 0000000000..c38864012d
--- /dev/null
+++ b/src/internal/task/task_threads.go
@@ -0,0 +1,228 @@
+//go:build scheduler.threads
+
+package task
+
+import (
+	"sync/atomic"
+	"unsafe"
+)
+
+// If true, print verbose debug logs.
+const verbose = false
+
+// Scheduler-specific state.
+type state struct {
+	// Goroutine ID. The number here is not really significant and after a while
+	// it could wrap around. But it is useful for debugging.
+	id uint64
+
+	// Thread ID, pthread_t or similar (typically implemented as a pointer).
+	// Filled in by tinygo_task_start when the thread is created.
+	thread threadID
+
+	// Next task in the activeTasks queue.
+	QueueNext *Task
+
+	// Semaphore to pause/resume the thread atomically.
+	// The same semaphore is also used by GCScan and the GC pause signal
+	// handler.
+	sem Semaphore
+}
+
+// Goroutine counter, starting at 0 for the main goroutine.
+var goroutineID uint64
+
+// Task structure of the main goroutine; it is the initial entry of the
+// activeTasks list.
+var mainTask Task
+
+// Queue of tasks (see QueueNext) that currently exist in the program.
+// activeTaskLock is held while the list is modified or walked.
+var activeTasks = &mainTask
+var activeTaskLock PMutex
+
+// OnSystemStack is not implemented for this scheduler: it always panics.
+func OnSystemStack() bool {
+	runtimePanic("todo: task.OnSystemStack")
+	return false
+}
+
+// Initialize the main goroutine state. Must be called by the runtime on
+// startup, before starting any other goroutines.
+// NOTE(review): nothing visible here stores a thread ID in
+// mainTask.state.thread, while GCScan signals every task other than the
+// current one - confirm what happens when a GC cycle runs on a non-main
+// thread.
+func Init() {
+	tinygo_task_init(&mainTask)
+}
+
+// Return the task struct for the current thread.
+// Panics when no task pointer has been stored for the calling thread.
+func Current() *Task {
+	t := (*Task)(tinygo_task_current())
+	if t == nil {
+		runtimePanic("unknown current task")
+	}
+	return t
+}
+
+// Pause pauses the current task, until it is resumed by another task.
+// It is possible that another task has called Resume() on the task before it
+// hits Pause(), in which case the task won't be paused but continues
+// immediately.
+func Pause() {
+	// Wait until resumed
+	t := Current()
+	if verbose {
+		println("*** pause: ", t.state.id)
+	}
+	t.state.sem.Wait()
+}
+
+// Resume the given task.
+// It is legal to resume a task before it gets paused, it means that the next
+// call to Pause() won't pause but will continue immediately. This happens in
+// practice sometimes in channel operations, where the Resume() might get called
+// between the channel unlock and the call to Pause().
+func (t *Task) Resume() {
+	if verbose {
+		println("*** resume: ", t.state.id)
+	}
+	// Increment the semaphore counter.
+	// If the task is currently paused in Wait(), it will resume.
+	// If the task is not yet paused, the next call to Wait() will continue
+	// immediately.
+	t.state.sem.Post()
+}
+
+// Start a new OS thread.
+// fn and args are passed on to tinygo_task_start; the stackSize parameter is
+// not used in this function. Panics when the thread could not be started.
+func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) {
+	t := &Task{}
+	t.state.id = atomic.AddUint64(&goroutineID, 1)
+	if verbose {
+		println("*** start: ", t.state.id, "from", Current().state.id)
+	}
+
+	// Start the new thread, and add it to the list of threads.
+	// Do this with a lock so that only started threads are part of the queue
+	// and the stop-the-world GC won't see threads that haven't started yet or
+	// are not fully started yet.
+	activeTaskLock.Lock()
+	errCode := tinygo_task_start(fn, args, t, &t.state.thread, t.state.id)
+	if errCode != 0 {
+		runtimePanic("could not start thread")
+	}
+	// Insert the new task at the head of the list.
+	t.state.QueueNext = activeTasks
+	activeTasks = t
+	activeTaskLock.Unlock()
+}
+
+// taskExited unlinks t from the activeTasks list. It is called from the C
+// thread wrapper after the goroutine function has returned, and panics when t
+// is not in the list.
+//
+//export tinygo_task_exited
+func taskExited(t *Task) {
+	if verbose {
+		println("*** exit:", t.state.id)
+	}
+
+	// Remove from the queue.
+	// TODO: this can be made more efficient by using a doubly linked list.
+	activeTaskLock.Lock()
+	found := false
+	// q points at the link (list head or a QueueNext field) that may refer to
+	// t, so that t can be unlinked in place.
+	for q := &activeTasks; *q != nil; q = &(*q).state.QueueNext {
+		if *q == t {
+			*q = t.state.QueueNext
+			found = true
+			break
+		}
+	}
+	activeTaskLock.Unlock()
+
+	// Sanity check.
+	if !found {
+		runtimePanic("taskExited failed")
+	}
+}
+
+// GC scan phase. Because we need to stop the world while scanning, this kinda
+// needs to be done in the tasks package.
+//
+// Sequence: signal every other thread, scan the current stack, post every
+// other thread's semaphore once (they scan their own stack), scan the globals,
+// then post every semaphore a second time (they resume).
+// NOTE(review): nothing visible here waits for the signalled threads to
+// actually reach the signal handler before scanning starts, nor for their
+// stack scans to finish before the second Post or before returning - confirm
+// that this is synchronized elsewhere.
+// NOTE(review): the semaphore posted here is the same one used by
+// Pause/Resume - confirm the behavior for a thread that is blocked in Pause()
+// when the signal arrives.
+func GCScan() {
+	current := Current()
+
+	// Don't allow new goroutines to be started while pausing/resuming threads
+	// in the stop-the-world phase.
+	activeTaskLock.Lock()
+
+	// Pause all other threads.
+	for t := activeTasks; t != nil; t = t.state.QueueNext {
+		if t != current {
+			tinygo_task_send_gc_signal(t.state.thread)
+		}
+	}
+
+	// Scan the current stack, and all current registers.
+	scanCurrentStack()
+
+	// Wake each paused thread for the first time so it will scan the stack.
+	for t := activeTasks; t != nil; t = t.state.QueueNext {
+		if t != current {
+			t.state.sem.Post()
+		}
+	}
+
+	// Scan all globals (implemented in the runtime).
+	gcScanGlobals()
+
+	// Wake each paused thread for the second time, so they will resume normal
+	// operation.
+	for t := activeTasks; t != nil; t = t.state.QueueNext {
+		if t != current {
+			t.state.sem.Post()
+		}
+	}
+
+	// Allow goroutines to start and exit again.
+	activeTaskLock.Unlock()
+}
+
+// Scan globals, implemented in the runtime package.
+func gcScanGlobals()
+
+// Serializes the stack scans done by the paused threads.
+var stackScanLock PMutex
+
+// Signal handler for the GC pause signal; registered from the C side and run
+// on the thread that received the signal. It blocks the thread until GCScan
+// posts its semaphore, scans the thread's stack, and then blocks again until
+// the semaphore is posted a second time.
+//
+//export tinygo_task_gc_pause
+func tinygo_task_gc_pause() {
+	// The current task is fixed for the lifetime of the thread, so look it up
+	// only once.
+	t := Current()
+
+	// Wait until we get the signal to start scanning the stack.
+	t.state.sem.Wait()
+
+	// Scan the thread stack.
+	// Only scan a single thread stack at a time, because the GC marking phase
+	// doesn't support parallelism.
+	// TODO: it may be possible to call markRoots directly (without saving
+	// registers) since we are in a signal handler that already saved a bunch of
+	// registers. This is an optimization left for a future time.
+	stackScanLock.Lock()
+	scanCurrentStack()
+	stackScanLock.Unlock()
+
+	// Wait until we get the signal we can resume normally (after the mark phase
+	// has finished).
+	t.state.sem.Wait()
+}
+
+// Scan the stack of the calling thread; implemented outside this file.
+//
+//go:export tinygo_scanCurrentStack
+func scanCurrentStack()
+
+// Return the highest address of the current stack.
+//
+//export tinygo_task_stacktop
+func StackTop() uintptr
+
+//go:linkname runtimePanic runtime.runtimePanic
+func runtimePanic(msg string)
+
+// Using //go:linkname instead of //export so that we don't tell the compiler
+// that the 't' parameter won't escape (because it will).
+//
+//go:linkname tinygo_task_init tinygo_task_init
+func tinygo_task_init(t *Task)
+
+// Here same as for tinygo_task_init.
+// Returns zero on success and a non-zero error code otherwise.
+//
+//go:linkname tinygo_task_start tinygo_task_start
+func tinygo_task_start(fn uintptr, args unsafe.Pointer, t *Task, thread *threadID, id uint64) int32
+
+// Pause the thread by sending it a signal.
+//
+//export tinygo_task_send_gc_signal
+func tinygo_task_send_gc_signal(threadID)
+
+// Return the task pointer stored for the calling thread, or nil if none was
+// stored.
+//
+//export tinygo_task_current
+func tinygo_task_current() unsafe.Pointer
diff --git a/src/runtime/chan.go b/src/runtime/chan.go
index 1f0d7ced8d..885ba48cd4 100644
--- a/src/runtime/chan.go
+++ b/src/runtime/chan.go
@@ -1,3 +1,5 @@
+//go:build !scheduler.threads
+
 package runtime
 
 // This file implements the 'chan' type and send/receive/select operations.
diff --git a/src/runtime/chan2.go b/src/runtime/chan2.go new file mode 100644 index 0000000000..f2180ddb0c --- /dev/null +++ b/src/runtime/chan2.go @@ -0,0 +1,513 @@ +//go:build scheduler.threads + +package runtime + +// This implementation of channels supports multiple threads running at the same +// time. +// Every channel has a list of senders and a list of receivers, and possibly a +// queue. There is no 'channel state', the state is inferred from the available +// senders/receivers and values in the buffer. +// +// - A sender will first try to send the value to a waiting receiver if there is +// one, but only if there is nothing in the queue (to keep the values flowing +// in the correct order). If it can't, it will add the value in the queue and +// possibly wait as a sender if there's no space available. +// - A receiver will first try to read a value from the queue, but if there is +// none it will try to read from a sender in the list. It will block if it +// can't proceed. +// +// State is kept in various ways: +// +// - The sender value is stored in the sender 'channelBlockedList', which is +// really a queue entry. This works for both senders and select operations: a +// select operation has a separate value to send for each case. +// - The receiver value is stored inside Task.Ptr. This works for receivers, and +// importantly also works for select which has a single buffer for every +// receive operation. +// - The `Task.Data` value stores how the channel operation proceeded. For +// normal send/receive operations, it starts at chanWaiting and then is +// changed to chanOperationOk or chanOperationClosed depending on whether the +// send/receive proceeded normally or because it was closed. For a select +// operation, it also stores the 'case' index in the upper bits (zero for +// non-select operations) so that the select operation knows which case did +// proceed. 
+//   The value is at the same time also a way that goroutines can be the first
+//   (and only) goroutine to 'take' a channel operation using an atomic CAS
+//   operation to change it from 'waiting' to any other value. This is important
+//   for the select statement because multiple goroutines could try to let
+//   different channels in the select statement proceed at the same time. By
+//   using Task.Data, only a single channel operation in the select statement
+//   can proceed.
+// - It is possible for the channel queues to contain already-processed senders
+//   or receivers. This can happen when the select statement managed to proceed
+//   but the goroutine doing the select has not yet cleaned up the stale queue
+//   entries before returning. This should therefore only happen for a short
+//   period.
+
+import (
+	"sync/atomic"
+	"unsafe"
+
+	"internal/task"
+)
+
+type channel struct {
+	closed       bool           // set by chanClose
+	selectLocked bool           // true while a select operation holds 'lock' (see lockAllStates)
+	elementSize  uintptr        // size in bytes of a single element
+	bufCap       uintptr        // 'cap'
+	bufLen       uintptr        // 'len'
+	bufHead      uintptr        // index in buf where bufferPush stores the next value
+	bufTail      uintptr        // index in buf where bufferPop reads the next value
+	senders      chanQueue      // blocked senders, including select send cases
+	receivers    chanQueue      // blocked receivers, including select receive cases
+	lock         task.PMutex    // held by send/receive/close/select while they use the channel
+	buf          unsafe.Pointer // storage for the buffered elements
+}
+
+// Values stored in the low two bits of Task.Data.
+const (
+	chanWaiting         = iota // waiting for a send/receive operation to continue
+	chanOperationOk            // successfully sent or received (not closed)
+	chanOperationClosed        // channel was closed, the value has been zeroed
+	chanOperationMask   = 0b11
+)
+
+// Singly linked list of blocked channel operations.
+type chanQueue struct {
+	first *channelBlockedList
+}
+
+// Push the next channel operation to the queue. All appropriate fields must have
+// been initialized already.
+// The node is inserted at the head of the list, so pop returns the most
+// recently pushed waiting node first.
+// This function must be called with the channel lock held.
+func (q *chanQueue) push(node *channelBlockedList) {
+	node.next = q.first
+	q.first = node
+}
+
+// Pop the next waiting channel from the queue. Channels that are no longer
+// waiting (for example, when they're part of a select operation) will be
+// skipped.
+// This function must be called with the channel lock held.
+// chanOp is the value (chanOperationOk or chanOperationClosed) to store in the
+// low bits of the Data field of the task that is taken. Returns nil when no
+// waiting entry is left. Entries that are skipped are removed from the queue
+// as well.
+func (q *chanQueue) pop(chanOp uint64) *channelBlockedList {
+	for {
+		if q.first == nil {
+			return nil
+		}
+
+		// Pop next from the queue.
+		popped := q.first
+		q.first = q.first.next
+
+		// The new value for the 'data' field will be a combination of the
+		// channel operation and the select index. (The select index is 0 for
+		// non-select channel operations).
+		newDataValue := chanOp | uint64(popped.index<<2)
+
+		// Try to be the first to proceed with this goroutine.
+		// The expected old value 0 is chanWaiting.
+		swapped := atomic.CompareAndSwapUint64(&popped.task.Data, 0, newDataValue)
+		if swapped {
+			return popped
+		}
+	}
+}
+
+// Remove the given to-be-removed node from the queue if it is part of the
+// queue. If there are multiple, only one will be removed.
+func (q *chanQueue) remove(remove *channelBlockedList) {
+	// n points at the link (q.first or a 'next' field) that may refer to the
+	// node, so the node can be unlinked in place.
+	n := &q.first
+	for *n != nil {
+		if *n == remove {
+			*n = (*n).next
+			return
+		}
+		n = &((*n).next)
+	}
+}
+
+// Queue entry for one blocked send or receive operation.
+type channelBlockedList struct {
+	next  *channelBlockedList
+	task  *task.Task
+	index uintptr        // select index, 0 for non-select operation
+	value unsafe.Pointer // if this is a sender, this is the value to send
+}
+
+// One case of a select statement. A nil value marks a receive case, a non-nil
+// value points to the value to send.
+type chanSelectState struct {
+	ch    *channel
+	value unsafe.Pointer
+}
+
+// Create a new channel with the given element size (in bytes) and buffer
+// capacity (in elements).
+func chanMake(elementSize uintptr, bufSize uintptr) *channel {
+	return &channel{
+		elementSize: elementSize,
+		bufCap:      bufSize,
+		buf:         alloc(elementSize*bufSize, nil),
+	}
+}
+
+// Return the number of entries in this chan, called from the len builtin.
+// A nil chan is defined as having length 0.
+// The length is read without taking the channel lock.
+func chanLen(c *channel) int {
+	if c == nil {
+		return 0
+	}
+	return int(c.bufLen)
+}
+
+// Return the capacity of this chan, called from the cap builtin.
+// A nil chan is defined as having capacity 0.
+func chanCap(c *channel) int {
+	if c == nil {
+		return 0
+	}
+	return int(c.bufCap)
+}
+
+// Push the value to the channel buffer array, for a send operation.
+// This function may only be called when the channel is locked and it is known
+// there is space available in the buffer.
+func (ch *channel) bufferPush(value unsafe.Pointer) {
+	elemAddr := unsafe.Add(ch.buf, ch.bufHead*ch.elementSize)
+	ch.bufLen++
+	ch.bufHead++
+	if ch.bufHead == ch.bufCap {
+		// Wrap around: the buffer is used as a ring buffer.
+		ch.bufHead = 0
+	}
+
+	memcpy(elemAddr, value, ch.elementSize)
+}
+
+// Pop a value from the channel buffer and store it in the 'value' pointer, for
+// a receive operation.
+// This function may only be called when the channel is locked and it is known
+// there is at least one value available in the buffer.
+func (ch *channel) bufferPop(value unsafe.Pointer) {
+	elemAddr := unsafe.Add(ch.buf, ch.bufTail*ch.elementSize)
+	ch.bufLen--
+	ch.bufTail++
+	if ch.bufTail == ch.bufCap {
+		// Wrap around: the buffer is used as a ring buffer.
+		ch.bufTail = 0
+	}
+
+	memcpy(value, elemAddr, ch.elementSize)
+
+	// Zero the value to allow the GC to collect it.
+	memzero(elemAddr, ch.elementSize)
+}
+
+// Try to proceed with this send operation without blocking, and return whether
+// the send succeeded. The lock must be held when calling this function.
+// If the channel is closed, the channel lock is released and the function
+// panics.
+func (ch *channel) trySend(value unsafe.Pointer) bool {
+	// To make sure we send values in the correct order, we can only send
+	// directly to a receiver when there are no values in the buffer.
+
+	// Do not allow sending on a closed channel.
+	if ch.closed {
+		ch.lock.Unlock()
+		runtimePanic("send on closed channel")
+	}
+
+	// There is no value in the buffer and we have a receiver available. Copy
+	// the value directly into the receiver.
+	if ch.bufLen == 0 {
+		if receiver := ch.receivers.pop(chanOperationOk); receiver != nil {
+			memcpy(receiver.task.Ptr, value, ch.elementSize)
+			receiver.task.Resume()
+			return true
+		}
+	}
+
+	// If there is space in the buffer (if this is a buffered channel), we can
+	// store the value in the buffer and continue.
+	if ch.bufLen < ch.bufCap {
+		ch.bufferPush(value)
+		return true
+	}
+	return false
+}
+
+// Send the value that 'value' points to on the channel, blocking until the
+// send can proceed. blockedlist is caller-provided storage for the queue entry
+// that is used while this goroutine is blocked. Panics if the channel is (or
+// becomes) closed.
+func chanSend(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) {
+	if ch == nil {
+		// A nil channel blocks forever. Do not schedule this goroutine again.
+		// NOTE(review): execution continues with a nil channel if deadlock()
+		// ever returns - confirm that it cannot.
+		deadlock()
+	}
+
+	ch.lock.Lock()
+
+	// See whether we can proceed immediately, and if so, return early.
+	if ch.trySend(value) {
+		ch.lock.Unlock()
+		return
+	}
+
+	// Can't proceed. Add us to the list of senders and wait until we're awoken.
+	t := task.Current()
+	t.Data = chanWaiting
+	blockedlist.task = t
+	blockedlist.index = 0
+	blockedlist.value = value
+	ch.senders.push(blockedlist)
+	ch.lock.Unlock()
+
+	// Wait until this goroutine is resumed.
+	// It might be resumed after Unlock() and before Pause(). In that case,
+	// because we use semaphores, the Pause() will continue immediately.
+	task.Pause()
+
+	// Check whether the send happened normally (not because the channel was
+	// closed while sending).
+	if t.Data == chanOperationClosed {
+		// Oops, this channel was closed while sending!
+		runtimePanic("send on closed channel")
+	}
+}
+
+// Try to proceed with this receive operation without blocking, and return
+// whether the receive operation succeeded. The lock must be held when calling
+// this function.
+// 'received' reports whether the operation could proceed; 'ok' is false when
+// it proceeded because the channel is closed (the value is zeroed then).
+func (ch *channel) tryRecv(value unsafe.Pointer) (received, ok bool) {
+	// To make sure we keep the values in the channel in the correct order, we
+	// first have to read values from the buffer before we can look at the
+	// senders.
+
+	// If there is a value available in the buffer, we can pull it out and
+	// proceed immediately.
+	if ch.bufLen > 0 {
+		ch.bufferPop(value)
+
+		// Check for the next sender available and push it to the buffer.
+		if sender := ch.senders.pop(chanOperationOk); sender != nil {
+			ch.bufferPush(sender.value)
+			sender.task.Resume()
+		}
+
+		return true, true
+	}
+
+	if ch.closed {
+		// Channel is closed, so proceed immediately.
+		memzero(value, ch.elementSize)
+		return true, false
+	}
+
+	// If there is a sender, we can proceed with the channel operation
+	// immediately.
+	if sender := ch.senders.pop(chanOperationOk); sender != nil {
+		memcpy(value, sender.value, ch.elementSize)
+		sender.task.Resume()
+		return true, true
+	}
+
+	return false, false
+}
+
+// Receive a value from the channel into the memory that 'value' points to,
+// blocking until a value is available or the channel is closed. blockedlist is
+// caller-provided storage for the queue entry that is used while this
+// goroutine is blocked. Returns false if the receive completed because the
+// channel is closed.
+func chanRecv(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) bool {
+	if ch == nil {
+		// A nil channel blocks forever. Do not schedule this goroutine again.
+		// NOTE(review): execution continues with a nil channel if deadlock()
+		// ever returns - confirm that it cannot.
+		deadlock()
+	}
+
+	ch.lock.Lock()
+
+	if received, ok := ch.tryRecv(value); received {
+		ch.lock.Unlock()
+		return ok
+	}
+
+	// We can't proceed, so we add ourselves to the list of receivers and wait
+	// until we're awoken.
+	t := task.Current()
+	t.Ptr = value
+	t.Data = chanWaiting
+	blockedlist.task = t
+	blockedlist.index = 0
+	ch.receivers.push(blockedlist)
+	ch.lock.Unlock()
+
+	// Wait until the goroutine is resumed.
+	task.Pause()
+
+	// Return whether the receive happened from a closed channel.
+	return t.Data != chanOperationClosed
+}
+
+// chanClose closes the given channel. It panics if the channel is nil or has
+// already been closed. All blocked receivers are woken with a zeroed value,
+// and all blocked senders are woken with chanOperationClosed so that they
+// panic.
+func chanClose(ch *channel) {
+	if ch == nil {
+		// Not allowed by the language spec.
+		runtimePanic("close of nil channel")
+	}
+
+	ch.lock.Lock()
+
+	if ch.closed {
+		// Not allowed by the language spec.
+		ch.lock.Unlock()
+		runtimePanic("close of closed channel")
+	}
+
+	// Proceed all receiving operations that are blocked.
+	for {
+		receiver := ch.receivers.pop(chanOperationClosed)
+		if receiver == nil {
+			// Processed all receivers.
+			break
+		}
+
+		// Zero the value that the receiver is getting.
+		memzero(receiver.task.Ptr, ch.elementSize)
+
+		// Wake up the receiving goroutine.
+		receiver.task.Resume()
+	}
+
+	// Let all senders panic.
+	for {
+		sender := ch.senders.pop(chanOperationClosed)
+		if sender == nil {
+			break // processed all senders
+		}
+
+		// Wake up the sender.
+		sender.task.Resume()
+	}
+
+	ch.closed = true
+
+	ch.lock.Unlock()
+}
+
+// We currently use a global select lock to avoid deadlocks while locking each
+// individual channel in the select. Without this global lock, two select
+// operations that have a different order of the same channels could end up in a
+// deadlock. This global lock is inefficient, but gets the job done.
+//
+// If this becomes a performance issue, we can see how the Go runtime does this.
+// I think it does this by sorting all states by channel address and then
+// locking them in that order to avoid this deadlock.
+var chanSelectLock task.PMutex
+
+// Lock all channels (taking care to skip duplicate channels).
+// The selectLocked flag of a channel marks that it was already locked by this
+// call. doChanSelect calls this function with chanSelectLock held.
+func lockAllStates(states []chanSelectState) {
+	for _, state := range states {
+		if state.ch != nil && !state.ch.selectLocked {
+			state.ch.lock.Lock()
+			state.ch.selectLocked = true
+		}
+	}
+}
+
+// Unlock all channels (taking care to skip duplicate channels).
+func unlockAllStates(states []chanSelectState) {
+	for _, state := range states {
+		if state.ch != nil && state.ch.selectLocked {
+			state.ch.lock.Unlock()
+			state.ch.selectLocked = false
+		}
+	}
+}
+
+// Implementation of both the blocking and the non-blocking select.
+//
+// recvbuf is the single buffer that a received value is written to, states
+// describes every case, and ops provides one queue entry per case for a
+// blocking select (an empty ops means the select is non-blocking).
+// Returns the index of the case that proceeded and, for a receive, whether the
+// value was received from a channel that was not closed. When a non-blocking
+// select cannot proceed, the returned index is ^uintptr(0).
+func doChanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelBlockedList) (uintptr, bool) {
+	// Lock everything.
+	chanSelectLock.Lock()
+	lockAllStates(states)
+
+	const selectNoIndex = ^uintptr(0)
+	selectIndex := selectNoIndex
+	selectOk := true
+
+	// Iterate over each state, and see if it can proceed.
+	// TODO: start from a random index.
+	for i, state := range states {
+		if state.ch == nil {
+			// A nil channel blocks forever, so it won't take part of the select
+			// operation.
+			continue
+		}
+
+		if state.value == nil { // chan receive
+			if received, ok := state.ch.tryRecv(recvbuf); received {
+				selectIndex = uintptr(i)
+				selectOk = ok
+				break
+			}
+		} else { // chan send
+			if state.ch.trySend(state.value) {
+				selectIndex = uintptr(i)
+				break
+			}
+		}
+	}
+
+	// If this select can immediately proceed, or is a non-blocking select,
+	// return early.
+	blocking := len(ops) != 0
+	if selectIndex != selectNoIndex || !blocking {
+		unlockAllStates(states)
+		chanSelectLock.Unlock()
+		return selectIndex, selectOk
+	}
+
+	// The select is blocking and no channel operation can proceed, so things
+	// become more complicated.
+	// We add ourselves as a sender/receiver to every channel, and wait for the
+	// first one to complete. Only one will successfully complete, because
+	// senders and receivers use a compare-and-exchange atomic operation on
+	// t.Data so that only one will be able to "take" this select operation.
+	t := task.Current()
+	t.Ptr = recvbuf
+	t.Data = chanWaiting
+	for i, state := range states {
+		if state.ch == nil {
+			continue
+		}
+		op := &ops[i]
+		op.task = t
+		op.index = uintptr(i)
+		if state.value == nil { // chan receive
+			state.ch.receivers.push(op)
+		} else { // chan send
+			op.value = state.value
+			state.ch.senders.push(op)
+		}
+	}
+
+	// Now we wait until one of the send/receive operations can proceed.
+	unlockAllStates(states)
+	chanSelectLock.Unlock()
+	task.Pause()
+
+	// Resumed, so one channel operation must have progressed.
+
+	// Make sure all channel ops are removed from the senders/receivers
+	// queue before we return and the memory of them becomes invalid.
+	chanSelectLock.Lock()
+	lockAllStates(states)
+	for i, state := range states {
+		if state.ch == nil {
+			continue
+		}
+		op := &ops[i]
+		if state.value == nil {
+			state.ch.receivers.remove(op)
+		} else {
+			state.ch.senders.remove(op)
+		}
+	}
+	unlockAllStates(states)
+	chanSelectLock.Unlock()
+
+	// Pull the return values out of t.Data (which contains two bitfields).
+	// The low two bits hold the operation result, the remaining bits the index
+	// of the select case that proceeded.
+	selectIndex = uintptr(t.Data) >> 2
+	selectOk = t.Data&chanOperationMask != chanOperationClosed
+
+	return selectIndex, selectOk
+}
+
+// Blocking select: ops provides one queue entry for every case.
+func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelBlockedList) (uintptr, bool) {
+	return doChanSelect(recvbuf, states, ops)
+}
+
+// Non-blocking select: returns immediately when no case can proceed.
+func tryChanSelect(recvbuf unsafe.Pointer, states []chanSelectState) (uintptr, bool) {
+	return doChanSelect(recvbuf, states, nil)
+}
diff --git a/src/runtime/gc_stack_raw.go b/src/runtime/gc_stack_raw.go
index d55522a9f6..bdc3154fa5 100644
--- a/src/runtime/gc_stack_raw.go
+++ b/src/runtime/gc_stack_raw.go
@@ -1,4 +1,4 @@
-//go:build (gc.conservative || gc.precise) && !tinygo.wasm
+//go:build (gc.conservative || gc.precise) && !tinygo.wasm && !scheduler.threads
 
 package runtime
 
diff --git a/src/runtime/gc_stack_threads.go b/src/runtime/gc_stack_threads.go
new file mode 100644
index 0000000000..9c77fa0c7b
--- /dev/null
+++ b/src/runtime/gc_stack_threads.go
@@ -0,0 +1,25 @@
+//go:build scheduler.threads
+
+package runtime
+
+import "internal/task"
+
+// Mark everything that is reachable. The whole scan, including stopping the
+// other threads, is delegated to the task package.
+func gcMarkReachable() {
+	task.GCScan()
+}
+
+// Scan globals inside the stop-the-world phase. Called from the STW
+// implementation in the internal/task package.
+//
+//go:linkname gcScanGlobals internal/task.gcScanGlobals
+func gcScanGlobals() {
+	findGlobals(markRoots)
+}
+
+// Function called from assembly with all registers pushed, to actually scan the
+// stack.
+// The range from sp up to the top of the current thread's stack is marked.
+//
+//go:export tinygo_scanstack
+func scanstack(sp uintptr) {
+	markRoots(sp, task.StackTop())
+}
diff --git a/src/runtime/scheduler_threads.go b/src/runtime/scheduler_threads.go
new file mode 100644
index 0000000000..5ecdd1e507
--- /dev/null
+++ b/src/runtime/scheduler_threads.go
@@ -0,0 +1,64 @@
+//go:build scheduler.threads
+
+package runtime
+
+import "internal/task"
+
+const hasScheduler = false // not using the cooperative scheduler
+
+// Because we just use OS threads, we don't need to do anything special here. We
+// can just initialize everything and run main.main on the main thread.
+func run() {
+	initHeap()
+	task.Init()
+	initAll()
+	callMain()
+}
+
+// Pause the current task for a given time.
+// Returns immediately when the duration is zero or negative.
+//
+//go:linkname sleep time.Sleep
+func sleep(duration int64) {
+	if duration <= 0 {
+		return
+	}
+
+	sleepTicks(nanosecondsToTicks(duration))
+}
+
+// Block the current goroutine by pausing its task.
+// NOTE(review): task.Pause() returns once the task's semaphore is posted, and
+// chanSend/chanRecv keep using their nil channel after this call - confirm
+// that nothing ever posts the semaphore of a task blocked here.
+func deadlock() {
+	// TODO: exit the thread via pthread_exit.
+	task.Pause()
+}
+
+// Make the given task runnable again by resuming it.
+func scheduleTask(t *task.Task) {
+	t.Resume()
+}
+
+// Gosched is a no-op for this scheduler.
+func Gosched() {
+	// Each goroutine runs in a thread, so there's not much we can do here.
+	// There is sched_yield but it's only really intended for realtime
+	// operation, so is probably best not to use.
+}
+
+// Timers are not implemented yet for this scheduler: always panics.
+func addTimer(tim *timerNode) {
+	// TODO: I think we can implement this by having a single goroutine (thread)
+	// process all timers.
+	runtimePanic("todo: addTimer")
+}
+
+// Timers are not implemented yet for this scheduler: always panics.
+func removeTimer(tim *timer) bool {
+	runtimePanic("todo: removeTimer")
+	return false
+}
+
+func schedulerRunQueue() *task.Queue {
+	// This function is not actually used, it is only called when hasScheduler
+	// is true. So we can just return nil here.
+	return nil
+}
+
+func runqueueForGC() *task.Queue {
+	// There is only a runqueue when using the cooperative scheduler.
+	return nil
+}