From 0ca0db48b7e741c50959904842ba7ca742483afb Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Thu, 24 Oct 2024 10:26:17 +0200 Subject: [PATCH] WIP map every goroutine to a new OS thread --- compileopts/options.go | 2 +- compileopts/options_test.go | 2 +- compileopts/target.go | 5 +- src/internal/task/linux.go | 9 + src/internal/task/semaphore.go | 32 ++++ src/internal/task/task_threads.c | 104 ++++++++++++ src/internal/task/task_threads.go | 265 ++++++++++++++++++++++++++++++ src/runtime/gc_stack_raw.go | 2 +- src/runtime/gc_stack_threads.go | 25 +++ src/runtime/runtime_unix.go | 1 + src/runtime/scheduler_threads.go | 124 ++++++++++++++ 11 files changed, 567 insertions(+), 4 deletions(-) create mode 100644 src/internal/task/linux.go create mode 100644 src/internal/task/semaphore.go create mode 100644 src/internal/task/task_threads.c create mode 100644 src/internal/task/task_threads.go create mode 100644 src/runtime/gc_stack_threads.go create mode 100644 src/runtime/scheduler_threads.go diff --git a/compileopts/options.go b/compileopts/options.go index bc462b29bd..52f0a64bff 100644 --- a/compileopts/options.go +++ b/compileopts/options.go @@ -10,7 +10,7 @@ import ( var ( validBuildModeOptions = []string{"default", "c-shared"} validGCOptions = []string{"none", "leaking", "conservative", "custom", "precise"} - validSchedulerOptions = []string{"none", "tasks", "asyncify"} + validSchedulerOptions = []string{"none", "tasks", "asyncify", "threads"} validSerialOptions = []string{"none", "uart", "usb", "rtt"} validPrintSizeOptions = []string{"none", "short", "full"} validPanicStrategyOptions = []string{"print", "trap"} diff --git a/compileopts/options_test.go b/compileopts/options_test.go index 23ffec465f..bd6e4c04ea 100644 --- a/compileopts/options_test.go +++ b/compileopts/options_test.go @@ -10,7 +10,7 @@ import ( func TestVerifyOptions(t *testing.T) { expectedGCError := errors.New(`invalid gc option 'incorrect': valid values are none, leaking, conservative, custom, precise`) - expectedSchedulerError := errors.New(`invalid scheduler option 'incorrect': valid values are none, tasks, asyncify`) + expectedSchedulerError := errors.New(`invalid scheduler option 'incorrect': valid values are none, tasks, asyncify, threads`) expectedPrintSizeError := errors.New(`invalid size option 'incorrect': valid values are none, short, full`) expectedPanicStrategyError := errors.New(`invalid panic option 'incorrect': valid values are print, trap`) diff --git a/compileopts/target.go b/compileopts/target.go index f60ee30972..1e6acdf869 100644 --- a/compileopts/target.go +++ b/compileopts/target.go @@ -247,7 +247,6 @@ func defaultTarget(options *Options) (*TargetSpec, error) { GOARCH: options.GOARCH, BuildTags: []string{options.GOOS, options.GOARCH}, GC: "precise", - Scheduler: "tasks", Linker: "cc", DefaultStackSize: 1024 * 64, // 64kB GDB: []string{"gdb"}, @@ -378,6 +377,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) { platformVersion = "11.0.0" // first macosx platform with arm64 support } llvmvendor = "apple" + spec.Scheduler = "tasks" spec.Linker = "ld.lld" spec.Libc = "darwin-libSystem" // Use macosx* instead of darwin, otherwise darwin/arm64 will refer to @@ -395,6 +395,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) { "src/runtime/runtime_unix.c", "src/runtime/signal.c") case "linux": + spec.Scheduler = "threads" spec.Linker = "ld.lld" spec.RTLib = "compiler-rt" spec.Libc = "musl" @@ -415,9 +416,11 @@ func defaultTarget(options *Options) (*TargetSpec, error) { } spec.ExtraFiles = append(spec.ExtraFiles, "src/internal/futex/futex_linux.c", + "src/internal/task/task_threads.c", "src/runtime/runtime_unix.c", "src/runtime/signal.c") case "windows": + spec.Scheduler = "tasks" spec.Linker = "ld.lld" spec.Libc = "mingw-w64" // Note: using a medium code model, low image base and no ASLR diff --git a/src/internal/task/linux.go b/src/internal/task/linux.go new file mode 100644 index 0000000000..7d28f708c4 --- /dev/null +++ b/src/internal/task/linux.go @@ -0,0 +1,9 @@ +//go:build linux && !baremetal + +package task + +import "unsafe" + +// Musl uses a pointer (or unsigned long for C++) so unsafe.Pointer should be +// fine. +type threadID unsafe.Pointer diff --git a/src/internal/task/semaphore.go b/src/internal/task/semaphore.go new file mode 100644 index 0000000000..914f09bc5e --- /dev/null +++ b/src/internal/task/semaphore.go @@ -0,0 +1,32 @@ +package task + +// Barebones semaphore implementation. +// The main limitation is that if there are multiple waiters, a single Post() +// call won't do anything. Only when Post() has been called to awaken all +// waiters will the waiters proceed. +// This limitation is not a problem when there will only be a single waiter. +type Semaphore struct { + futex Futex +} + +// Post (unlock) the semaphore, incrementing the value in the semaphore. +func (s *Semaphore) Post() { + newValue := s.futex.Add(1) + if newValue == 0 { + s.futex.WakeAll() + } +} + +// Wait (lock) the semaphore, decrementing the value in the semaphore. +func (s *Semaphore) Wait() { + delta := int32(-1) + value := s.futex.Add(uint32(delta)) + for { + if int32(value) >= 0 { + // Semaphore unlocked! + return + } + s.futex.Wait(value) + value = s.futex.Load() + } +} diff --git a/src/internal/task/task_threads.c b/src/internal/task/task_threads.c new file mode 100644 index 0000000000..5f93980a2e --- /dev/null +++ b/src/internal/task/task_threads.c @@ -0,0 +1,104 @@ +//go:build none + +#define _GNU_SOURCE +#include +#include +#include +#include +#include + +// BDWGC also uses SIGRTMIN+6 on Linux, which seems like a reasonable choice. +#ifdef __linux__ +#define taskPauseSignal (SIGRTMIN + 6) +#endif + +// Pointer to the current task.Task structure. +// Ideally the entire task.Task structure would be a thread-local variable but +// this also works. +static __thread void *current_task; + +struct state_pass { + void *(*start)(void*); + void *args; + void *task; + uintptr_t *stackTop; + sem_t startlock; +}; + +// Handle the GC pause in Go. +void tinygo_task_gc_pause(int sig); + +// Initialize the main thread. +void tinygo_task_init(void *mainTask, pthread_t *thread, void *context) { + // Make sure the current task pointer is set correctly for the main + // goroutine as well. + current_task = mainTask; + + // Store the thread ID of the main thread. + *thread = pthread_self(); + + // Register the "GC pause" signal for the entire process. + // Using pthread_kill, we can still send the signal to a specific thread. + struct sigaction act = { 0 }; + act.sa_flags = SA_SIGINFO; + act.sa_handler = &tinygo_task_gc_pause; + sigaction(taskPauseSignal, &act, NULL); +} + +void tinygo_task_exited(void*); + +// Helper to start a goroutine while also storing the 'task' structure. +static void* start_wrapper(void *arg) { + struct state_pass *state = arg; + void *(*start)(void*) = state->start; + void *args = state->args; + current_task = state->task; + + // Save the current stack pointer in the goroutine state, for the GC. + int stackAddr; + *(state->stackTop) = (uintptr_t)(&stackAddr); + + // Notify the caller that the thread has successfully started and + // initialized. + sem_post(&state->startlock); + + // Run the goroutine function. + start(args); + + // Notify the Go side this thread will exit. + tinygo_task_exited(current_task); + + return NULL; +}; + +// Start a new goroutine in an OS thread. +int tinygo_task_start(uintptr_t fn, void *args, void *task, pthread_t *thread, uintptr_t *stackTop, void *context) { + // Sanity check. Should get optimized away. + if (sizeof(pthread_t) != sizeof(void*)) { + __builtin_trap(); + } + + struct state_pass state = { + .start = (void*)fn, + .args = args, + .task = task, + .stackTop = stackTop, + }; + sem_init(&state.startlock, 0, 0); + int result = pthread_create(thread, NULL, &start_wrapper, &state); + + // Wait until the thread has been crated and read all state_pass variables. + sem_wait(&state.startlock); + + return result; +} + +// Return the current task (for task.Current()). +void* tinygo_task_current(void) { + return current_task; +} + +// Send a signal to cause the task to pause for the GC mark phase. +void tinygo_task_send_gc_signal(pthread_t thread) { + pthread_kill(thread, taskPauseSignal); +} diff --git a/src/internal/task/task_threads.go b/src/internal/task/task_threads.go new file mode 100644 index 0000000000..93204fb9ba --- /dev/null +++ b/src/internal/task/task_threads.go @@ -0,0 +1,265 @@ +//go:build scheduler.threads + +package task + +import ( + "sync/atomic" + "unsafe" +) + +// If true, print verbose debug logs. +const verbose = false + +// Scheduler-specific state. +type state struct { + // Goroutine ID. The number here is not really significant and after a while + // it could wrap around. But it is useful for debugging. + id uintptr + + // Thread ID, pthread_t or similar (typically implemented as a pointer). + thread threadID + + // Highest address of the stack. It is stored when the goroutine starts, and + // is needed to be able to scan the stack. + stackTop uintptr + + // Next task in the activeTasks queue. + QueueNext *Task + + // Semaphore to pause/resume the thread atomically. + pauseSem Semaphore + + // Semaphore used for stack scanning. + // We can't reuse pauseSem here since the thread might have been paused for + // other reasons (for example, because it was waiting on a channel). + gcSem Semaphore +} + +// Goroutine counter, starting at 0 for the main goroutine. +var goroutineID uintptr + +var mainTask Task + +// Queue of tasks (see QueueNext) that currently exist in the program. +var activeTasks = &mainTask +var activeTaskLock PMutex + +func OnSystemStack() bool { + runtimePanic("todo: task.OnSystemStack") + return false +} + +// Initialize the main goroutine state. Must be called by the runtime on +// startup, before starting any other goroutines. +func Init(sp uintptr) { + mainTask.state.stackTop = sp + tinygo_task_init(&mainTask, &mainTask.state.thread) +} + +// Return the task struct for the current thread. +func Current() *Task { + t := (*Task)(tinygo_task_current()) + if t == nil { + runtimePanic("unknown current task") + } + return t +} + +// Pause pauses the current task, until it is resumed by another task. +// It is possible that another task has called Resume() on the task before it +// hits Pause(), in which case the task won't be paused but continues +// immediately. +func Pause() { + // Wait until resumed + t := Current() + if verbose { + println("*** pause: ", t.state.id) + } + t.state.pauseSem.Wait() +} + +// Resume the given task. +// It is legal to resume a task before it gets paused, it means that the next +// call to Pause() won't pause but will continue immediately. This happens in +// practice sometimes in channel operations, where the Resume() might get called +// between the channel unlock and the call to Pause(). +func (t *Task) Resume() { + if verbose { + println("*** resume: ", t.state.id) + } + // Increment the semaphore counter. + // If the task is currently paused in Wait(), it will resume. + // If the task is not yet paused, the next call to Wait() will continue + // immediately. + t.state.pauseSem.Post() +} + +// Start a new OS thread. +func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) { + t := &Task{} + t.state.id = atomic.AddUintptr(&goroutineID, 1) + if verbose { + println("*** start: ", t.state.id, "from", Current().state.id) + } + + // Start the new thread, and add it to the list of threads. + // Do this with a lock so that only started threads are part of the queue + // and the stop-the-world GC won't see threads that haven't started yet or + // are not fully started yet. + activeTaskLock.Lock() + errCode := tinygo_task_start(fn, args, t, &t.state.thread, &t.state.stackTop) + if errCode != 0 { + runtimePanic("could not start thread") + } + t.state.QueueNext = activeTasks + activeTasks = t + activeTaskLock.Unlock() +} + +//export tinygo_task_exited +func taskExited(t *Task) { + if verbose { + println("*** exit:", t.state.id) + } + + // Remove from the queue. + // TODO: this can be made more efficient by using a doubly linked list. + activeTaskLock.Lock() + found := false + for q := &activeTasks; *q != nil; q = &(*q).state.QueueNext { + if *q == t { + *q = t.state.QueueNext + found = true + break + } + } + activeTaskLock.Unlock() + + // Sanity check. + if !found { + runtimePanic("taskExited failed") + } +} + +// Futex to wait on until all tasks have finished scanning the stack. +// This is basically a sync.WaitGroup. +var scanDoneFutex Futex + +// GC scan phase. Because we need to stop the world while scanning, this kinda +// needs to be done in the tasks package. +func GCScan() { + current := Current() + + // Don't allow new goroutines to be started while pausing/resuming threads + // in the stop-the-world phase. + activeTaskLock.Lock() + + // Pause all other threads. + numOtherThreads := uint32(0) + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + numOtherThreads++ + tinygo_task_send_gc_signal(t.state.thread) + } + } + + // Store the number of threads to wait for in the futex. + // This is the equivalent of doing an initial wg.Add(numOtherThreads). + scanDoneFutex.Store(numOtherThreads) + + // Scan the current stack, and all current registers. + scanCurrentStack() + + // Wake each paused thread for the first time so it will scan the stack. + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + t.state.gcSem.Post() + } + } + + // Wait until all threads have finished scanning their stack. + // This is the equivalent of wg.Wait() + for { + val := scanDoneFutex.Load() + if val == 0 { + break + } + scanDoneFutex.Wait(val) + } + + // Scan all globals (implemented in the runtime). + gcScanGlobals() + + // Wake each paused thread for the second time, so they will resume normal + // operation. + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + t.state.gcSem.Post() + } + } + + // Allow goroutines to start and exit again. + activeTaskLock.Unlock() +} + +// Scan globals, implemented in the runtime package. +func gcScanGlobals() + +var stackScanLock PMutex + +//export tinygo_task_gc_pause +func tingyo_task_gc_pause() { + // Wait until we get the signal to start scanning the stack. + Current().state.gcSem.Wait() + + // Scan the thread stack. + // Only scan a single thread stack at a time, because the GC marking phase + // doesn't support parallelism. + // TODO: it may be possible to call markRoots directly (without saving + // registers) since we are in a signal handler that already saved a bunch of + // registers. This is an optimization left for a future time. + stackScanLock.Lock() + scanCurrentStack() + stackScanLock.Unlock() + + // Equivalent of wg.Done(): subtract one from the futex and if the result is + // 0 (meaning we were the last in the waitgroup), wake the waiting thread. + n := uint32(1) + if scanDoneFutex.Add(-n) == 0 { + scanDoneFutex.Wake() + } + + // Wait until we get the signal we can resume normally (after the mark phase + // has finished). + Current().state.gcSem.Wait() +} + +//go:export tinygo_scanCurrentStack +func scanCurrentStack() + +// Return the highest address of the current stack. +func StackTop() uintptr { + return Current().state.stackTop +} + +//go:linkname runtimePanic runtime.runtimePanic +func runtimePanic(msg string) + +// Using //go:linkname instead of //export so that we don't tell the compiler +// that the 't' parameter won't escape (because it will). +// +//go:linkname tinygo_task_init tinygo_task_init +func tinygo_task_init(t *Task, thread *threadID) + +// Here same as for tinygo_task_init. +// +//go:linkname tinygo_task_start tinygo_task_start +func tinygo_task_start(fn uintptr, args unsafe.Pointer, t *Task, thread *threadID, stackTop *uintptr) int32 + +// Pause the thread by sending it a signal. +// +//export tinygo_task_send_gc_signal +func tinygo_task_send_gc_signal(threadID) + +//export tinygo_task_current +func tinygo_task_current() unsafe.Pointer diff --git a/src/runtime/gc_stack_raw.go b/src/runtime/gc_stack_raw.go index d55522a9f6..bdc3154fa5 100644 --- a/src/runtime/gc_stack_raw.go +++ b/src/runtime/gc_stack_raw.go @@ -1,4 +1,4 @@ -//go:build (gc.conservative || gc.precise) && !tinygo.wasm +//go:build (gc.conservative || gc.precise) && !tinygo.wasm && !scheduler.threads package runtime diff --git a/src/runtime/gc_stack_threads.go b/src/runtime/gc_stack_threads.go new file mode 100644 index 0000000000..9c77fa0c7b --- /dev/null +++ b/src/runtime/gc_stack_threads.go @@ -0,0 +1,25 @@ +//go:build scheduler.threads + +package runtime + +import "internal/task" + +func gcMarkReachable() { + task.GCScan() +} + +// Scan globals inside the stop-the-world phase. Called from the STW +// implementation in the internal/task package. +// +//go:linkname gcScanGlobals internal/task.gcScanGlobals +func gcScanGlobals() { + findGlobals(markRoots) +} + +// Function called from assembly with all registers pushed, to actually scan the +// stack. +// +//go:export tinygo_scanstack +func scanstack(sp uintptr) { + markRoots(sp, task.StackTop()) +} diff --git a/src/runtime/runtime_unix.go b/src/runtime/runtime_unix.go index e6f81778de..bfc4858fc2 100644 --- a/src/runtime/runtime_unix.go +++ b/src/runtime/runtime_unix.go @@ -73,6 +73,7 @@ type timespec struct { tv_nsec int64 // unsigned 64-bit integer on all time64 platforms } +// Highest address of the stack of the main thread. var stackTop uintptr // Entry point for Go. Initialize all packages and call main.main(). diff --git a/src/runtime/scheduler_threads.go b/src/runtime/scheduler_threads.go new file mode 100644 index 0000000000..e553a5b9c7 --- /dev/null +++ b/src/runtime/scheduler_threads.go @@ -0,0 +1,124 @@ +//go:build scheduler.threads + +package runtime + +import "internal/task" + +const hasScheduler = false // not using the cooperative scheduler + +// We use threads, so yes there is parallelism. +const hasParallelism = true + +var ( + timerQueueLock task.PMutex + timerQueueStarted bool + timerFutex task.Futex +) + +// Because we just use OS threads, we don't need to do anything special here. We +// can just initialize everything and run main.main on the main thread. +func run() { + initHeap() + task.Init(stackTop) + initAll() + callMain() +} + +// Pause the current task for a given time. +// +//go:linkname sleep time.Sleep +func sleep(duration int64) { + if duration <= 0 { + return + } + + sleepTicks(nanosecondsToTicks(duration)) +} + +func deadlock() { + // TODO: exit the thread via pthread_exit. + task.Pause() +} + +func scheduleTask(t *task.Task) { + t.Resume() +} + +func Gosched() { + // Each goroutine runs in a thread, so there's not much we can do here. + // There is sched_yield but it's only really intended for realtime + // operation, so is probably best not to use. +} + +// Separate goroutine (thread) that runs timer callbacks when they expire. +func timerRunner() { + for { + timerQueueLock.Lock() + + if timerQueue == nil { + // No timer in the queue, so wait until one becomes available. + val := timerFutex.Load() + timerQueueLock.Unlock() + timerFutex.Wait(val) + continue + } + + now := ticks() + if now < timerQueue.whenTicks() { + // There is a timer in the queue, but we need to wait until it + // expires. + // Using a futex, so that the wait is exited early when adding a new + // (sooner-to-expire) timer. + val := timerFutex.Load() + timerQueueLock.Unlock() + timeout := ticksToNanoseconds(timerQueue.whenTicks() - now) + timerFutex.WaitUntil(val, uint64(timeout)) + continue + } + + // Pop timer from queue. + tn := timerQueue + timerQueue = tn.next + tn.next = nil + + timerQueueLock.Unlock() + + // Run the callback stored in this timer node. + delay := ticksToNanoseconds(now - tn.whenTicks()) + tn.callback(tn, delay) + } +} + +func addTimer(tim *timerNode) { + timerQueueLock.Lock() + + if !timerQueueStarted { + timerQueueStarted = true + go timerRunner() + } + + timerQueueAdd(tim) + + timerFutex.Add(1) + timerFutex.Wake() + + timerQueueLock.Unlock() +} + +func removeTimer(tim *timer) bool { + timerQueueLock.Lock() + removed := timerQueueRemove(tim) + timerQueueLock.Unlock() + return removed +} + +func schedulerRunQueue() *task.Queue { + // This function is not actually used, it is only called when hasScheduler + // is true. So we can just return nil here. + return nil +} + +func runqueueForGC() *task.Queue { + // There is only a runqueue when using the cooperative scheduler. + return nil +}