-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial sketch of fiber groups #14
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,6 +39,14 @@ DEALINGS IN THE SOFTWARE. | |
|
||
//Serial serial(USBTX, USBRX); | ||
|
||
struct PausedGroup { | ||
uint64_t timestamp; | ||
uint16_t group_id; | ||
Fiber *runQueue; | ||
Fiber *sleepQueue; | ||
PausedGroup *next; | ||
}; | ||
|
||
/* | ||
* Statically allocated values used to create and destroy Fibers. | ||
* required to be defined here to allow persistence during context switches. | ||
|
@@ -55,6 +63,8 @@ static Fiber *sleepQueue = NULL; // The list of blocked fibers | |
static Fiber *waitQueue = NULL; // The list of blocked fibers waiting on an event. | ||
static Fiber *fiberPool = NULL; // Pool of unused fibers, just waiting for a job to do. | ||
|
||
static PausedGroup *pausedGroups = NULL; | ||
|
||
/* | ||
* Scheduler wide flags | ||
*/ | ||
|
@@ -109,6 +119,35 @@ void queue_fiber(Fiber *f, Fiber **queue) | |
__enable_irq(); | ||
} | ||
|
||
static PausedGroup *find_paused_group(uint16_t group_id) | ||
{ | ||
for (PausedGroup *g = pausedGroups; g; g = g->next) { | ||
if (g->group_id == group_id) { | ||
return g; | ||
} | ||
} | ||
return NULL; | ||
} | ||
|
||
static void move_fiber(Fiber *f, Fiber **queue) | ||
{ | ||
dequeue_fiber(f); | ||
queue_fiber(f, queue); | ||
} | ||
|
||
static void queue_fiber_for_run(Fiber *f) | ||
{ | ||
if (f->group_id != 0) { | ||
PausedGroup *g = find_paused_group(f->group_id); | ||
if (g) { | ||
move_fiber(f, &g->runQueue); | ||
return; | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks good. Assume this is to cover cases where blocked fibers are woken up, but their fiber group has been paused in the meantime? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's right. This is when fibers wait for events - when paused, this fiber should end up in group run queue. This doesn't happen for sleeping fibers. |
||
|
||
move_fiber(f, &runQueue); | ||
} | ||
|
||
/** | ||
* Utility function to the given fiber from whichever queue it is currently stored on. | ||
* | ||
|
@@ -161,18 +200,38 @@ Fiber *getFiberContext() | |
|
||
f->stack_bottom = 0; | ||
f->stack_top = 0; | ||
f->queue = NULL; | ||
} | ||
|
||
__enable_irq(); | ||
|
||
// Ensure this fiber is in suitable state for reuse. | ||
f->flags = 0; | ||
f->group_id = 0; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so we're assuming all fibers are "system" fibers unless otherwise stated. I think this is good - no different to current semantics. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that's right |
||
tcb_configure_stack_base(&f->tcb, fiber_initial_stack_base()); | ||
|
||
return f; | ||
} | ||
|
||
static Fiber* handle_fob() | ||
{ | ||
// This is a blocking call, so if we're in a fork on block context, | ||
// it's time to spawn a new fiber... | ||
if (currentFiber->flags & DEVICE_FIBER_FLAG_FOB) | ||
{ | ||
// Allocate a TCB from the new fiber. This will come from the tread pool if availiable, | ||
// else a new one will be allocated on the heap. | ||
forkedFiber = getFiberContext(); | ||
|
||
// If we're out of memory, there's nothing we can do. | ||
// keep running in the context of the current thread as a best effort. | ||
if (forkedFiber != NULL) | ||
return forkedFiber; | ||
} | ||
|
||
return currentFiber; | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is good. Certainly great that you've factored this out into an internal function. Fork-On-Block (FOB) is really an optimization for event handlers. Essentially new fiber contexts are lazily created when needed, which can reduce churn and RAM overhead for the common case (where event handlers are non-blocking functions). This code path will only ever be triggered if you call pause() on a fiber that is a member of the fiber group being paused whilst it's inside an event handler... I think this is ok in principle, as it should look just like an event handler that blocks waiting for anything else, but this would be the test case to validate. I guess for you it would boil down to "can you breakpoint an event handler?" |
||
/** | ||
* Initialises the Fiber scheduler. | ||
|
@@ -251,8 +310,7 @@ void scheduler_tick(DeviceEvent evt) | |
if (evt.timestamp >= f->context) | ||
{ | ||
// Wakey wakey! | ||
dequeue_fiber(f); | ||
queue_fiber(f,&runQueue); | ||
move_fiber(f, &runQueue); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this also be a queue_fiber_for_run() call? In case the sleeping fiber's group has been paused? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sleeping fibers are moved to sleep queue of the paused group upon pause (mainly, so that we can modify their wake up time when unpausing). So this should never be a paused fiber. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cool. |
||
|
||
f = t; | ||
|
@@ -294,8 +352,7 @@ void scheduler_event(DeviceEvent evt) | |
if (!notifyOneComplete) | ||
{ | ||
// Wakey wakey! | ||
dequeue_fiber(f); | ||
queue_fiber(f,&runQueue); | ||
queue_fiber_for_run(f); | ||
notifyOneComplete = 1; | ||
} | ||
} | ||
|
@@ -304,8 +361,7 @@ void scheduler_event(DeviceEvent evt) | |
else if ((id == DEVICE_ID_ANY || id == evt.source) && (value == DEVICE_EVT_ANY || value == evt.value)) | ||
{ | ||
// Wakey wakey! | ||
dequeue_fiber(f); | ||
queue_fiber(f,&runQueue); | ||
queue_fiber_for_run(f); | ||
} | ||
|
||
f = t; | ||
|
@@ -329,37 +385,20 @@ void scheduler_event(DeviceEvent evt) | |
*/ | ||
void fiber_sleep(unsigned long t) | ||
{ | ||
Fiber *f = currentFiber; | ||
|
||
// If the scheduler is not running, then simply perform a spin wait and exit. | ||
if (!fiber_scheduler_running()) | ||
{ | ||
wait_ms(t); | ||
return; | ||
} | ||
|
||
// Sleep is a blocking call, so if we're in a fork on block context, | ||
// it's time to spawn a new fiber... | ||
if (currentFiber->flags & DEVICE_FIBER_FLAG_FOB) | ||
{ | ||
// Allocate a new fiber. This will come from the fiber pool if availiable, | ||
// else a new one will be allocated on the heap. | ||
forkedFiber = getFiberContext(); | ||
|
||
// If we're out of memory, there's nothing we can do. | ||
// keep running in the context of the current thread as a best effort. | ||
if (forkedFiber != NULL) | ||
f = forkedFiber; | ||
} | ||
Fiber *f = handle_fob(); | ||
|
||
// Calculate and store the time we want to wake up. | ||
f->context = system_timer_current_time() + t; | ||
|
||
// Remove fiber from the run queue | ||
dequeue_fiber(f); | ||
|
||
// Add fiber to the sleep queue. We maintain strict ordering here to reduce lookup times. | ||
queue_fiber(f, &sleepQueue); | ||
// Move fiber from the run queue to the sleep queue. We maintain strict ordering here to reduce lookup times. | ||
move_fiber(f, &sleepQueue); | ||
|
||
// Finally, enter the scheduler. | ||
schedule(); | ||
|
@@ -414,33 +453,16 @@ int fiber_wait_for_event(uint16_t id, uint16_t value) | |
*/ | ||
int fiber_wake_on_event(uint16_t id, uint16_t value) | ||
{ | ||
Fiber *f = currentFiber; | ||
|
||
if (messageBus == NULL || !fiber_scheduler_running()) | ||
return DEVICE_NOT_SUPPORTED; | ||
|
||
// Sleep is a blocking call, so if we're in a fork on block context, | ||
// it's time to spawn a new fiber... | ||
if (currentFiber->flags & DEVICE_FIBER_FLAG_FOB) | ||
{ | ||
// Allocate a TCB from the new fiber. This will come from the tread pool if availiable, | ||
// else a new one will be allocated on the heap. | ||
forkedFiber = getFiberContext(); | ||
|
||
// If we're out of memory, there's nothing we can do. | ||
// keep running in the context of the current thread as a best effort. | ||
if (forkedFiber != NULL) | ||
f = forkedFiber; | ||
} | ||
|
||
Fiber *f = handle_fob(); | ||
|
||
// Encode the event data in the context field. It's handy having a 32 bit core. :-) | ||
f->context = (uint32_t)value << 16 | id; | ||
|
||
// Remove ourselves from the run queue | ||
dequeue_fiber(f); | ||
|
||
// Add ourselves to the sleep queue. We maintain strict ordering here to reduce lookup times. | ||
queue_fiber(f, &waitQueue); | ||
move_fiber(f, &waitQueue); | ||
|
||
// Register to receive this event, so we can wake up the fiber when it happens. | ||
// Special case for the notify channel, as we always stay registered for that. | ||
|
@@ -707,11 +729,8 @@ void release_fiber(void) | |
if (!fiber_scheduler_running()) | ||
return; | ||
|
||
// Remove ourselves form the runqueue. | ||
dequeue_fiber(currentFiber); | ||
|
||
// Add ourselves to the list of free fibers | ||
queue_fiber(currentFiber, &fiberPool); | ||
// Move ourselves to the list of free fibers | ||
move_fiber(currentFiber, &fiberPool); | ||
|
||
// Find something else to do! | ||
schedule(); | ||
|
@@ -911,3 +930,112 @@ void idle_task() | |
schedule(); | ||
} | ||
} | ||
|
||
|
||
/** | ||
* Set the group of the current fiber. All fibers start in group 0. | ||
* | ||
* If you set group to one that is currently paused, the call will block, until resumed. | ||
*/ | ||
void fiber_set_group(uint16_t group_id) | ||
{ | ||
PausedGroup *g = find_paused_group(group_id); | ||
|
||
Fiber *f = currentFiber; | ||
|
||
if (g) { | ||
// we're about to block; make sure we're not FOB | ||
f = handle_fob(); | ||
// just in case - search again, in case we were unpaused in the meantime | ||
g = find_paused_group(group_id); | ||
} | ||
|
||
f->group_id = group_id; | ||
if (g) { | ||
move_fiber(f, &g->runQueue); | ||
schedule(); // do not return until we're back on runQueue | ||
} | ||
} | ||
|
||
static void move_fibers(Fiber *queue, Fiber **trgQueue, uint16_t group_id) | ||
{ | ||
Fiber *n; | ||
for (Fiber *f = queue; f; f = n) { | ||
n = f->next; | ||
if (group_id == 0 || f->group_id == group_id) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mmoskal - why the exception case for group_id == 0 here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. zero means to move everything, but maybe I don't strictly need it... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah, I see. that's fine. My bad. I misinterpreted this as (f->group_id == 0). I think this is fine - quite consistent with messageBus etc. when zero maps to ANY_EVENT etc. etc. |
||
move_fiber(f, trgQueue); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Pause all fibers with the specific group id. They will not be scheduled until resumed. | ||
*/ | ||
int fiber_pause_group(uint16_t group_id) | ||
{ | ||
if (group_id == 0) | ||
return DEVICE_INVALID_PARAMETER; | ||
|
||
PausedGroup *g = find_paused_group(group_id); | ||
if (g) | ||
return DEVICE_NOT_SUPPORTED; // already paused | ||
|
||
Fiber *f = currentFiber; | ||
if (currentFiber->group_id == group_id) | ||
f = handle_fob(); | ||
|
||
g = (PausedGroup*)malloc(sizeof(PausedGroup)); | ||
|
||
g->group_id = group_id; | ||
g->timestamp = system_timer_current_time(); | ||
g->runQueue = NULL; | ||
g->sleepQueue = NULL; | ||
g->next = pausedGroups; | ||
pausedGroups = g; | ||
|
||
move_fibers(runQueue, &g->runQueue, group_id); | ||
move_fibers(sleepQueue, &g->sleepQueue, group_id); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what about the waitQueue? This is where fibers blocked waiting for an event will be... Wonders if we should leave fiber that are already blocked on the sleepQueue and waitQueue and then migrate them across to the paued queue(s) if/when wake up... I think you have code to support this code path in queue_fiber_for_run() ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you just answered this one above... |
||
if (f->group_id == group_id) { | ||
move_fiber(f, &g->runQueue); | ||
schedule(); | ||
} | ||
|
||
return DEVICE_OK; | ||
} | ||
|
||
/** | ||
* Resume all fibers with the specific group id. | ||
*/ | ||
int fiber_resume_group(uint16_t group_id) | ||
{ | ||
if (group_id == 0) | ||
return DEVICE_INVALID_PARAMETER; | ||
|
||
PausedGroup *g = find_paused_group(group_id); | ||
if (!g) | ||
return DEVICE_NOT_SUPPORTED; | ||
|
||
uint32_t delta = (uint32_t)(system_timer_current_time() - g->timestamp); | ||
for (Fiber *f = g->sleepQueue; f; f = f->next) { | ||
f->context += delta; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah, ok. So sleep() operations are offset by a pause() operation. Essentially time stands still for paused fibers... how relativistic! I guess the reasoning here is that this is good because it preserves relative time between all the fibers in the group that was paused? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note we agreed this in initial discussions, so all good here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, I think this is what user expects. when the program is paused, all user threads are paused and when we resume them the fiber_sleeps() return in the right order. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, agree. |
||
|
||
move_fibers(g->runQueue, &runQueue, 0); | ||
move_fibers(g->sleepQueue, &sleepQueue, 0); | ||
|
||
if (g == pausedGroups) { | ||
pausedGroups = g->next; | ||
} else { | ||
for (PausedGroup *pp = pausedGroups; pp; pp = pp->next) { | ||
if (pp->next == g) { | ||
pp->next = g->next; | ||
break; | ||
} | ||
} | ||
} | ||
free(g); | ||
|
||
return DEVICE_OK; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. Can we just add a few doxygen comments in here for @param and @return? Just because our online docs scrape these to form docs like these:
https://lancaster-university.github.io/microbit-docs/
These aren't online for codal yet, but I'd like to do it here too.