summaryrefslogtreecommitdiff
path: root/src/pulp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pulp.c')
-rw-r--r--src/pulp.c445
1 files changed, 445 insertions, 0 deletions
diff --git a/src/pulp.c b/src/pulp.c
new file mode 100644
index 0000000..fdb95cd
--- /dev/null
+++ b/src/pulp.c
@@ -0,0 +1,445 @@
+/*
+ * Copyright (C) 2018 Vito Caputo - <vcaputo@pengaru.com>
+ *
+ * This program is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 3 as published
+ * by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/* pulp implements a very simple cooperative multi-tasking scheduler
+ * with zero consideration for IO-wait states. This only concerns itself
+ * with the switching of lightweight CPU-bound fibers which may either
+ * exit, sleep for a specified duration, or go idle indefinitely with
+ * the possibility of being awoken by other fibers via the pulp api.
+ *
+ * No consideration for SMP/threads has been made, no locking or atomic
+ * stuff is in effect here. You can run a pulp scheduler per-thread to
+ * scale to SMP, but migrating/stealing fibers across the per-thread schedulers
+ * is not supported at this time.
+ *
+ */
+
+#include <assert.h>
+#include <errno.h>
+#include <thunk.h>
+#include <time.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <ucontext.h>
+
+#include "list.h"
+#include "pulp.h"
+
+/* for when this needs to support windows:
+ * https://www.codeproject.com/Tips/4225/Unix-ucontext-t-Operations-on-Windows-Platforms
+ */
+
+#define PULP_USECS_PER_SEC 1000000ULL
+#define PULP_USECS_PER_MSEC 1000ULL
+#define PULP_FIBER_STACK_SIZE (16 * 1024)
+#define PULP_FIBER_ALLOC_NUM 32
+
+typedef struct pulp_context_t {
+ ucontext_t ucontext;
+ char stack[PULP_FIBER_STACK_SIZE];
+} pulp_context_t;
+
+typedef struct pulp_fiber_t {
+ list_head_t fibers; /* node on free/idle/run/sleep list */
+ union {
+ struct {
+ pulp_usec_t alarm;
+ } sleep;
+ } state;
+
+ pulp_context_t context;
+} pulp_fiber_t;
+
+typedef struct pulp_fiber_alloc_t {
+ list_head_t allocs;
+ unsigned n_fibers;
+ pulp_fiber_t fibers[];
+} pulp_fiber_alloc_t;
+
+typedef struct pulp_t {
+ pulp_fiber_t *current; /* currently running fiber */
+ pulp_usec_t now; /* current time updated @ schedule via expire_alarms() */
+ int exited:1; /* set when pulp_exit() triggered switch to caller */
+ struct {
+ list_head_t free;
+ list_head_t idle;
+ list_head_t run;
+ list_head_t sleep;
+ } fibers;
+
+ list_head_t allocs; /* list of pulp_fiber_alloc_t */
+ pulp_context_t schedule_context;
+ pulp_context_t caller_context;
+} pulp_t;
+
+
+static void pulp_schedule(pulp_t *pulp);
+
+
+/* allocate more free fibers */
+static int more_fibers(pulp_t *pulp)
+{
+ pulp_fiber_alloc_t *alloc;
+ unsigned i;
+
+ assert(pulp);
+
+ alloc = calloc(1, sizeof(pulp_fiber_alloc_t) + sizeof(pulp_fiber_t) * PULP_FIBER_ALLOC_NUM);
+ if (!alloc)
+ return -ENOMEM;
+
+ alloc->n_fibers = PULP_FIBER_ALLOC_NUM; /* TODO: maybe grow this? */
+ for (i = 0; i < alloc->n_fibers; i++) {
+ pulp_fiber_t *fiber = &alloc->fibers[i];
+
+ list_add_tail(&fiber->fibers, &pulp->fibers.free);
+ }
+
+ list_add(&alloc->allocs, &pulp->allocs);
+
+ return 0;
+}
+
+
+/* put the current fiber on a list @ tail */
+/* note pulp->current is left intact, it's pulp_schedule()'s responsibility
+ * to replace it. */
+static inline void put_current_fiber(pulp_t *pulp, list_head_t *where)
+{
+ assert(pulp);
+ assert(where);
+ assert(pulp->current);
+
+ list_add(&pulp->current->fibers, where);
+}
+
+
+/* set the current fiber from the first entry on the list at head */
+static inline void set_current_fiber(pulp_t *pulp, list_head_t *head)
+{
+ assert(pulp);
+ assert(head);
+ assert(!list_empty(head));
+
+ pulp->current = list_entry(head->next, pulp_fiber_t, fibers);
+ list_del(&pulp->current->fibers);
+}
+
+
+/* setup a context to run func w/ptr */
+static void setup_context(pulp_t *pulp, pulp_context_t *context, void *func, void *ptr)
+{
+ assert(pulp);
+ assert(context);
+ assert(func);
+
+ getcontext(&context->ucontext);
+ context->ucontext.uc_stack.ss_sp = context->stack;
+ context->ucontext.uc_stack.ss_size = sizeof(context->stack);
+ context->ucontext.uc_link = &pulp->schedule_context.ucontext;
+ makecontext(&context->ucontext, func, 1, ptr);
+}
+
+
+/* sets the supplied context as the executing context */
+static void enter_context(pulp_context_t *context)
+{
+ assert(context);
+
+ setcontext(&context->ucontext);
+}
+
+
+/* swaps the new supplied context with the executing context */
+/* the current context is saved in old */
+static void swap_context(pulp_context_t *old, pulp_context_t *new)
+{
+ assert(old);
+ assert(new);
+
+ if (old == new)
+ return;
+
+ swapcontext(&old->ucontext, &new->ucontext);
+}
+
+
+/* handle a return out of a fiber, equivalent to the fiber exiting */
+static void schedule_context(pulp_t *pulp)
+{
+ assert(pulp);
+
+ if (pulp->current)
+ put_current_fiber(pulp, &pulp->fibers.free);
+
+ pulp_schedule(pulp);
+}
+
+
+/* return time of day in microseconds */
+static pulp_usec_t now(void)
+{
+ struct timeval ts_now;
+ pulp_usec_t now;
+
+ gettimeofday(&ts_now, NULL);
+ now = ts_now.tv_sec * PULP_USECS_PER_SEC;
+ now += ts_now.tv_usec;
+
+ return now;
+}
+
+
+/* create a new pulp scheduler instance */
+pulp_t * pulp_new(void)
+{
+ pulp_t *pulp;
+
+ pulp = calloc(1, sizeof(pulp_t));
+ if (!pulp)
+ return NULL;
+
+ INIT_LIST_HEAD(&pulp->fibers.free);
+ INIT_LIST_HEAD(&pulp->fibers.idle);
+ INIT_LIST_HEAD(&pulp->fibers.run);
+ INIT_LIST_HEAD(&pulp->fibers.sleep);
+ INIT_LIST_HEAD(&pulp->allocs);
+
+ if (more_fibers(pulp) < 0) {
+ free(pulp);
+ return NULL;
+ }
+
+ setup_context(pulp, &pulp->schedule_context, schedule_context, pulp);
+ pulp->now = now();
+
+ return pulp;
+}
+
+
+/* free a pulp scheduler instance */
+void pulp_free(pulp_t *pulp)
+{
+ pulp_fiber_alloc_t *alloc, *_alloc;
+
+ assert(pulp);
+
+ list_for_each_entry_safe(alloc, _alloc, &pulp->allocs, allocs)
+ free(alloc);
+
+ free(pulp);
+}
+
+
+/* move any expired sleeps from pulp->fibers.sleep to run */
+static void expire_alarms(pulp_t *pulp)
+{
+ pulp_fiber_t *f, *_f;
+
+ assert(pulp);
+
+ pulp->now = now();
+
+ /* TODO: use a priority queue */
+ list_for_each_entry_safe(f, _f, &pulp->fibers.sleep, fibers) {
+ if (pulp->now >= f->state.sleep.alarm)
+ list_move_tail(&f->fibers, &pulp->fibers.run);
+ }
+}
+
+
+/* schedule and switch to the next fiber */
+static void pulp_schedule(pulp_t *pulp)
+{
+ pulp_context_t *target_context = &pulp->caller_context;
+ pulp_fiber_t *current;
+
+ assert(pulp);
+
+ expire_alarms(pulp);
+
+ current = pulp->current;
+ pulp->current = NULL;
+
+ if (!list_empty(&pulp->fibers.run)) {
+ set_current_fiber(pulp, &pulp->fibers.run);
+ target_context = &pulp->current->context;
+ }
+
+ if (current)
+ swap_context(&current->context, target_context);
+ else
+ enter_context(target_context);
+}
+
+
+/* Tick a pulp scheduler - runs fibers until all are idle/sleeping.
+ * An estimate of how much time may pass before the next tick should occur is stored in next_tick_delay_us.
+ * If pulp_exit() is called by a fiber, or no more fibers exist, the return value is -1, and next_tick_delay_us is ignored.
+ * If all fibers are idle, the return value is 0, and next_tick_delay_us is ignored.
+ * If any fibers are sleeping, the return value is 1, and next_tick_delay_us is useful.
+ */
+int pulp_tick(pulp_t *pulp, unsigned *next_tick_delay_us)
+{
+ assert(pulp);
+
+ swap_context(&pulp->caller_context, &pulp->schedule_context);
+
+ if (pulp->exited)
+ return -1;
+
+ /* every tick drains the run queue currently */
+ assert(list_empty(&pulp->fibers.run));
+
+ if (!list_empty(&pulp->fibers.sleep)) {
+ /* TODO: get delay from the sleep queue when it's a priority queue */
+ *next_tick_delay_us = 333;
+
+ return 1;
+ }
+
+ if (!list_empty(&pulp->fibers.idle))
+ return 0;
+
+ return -1;
+}
+
+
+/* run a pulp scheduler until pulp_exit() is called or all fibers return */
+void pulp_run(pulp_t *pulp)
+{
+ for (;;) {
+ unsigned delay = 1000000;
+
+ switch (pulp_tick(pulp, &delay)) {
+ case 0:
+ /* XXX: everything idle is a terminally wedged state in pulp_run(),
+ * unless I suppose a signal handler changes a fiber state.
+ */
+ assert(0);
+ /* or fall-through to a sleep on NDEBUG */
+
+ case 1: {
+ struct timespec ts_delay = { 0, delay * 1000 };
+
+ /* TODO: get the minimum sleep from the priority queue when implemented, for now we spin a bit. */
+ nanosleep(&ts_delay, NULL);
+ break;
+ }
+
+ case -1:
+ return;
+ }
+ }
+}
+
+
+/* exit a pulp scheduler from within a fiber */
+/* this causes pulp_tick() to return immediately. */
+void pulp_exit(pulp_t *pulp)
+{
+ assert(pulp);
+
+ pulp->exited = 1;
+ enter_context(&pulp->caller_context);
+}
+
+
+/* find a free fiber - this returns the next free fiber but does not
+ * modify its free state, it remains on the free list at return.
+ */
+static pulp_fiber_t * find_free_fiber(pulp_t *pulp)
+{
+ assert(pulp);
+
+ if (list_empty(&pulp->fibers.free) && more_fibers(pulp) < 0)
+ return NULL;
+
+ return list_entry(pulp->fibers.free.next, pulp_fiber_t, fibers);
+}
+
+
+/* run a new fiber on a pulp scheduler after delay_ms milliseconds (0 for immediately).
+ * returns the created fiber in case the caller wants to
+ * retain the ability to operate on it. Fibers are automatically
+ * reclaimed on return so this generally isn't necessary.
+ */
+pulp_fiber_t * pulp_fiber_new(pulp_t *pulp, unsigned delay_ms, thunk_t *thunk)
+{
+ pulp_fiber_t *fiber;
+
+ assert(pulp);
+ assert(thunk);
+
+ fiber = find_free_fiber(pulp);
+ if (!fiber)
+ return NULL;
+
+ setup_context(pulp, &fiber->context, thunk->dispatch, thunk);
+
+ if (!delay_ms) {
+ list_move_tail(&fiber->fibers, &pulp->fibers.run);
+ } else {
+ fiber->state.sleep.alarm = pulp->now + delay_ms * PULP_USECS_PER_MSEC;
+ list_move_tail(&fiber->fibers, &pulp->fibers.sleep);
+ }
+
+ return fiber;
+}
+
+
+/* sleep for the supplied number of microseconds (not public) */
+static void pulp_usleep(pulp_t *pulp, unsigned useconds)
+{
+ assert(pulp);
+
+ pulp->current->state.sleep.alarm = pulp->now + useconds;
+ put_current_fiber(pulp, &pulp->fibers.sleep);
+ pulp_schedule(pulp);
+}
+
+
+/* sleep for the supplied number of milliseconds */
+void pulp_msleep(pulp_t *pulp, unsigned milliseconds)
+{
+ assert(pulp);
+
+ return pulp_usleep(pulp, milliseconds * PULP_USECS_PER_MSEC);
+}
+
+
+/* sleep for the supplied number of seconds */
+void pulp_sleep(pulp_t *pulp, unsigned seconds)
+{
+ assert(pulp);
+
+ return pulp_usleep(pulp, seconds * PULP_USECS_PER_SEC);
+}
+
+
+/* return 'now' from the scheduler */
+/* this is a convenience/optimization for fibers which need the current
+ * time - the scheduler already caches it, so make it available.
+ */
+pulp_usec_t pulp_now(pulp_t *pulp)
+{
+ return pulp->now;
+}
+
+
+/* TODO: interfaces for idling/waking fibers */
+/* TODO: interfaces for synchronization across fibers */
+/* all these are left to be implemented as their needs arise */
© All Rights Reserved