diff options
Diffstat (limited to 'src/pulp.c')
-rw-r--r-- | src/pulp.c | 445 |
1 files changed, 445 insertions, 0 deletions
diff --git a/src/pulp.c b/src/pulp.c new file mode 100644 index 0000000..fdb95cd --- /dev/null +++ b/src/pulp.c @@ -0,0 +1,445 @@ +/* + * Copyright (C) 2018 Vito Caputo - <vcaputo@pengaru.com> + * + * This program is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 3 as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/* pulp implements a very simple cooperative multi-tasking scheduler + * with zero consideration for IO-wait states. This only concerns itself + * with the switching of lightweight CPU-bound fibers which may either + * exit, sleep for a specified duration, or go idle indefinitely with + * the possibility of being awoken by other fibers via the pulp api. + * + * No consideration for SMP/threads has been made, no locking or atomic + * stuff is in effect here. You can run a pulp scheduler per-thread to + * scale to SMP, but migrating/stealing fibers across the per-thread schedulers + * is not supported at this time. + * + */ + +#include <assert.h> +#include <errno.h> +#include <thunk.h> +#include <time.h> +#include <stdlib.h> +#include <sys/time.h> +#include <ucontext.h> + +#include "list.h" +#include "pulp.h" + +/* for when this needs to support windows: + * https://www.codeproject.com/Tips/4225/Unix-ucontext-t-Operations-on-Windows-Platforms + */ + +#define PULP_USECS_PER_SEC 1000000ULL +#define PULP_USECS_PER_MSEC 1000ULL +#define PULP_FIBER_STACK_SIZE (16 * 1024) +#define PULP_FIBER_ALLOC_NUM 32 + +typedef struct pulp_context_t { + ucontext_t ucontext; + char stack[PULP_FIBER_STACK_SIZE]; +} pulp_context_t; + +typedef struct pulp_fiber_t { + list_head_t fibers; /* node on free/idle/run/sleep list */ + union { + struct { + pulp_usec_t alarm; + } sleep; + } state; + + pulp_context_t context; +} pulp_fiber_t; + +typedef struct pulp_fiber_alloc_t { + list_head_t allocs; + unsigned n_fibers; + pulp_fiber_t fibers[]; +} pulp_fiber_alloc_t; + +typedef struct pulp_t { + pulp_fiber_t *current; /* currently running fiber */ + pulp_usec_t now; /* current time updated @ schedule via expire_alarms() */ + int exited:1; /* set when pulp_exit() triggered switch to caller */ + struct { + list_head_t free; + list_head_t idle; + list_head_t run; + list_head_t sleep; + } fibers; + + list_head_t allocs; /* list of pulp_fiber_alloc_t */ + pulp_context_t schedule_context; + pulp_context_t caller_context; +} pulp_t; + + +static void pulp_schedule(pulp_t *pulp); + + +/* allocate more free fibers */ +static int more_fibers(pulp_t *pulp) +{ + pulp_fiber_alloc_t *alloc; + unsigned i; + + assert(pulp); + + alloc = calloc(1, sizeof(pulp_fiber_alloc_t) + sizeof(pulp_fiber_t) * PULP_FIBER_ALLOC_NUM); + if (!alloc) + return -ENOMEM; + + alloc->n_fibers = PULP_FIBER_ALLOC_NUM; /* TODO: maybe grow this? */ + for (i = 0; i < alloc->n_fibers; i++) { + pulp_fiber_t *fiber = &alloc->fibers[i]; + + list_add_tail(&fiber->fibers, &pulp->fibers.free); + } + + list_add(&alloc->allocs, &pulp->allocs); + + return 0; +} + + +/* put the current fiber on a list @ tail */ +/* note pulp->current is left intact, it's pulp_schedule()'s responsibility + * to replace it. */ +static inline void put_current_fiber(pulp_t *pulp, list_head_t *where) +{ + assert(pulp); + assert(where); + assert(pulp->current); + + list_add(&pulp->current->fibers, where); +} + + +/* set the current fiber from the first entry on the list at head */ +static inline void set_current_fiber(pulp_t *pulp, list_head_t *head) +{ + assert(pulp); + assert(head); + assert(!list_empty(head)); + + pulp->current = list_entry(head->next, pulp_fiber_t, fibers); + list_del(&pulp->current->fibers); +} + + +/* setup a context to run func w/ptr */ +static void setup_context(pulp_t *pulp, pulp_context_t *context, void *func, void *ptr) +{ + assert(pulp); + assert(context); + assert(func); + + getcontext(&context->ucontext); + context->ucontext.uc_stack.ss_sp = context->stack; + context->ucontext.uc_stack.ss_size = sizeof(context->stack); + context->ucontext.uc_link = &pulp->schedule_context.ucontext; + makecontext(&context->ucontext, func, 1, ptr); +} + + +/* sets the supplied context as the executing context */ +static void enter_context(pulp_context_t *context) +{ + assert(context); + + setcontext(&context->ucontext); +} + + +/* swaps the new supplied context with the executing context */ +/* the current context is saved in old */ +static void swap_context(pulp_context_t *old, pulp_context_t *new) +{ + assert(old); + assert(new); + + if (old == new) + return; + + swapcontext(&old->ucontext, &new->ucontext); +} + + +/* handle a return out of a fiber, equivalent to the fiber exiting */ +static void schedule_context(pulp_t *pulp) +{ + assert(pulp); + + if (pulp->current) + put_current_fiber(pulp, &pulp->fibers.free); + + pulp_schedule(pulp); +} + + +/* return time of day in microseconds */ +static pulp_usec_t now(void) +{ + struct timeval ts_now; + pulp_usec_t now; + + gettimeofday(&ts_now, NULL); + now = ts_now.tv_sec * PULP_USECS_PER_SEC; + now += ts_now.tv_usec; + + return now; +} + + +/* create a new pulp scheduler instance */ +pulp_t * pulp_new(void) +{ + pulp_t *pulp; + + pulp = calloc(1, sizeof(pulp_t)); + if (!pulp) + return NULL; + + INIT_LIST_HEAD(&pulp->fibers.free); + INIT_LIST_HEAD(&pulp->fibers.idle); + INIT_LIST_HEAD(&pulp->fibers.run); + INIT_LIST_HEAD(&pulp->fibers.sleep); + INIT_LIST_HEAD(&pulp->allocs); + + if (more_fibers(pulp) < 0) { + free(pulp); + return NULL; + } + + setup_context(pulp, &pulp->schedule_context, schedule_context, pulp); + pulp->now = now(); + + return pulp; +} + + +/* free a pulp scheduler instance */ +void pulp_free(pulp_t *pulp) +{ + pulp_fiber_alloc_t *alloc, *_alloc; + + assert(pulp); + + list_for_each_entry_safe(alloc, _alloc, &pulp->allocs, allocs) + free(alloc); + + free(pulp); +} + + +/* move any expired sleeps from pulp->fibers.sleep to run */ +static void expire_alarms(pulp_t *pulp) +{ + pulp_fiber_t *f, *_f; + + assert(pulp); + + pulp->now = now(); + + /* TODO: use a priority queue */ + list_for_each_entry_safe(f, _f, &pulp->fibers.sleep, fibers) { + if (pulp->now >= f->state.sleep.alarm) + list_move_tail(&f->fibers, &pulp->fibers.run); + } +} + + +/* schedule and switch to the next fiber */ +static void pulp_schedule(pulp_t *pulp) +{ + pulp_context_t *target_context = &pulp->caller_context; + pulp_fiber_t *current; + + assert(pulp); + + expire_alarms(pulp); + + current = pulp->current; + pulp->current = NULL; + + if (!list_empty(&pulp->fibers.run)) { + set_current_fiber(pulp, &pulp->fibers.run); + target_context = &pulp->current->context; + } + + if (current) + swap_context(¤t->context, target_context); + else + enter_context(target_context); +} + + +/* Tick a pulp scheduler - runs fibers until all are idle/sleeping. + * An estimate of how much time may pass before the next tick should occur is stored in next_tick_delay_us. + * If pulp_exit() is called by a fiber, or no more fibers exist, the return value is -1, and next_tick_delay_us is ignored. + * If all fibers are idle, the return value is 0, and next_tick_delay_us is ignored. + * If any fibers are sleeping, the return value is 1, and next_tick_delay_us is useful. + */ +int pulp_tick(pulp_t *pulp, unsigned *next_tick_delay_us) +{ + assert(pulp); + + swap_context(&pulp->caller_context, &pulp->schedule_context); + + if (pulp->exited) + return -1; + + /* every tick drains the run queue currently */ + assert(list_empty(&pulp->fibers.run)); + + if (!list_empty(&pulp->fibers.sleep)) { + /* TODO: get delay from the sleep queue when it's a priority queue */ + *next_tick_delay_us = 333; + + return 1; + } + + if (!list_empty(&pulp->fibers.idle)) + return 0; + + return -1; +} + + +/* run a pulp scheduler until pulp_exit() is called or all fibers return */ +void pulp_run(pulp_t *pulp) +{ + for (;;) { + unsigned delay = 1000000; + + switch (pulp_tick(pulp, &delay)) { + case 0: + /* XXX: everything idle is a terminally wedged state in pulp_run(), + * unless I suppose a signal handler changes a fiber state. + */ + assert(0); + /* or fall-through to a sleep on NDEBUG */ + + case 1: { + struct timespec ts_delay = { 0, delay * 1000 }; + + /* TODO: get the minimum sleep from the priority queue when implemented, for now we spin a bit. */ + nanosleep(&ts_delay, NULL); + break; + } + + case -1: + return; + } + } +} + + +/* exit a pulp scheduler from within a fiber */ +/* this causes pulp_tick() to return immediately. */ +void pulp_exit(pulp_t *pulp) +{ + assert(pulp); + + pulp->exited = 1; + enter_context(&pulp->caller_context); +} + + +/* find a free fiber - this returns the next free fiber but does not + * modify its free state, it remains on the free list at return. + */ +static pulp_fiber_t * find_free_fiber(pulp_t *pulp) +{ + assert(pulp); + + if (list_empty(&pulp->fibers.free) && more_fibers(pulp) < 0) + return NULL; + + return list_entry(pulp->fibers.free.next, pulp_fiber_t, fibers); +} + + +/* run a new fiber on a pulp scheduler after delay_ms milliseconds (0 for immediately). + * returns the created fiber in case the caller wants to + * retain the ability to operate on it. Fibers are automatically + * reclaimed on return so this generally isn't necessary. + */ +pulp_fiber_t * pulp_fiber_new(pulp_t *pulp, unsigned delay_ms, thunk_t *thunk) +{ + pulp_fiber_t *fiber; + + assert(pulp); + assert(thunk); + + fiber = find_free_fiber(pulp); + if (!fiber) + return NULL; + + setup_context(pulp, &fiber->context, thunk->dispatch, thunk); + + if (!delay_ms) { + list_move_tail(&fiber->fibers, &pulp->fibers.run); + } else { + fiber->state.sleep.alarm = pulp->now + delay_ms * PULP_USECS_PER_MSEC; + list_move_tail(&fiber->fibers, &pulp->fibers.sleep); + } + + return fiber; +} + + +/* sleep for the supplied number of microseconds (not public) */ +static void pulp_usleep(pulp_t *pulp, unsigned useconds) +{ + assert(pulp); + + pulp->current->state.sleep.alarm = pulp->now + useconds; + put_current_fiber(pulp, &pulp->fibers.sleep); + pulp_schedule(pulp); +} + + +/* sleep for the supplied number of milliseconds */ +void pulp_msleep(pulp_t *pulp, unsigned milliseconds) +{ + assert(pulp); + + return pulp_usleep(pulp, milliseconds * PULP_USECS_PER_MSEC); +} + + +/* sleep for the supplied number of seconds */ +void pulp_sleep(pulp_t *pulp, unsigned seconds) +{ + assert(pulp); + + return pulp_usleep(pulp, seconds * PULP_USECS_PER_SEC); +} + + +/* return 'now' from the scheduler */ +/* this is a convenience/optimization for fibers which need the current + * time - the scheduler already caches it, so make it available. + */ +pulp_usec_t pulp_now(pulp_t *pulp) +{ + return pulp->now; +} + + +/* TODO: interfaces for idling/waking fibers */ +/* TODO: interfaces for synchronization across fibers */ +/* all these are left to be implemented as their needs arise */ |