diff --git a/inc/core/DeviceFiber.h b/inc/core/DeviceFiber.h
index 9f036ff..322e7f9 100644
--- a/inc/core/DeviceFiber.h
+++ b/inc/core/DeviceFiber.h
@@ -55,6 +55,9 @@ DEALINGS IN THE SOFTWARE.
 #define DEVICE_SCHEDULER_EVT_TICK       1
 #define DEVICE_SCHEDULER_EVT_IDLE       2
 
+#define DEVICE_GROUP_ID_SYSTEM          0
+#define DEVICE_GROUP_ID_USER            1
+
 /**
   * Representation of a single Fiber
   */
@@ -64,7 +67,8 @@ struct Fiber
     PROCESSOR_WORD_TYPE stack_bottom;   // The start address of this Fiber's stack. The stack is heap allocated, and full descending.
     PROCESSOR_WORD_TYPE stack_top;      // The end address of this Fiber's stack.
     uint32_t context;                   // Context specific information.
-    uint32_t flags;                     // Information about this fiber.
+    uint16_t group_id;                  // See fiber_pause_group() below.
+    uint16_t flags;                     // Information about this fiber.
     Fiber **queue;                      // The queue this fiber is stored on.
     Fiber *next, *prev;                 // Position of this Fiber on the run queue.
 };
@@ -96,6 +100,24 @@ int fiber_scheduler_running();
 
 void release_fiber(void);
 void release_fiber(void *param);
+
+/**
+  * Set the group of the current fiber. All fibers start in group 0.
+  *
+  * If the target group is currently paused, this call blocks until the group is resumed.
+  */
+void fiber_set_group(uint16_t group_id);
+
+/**
+  * Pause all fibers with the specified group id. They will not be scheduled until resumed.
+  */
+int fiber_pause_group(uint16_t group_id);
+
+/**
+  * Resume all fibers with the specified group id.
+  */
+int fiber_resume_group(uint16_t group_id);
+
 /**
   * Launches a fiber.
   *
diff --git a/source/core/DeviceFiber.cpp b/source/core/DeviceFiber.cpp
index 51bc394..f84ef08 100644
--- a/source/core/DeviceFiber.cpp
+++ b/source/core/DeviceFiber.cpp
@@ -39,6 +39,14 @@ DEALINGS IN THE SOFTWARE.
 
 //Serial serial(USBTX, USBRX);
 
+struct PausedGroup {
+    uint64_t timestamp;
+    uint16_t group_id;
+    Fiber *runQueue;
+    Fiber *sleepQueue;
+    PausedGroup *next;
+};
+
 /*
  * Statically allocated values used to create and destroy Fibers.
  * required to be defined here to allow persistence during context switches.
@@ -55,6 +63,8 @@ static Fiber *sleepQueue = NULL; // The list of blocked fibers
 static Fiber *waitQueue = NULL;           // The list of blocked fibers waiting on an event.
 static Fiber *fiberPool = NULL;           // Pool of unused fibers, just waiting for a job to do.
 
+static PausedGroup *pausedGroups = NULL;
+
 /*
  * Scheduler wide flags
  */
@@ -109,6 +119,35 @@ void queue_fiber(Fiber *f, Fiber **queue)
     __enable_irq();
 }
 
+static PausedGroup *find_paused_group(uint16_t group_id)
+{
+    for (PausedGroup *g = pausedGroups; g; g = g->next) {
+        if (g->group_id == group_id) {
+            return g;
+        }
+    }
+    return NULL;
+}
+
+static void move_fiber(Fiber *f, Fiber **queue)
+{
+    dequeue_fiber(f);
+    queue_fiber(f, queue);
+}
+
+static void queue_fiber_for_run(Fiber *f)
+{
+    if (f->group_id != 0) {
+        PausedGroup *g = find_paused_group(f->group_id);
+        if (g) {
+            move_fiber(f, &g->runQueue);
+            return;
+        }
+    }
+
+    move_fiber(f, &runQueue);
+}
+
 /**
   * Utility function to the given fiber from whichever queue it is currently stored on.
   *
@@ -161,18 +200,38 @@ Fiber *getFiberContext()
         f->stack_bottom = 0;
         f->stack_top = 0;
+        f->queue = NULL;
     }
 
     __enable_irq();
 
     // Ensure this fiber is in suitable state for reuse.
     f->flags = 0;
+    f->group_id = 0;
 
     tcb_configure_stack_base(&f->tcb, fiber_initial_stack_base());
 
     return f;
 }
 
+static Fiber* handle_fob()
+{
+    // This is a blocking call, so if we're in a fork on block context,
+    // it's time to spawn a new fiber...
+    if (currentFiber->flags & DEVICE_FIBER_FLAG_FOB)
+    {
+        // Allocate a new fiber context. This will come from the fiber pool if available,
+        // else a new one will be allocated on the heap.
+        forkedFiber = getFiberContext();
+
+        // If we're out of memory, there's nothing we can do.
+        // Keep running in the context of the current fiber as a best effort.
+        if (forkedFiber != NULL)
+            return forkedFiber;
+    }
+
+    return currentFiber;
+}
 
 /**
   * Initialises the Fiber scheduler.
@@ -251,8 +310,7 @@ void scheduler_tick(DeviceEvent evt)
         if (evt.timestamp >= f->context)
         {
             // Wakey wakey!
-            dequeue_fiber(f);
-            queue_fiber(f,&runQueue);
+            move_fiber(f, &runQueue);
         }
 
         f = t;
@@ -294,8 +352,7 @@ void scheduler_event(DeviceEvent evt)
             if (!notifyOneComplete)
             {
                 // Wakey wakey!
-                dequeue_fiber(f);
-                queue_fiber(f,&runQueue);
+                queue_fiber_for_run(f);
                 notifyOneComplete = 1;
             }
         }
@@ -304,8 +361,7 @@
         else if ((id == DEVICE_ID_ANY || id == evt.source) && (value == DEVICE_EVT_ANY || value == evt.value))
         {
             // Wakey wakey!
-            dequeue_fiber(f);
-            queue_fiber(f,&runQueue);
+            queue_fiber_for_run(f);
         }
 
         f = t;
@@ -329,8 +385,6 @@
   */
 void fiber_sleep(unsigned long t)
 {
-    Fiber *f = currentFiber;
-
     // If the scheduler is not running, then simply perform a spin wait and exit.
     if (!fiber_scheduler_running())
     {
@@ -338,28 +392,13 @@
         return;
     }
 
-    // Sleep is a blocking call, so if we're in a fork on block context,
-    // it's time to spawn a new fiber...
-    if (currentFiber->flags & DEVICE_FIBER_FLAG_FOB)
-    {
-        // Allocate a new fiber. This will come from the fiber pool if availiable,
-        // else a new one will be allocated on the heap.
-        forkedFiber = getFiberContext();
-
-        // If we're out of memory, there's nothing we can do.
-        // keep running in the context of the current thread as a best effort.
-        if (forkedFiber != NULL)
-            f = forkedFiber;
-    }
+    Fiber *f = handle_fob();
 
     // Calculate and store the time we want to wake up.
     f->context = system_timer_current_time() + t;
 
-    // Remove fiber from the run queue
-    dequeue_fiber(f);
-
-    // Add fiber to the sleep queue. We maintain strict ordering here to reduce lookup times.
-    queue_fiber(f, &sleepQueue);
+    // Move fiber from the run queue to the sleep queue. We maintain strict ordering here to reduce lookup times.
+    move_fiber(f, &sleepQueue);
 
     // Finally, enter the scheduler.
     schedule();
@@ -414,33 +453,16 @@ int fiber_wait_for_event(uint16_t id, uint16_t value)
   */
 int fiber_wake_on_event(uint16_t id, uint16_t value)
 {
-    Fiber *f = currentFiber;
-
     if (messageBus == NULL || !fiber_scheduler_running())
         return DEVICE_NOT_SUPPORTED;
-
-    // Sleep is a blocking call, so if we're in a fork on block context,
-    // it's time to spawn a new fiber...
-    if (currentFiber->flags & DEVICE_FIBER_FLAG_FOB)
-    {
-        // Allocate a TCB from the new fiber. This will come from the tread pool if availiable,
-        // else a new one will be allocated on the heap.
-        forkedFiber = getFiberContext();
-
-        // If we're out of memory, there's nothing we can do.
-        // keep running in the context of the current thread as a best effort.
-        if (forkedFiber != NULL)
-            f = forkedFiber;
-    }
+
+    Fiber *f = handle_fob();
 
     // Encode the event data in the context field. It's handy having a 32 bit core. :-)
     f->context = (uint32_t)value << 16 | id;
 
-    // Remove ourselves from the run queue
-    dequeue_fiber(f);
-    // Add ourselves to the sleep queue. We maintain strict ordering here to reduce lookup times.
-    queue_fiber(f, &waitQueue);
+    move_fiber(f, &waitQueue);
 
     // Register to receive this event, so we can wake up the fiber when it happens.
     // Special case for the notify channel, as we always stay registered for that.
@@ -707,11 +729,8 @@ void release_fiber(void)
     if (!fiber_scheduler_running())
         return;
 
-    // Remove ourselves form the runqueue.
-    dequeue_fiber(currentFiber);
-
-    // Add ourselves to the list of free fibers
-    queue_fiber(currentFiber, &fiberPool);
+    // Move ourselves to the list of free fibers
+    move_fiber(currentFiber, &fiberPool);
 
     // Find something else to do!
     schedule();
@@ -911,3 +930,112 @@ void idle_task()
         schedule();
     }
 }
+
+
+/**
+  * Set the group of the current fiber. All fibers start in group 0.
+  *
+  * If the target group is currently paused, this call blocks until the group is resumed.
+  */
+void fiber_set_group(uint16_t group_id)
+{
+    PausedGroup *g = find_paused_group(group_id);
+
+    Fiber *f = currentFiber;
+
+    if (g) {
+        // We may be about to block, so if we're in a fork on block context, spawn a new fiber first.
+        f = handle_fob();
+        // Search again, in case the group was resumed in the meantime.
+        g = find_paused_group(group_id);
+    }
+
+    f->group_id = group_id;
+    if (g) {
+        move_fiber(f, &g->runQueue);
+        schedule(); // does not return until this fiber is back on the run queue
+    }
+}
+
+static void move_fibers(Fiber *queue, Fiber **trgQueue, uint16_t group_id)
+{
+    Fiber *n;
+    for (Fiber *f = queue; f; f = n) {
+        n = f->next;
+        if (group_id == 0 || f->group_id == group_id) {
+            move_fiber(f, trgQueue);
+        }
+    }
+}
+
+/**
+  * Pause all fibers with the specified group id. They will not be scheduled until resumed.
+  */
+int fiber_pause_group(uint16_t group_id)
+{
+    if (group_id == 0)
+        return DEVICE_INVALID_PARAMETER;
+
+    PausedGroup *g = find_paused_group(group_id);
+    if (g)
+        return DEVICE_NOT_SUPPORTED; // already paused
+
+    Fiber *f = currentFiber;
+    if (currentFiber->group_id == group_id)
+        f = handle_fob();
+
+    g = (PausedGroup*)malloc(sizeof(PausedGroup));
+
+    g->group_id = group_id;
+    g->timestamp = system_timer_current_time();
+    g->runQueue = NULL;
+    g->sleepQueue = NULL;
+    g->next = pausedGroups;
+    pausedGroups = g;
+
+    move_fibers(runQueue, &g->runQueue, group_id);
+    move_fibers(sleepQueue, &g->sleepQueue, group_id);
+
+    if (f->group_id == group_id) {
+        move_fiber(f, &g->runQueue);
+        schedule();
+    }
+
+    return DEVICE_OK;
+}
+
+/**
+  * Resume all fibers with the specified group id.
+  */
+int fiber_resume_group(uint16_t group_id)
+{
+    if (group_id == 0)
+        return DEVICE_INVALID_PARAMETER;
+
+    PausedGroup *g = find_paused_group(group_id);
+    if (!g)
+        return DEVICE_NOT_SUPPORTED;
+
+    uint32_t delta = (uint32_t)(system_timer_current_time() - g->timestamp);
+    for (Fiber *f = g->sleepQueue; f; f = f->next) {
+        f->context += delta;
+    }
+
+    move_fibers(g->runQueue, &runQueue, 0);
+    move_fibers(g->sleepQueue, &sleepQueue, 0);
+
+    if (g == pausedGroups) {
+        pausedGroups = g->next;
+    } else {
+        for (PausedGroup *pp = pausedGroups; pp; pp = pp->next) {
+            if (pp->next == g) {
+                pp->next = g->next;
+                break;
+            }
+        }
+    }
+    free(g);
+
+    return DEVICE_OK;
+}
+
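
Usage sketch (not part of the patch): a minimal illustration of how the new group API might be used. The names user_task, supervisor_task and start_tasks are hypothetical; the sketch assumes the scheduler's existing create_fiber() and fiber_sleep() calls, and uses DEVICE_GROUP_ID_USER from the constants added to DeviceFiber.h above. Note that, as implemented in fiber_resume_group(), pending fiber_sleep() wake-up times are pushed forward by however long the group was paused.

#include "DeviceFiber.h"

// A worker fiber that opts into the user group. (Hypothetical example code.)
void user_task()
{
    fiber_set_group(DEVICE_GROUP_ID_USER);

    while (1)
    {
        // ... do some work ...
        fiber_sleep(100);
    }
}

// A supervisor fiber that freezes and later thaws the whole user group.
void supervisor_task()
{
    // Fibers in the group are parked on the group's private run/sleep queues
    // and are not scheduled again until the group is resumed.
    fiber_pause_group(DEVICE_GROUP_ID_USER);

    fiber_sleep(1000);

    // Parked fibers return to the scheduler; their sleep deadlines are
    // shifted forward by the time the group spent paused.
    fiber_resume_group(DEVICE_GROUP_ID_USER);
}

void start_tasks()
{
    create_fiber(user_task);
    create_fiber(supervisor_task);
}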