/*
 * Copyright (C) 2018 Vito Caputo -
 *
 * This program is free software: you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 3 as published
 * by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

/* pulp implements a very simple cooperative multi-tasking scheduler
 * with zero consideration for IO-wait states. This only concerns itself
 * with the switching of lightweight CPU-bound fibers which may either
 * exit, sleep for a specified duration, or go idle indefinitely with
 * the possibility of being awoken by other fibers via the pulp api.
 *
 * No consideration for SMP/threads has been made, no locking or atomic
 * stuff is in effect here. You can run a pulp scheduler per-thread to
 * scale to SMP, but migrating/stealing fibers across the per-thread schedulers
 * is not supported at this time.
 */
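
/* A minimal usage sketch of the public API implemented below (illustrative
 * only, not part of this file; my_thunk and other_thunk are hypothetical
 * thunk_t pointers assumed to have been constructed via the thunk.h helpers):
 *
 *	pulp_t	*p;
 *
 *	if (pulp_init() < 0)
 *		return -1;
 *
 *	p = pulp_new();
 *	if (!p)
 *		return -1;
 *
 *	(void) pulp_fiber_new(p, 0, my_thunk);		// runs on the first tick
 *	(void) pulp_fiber_new(p, 1000, other_thunk);	// runs ~1 second later
 *
 *	pulp_run(p);	// returns when pulp_exit() is called or all fibers return
 *	pulp_free(p);
 */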

#ifdef __MACH__
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE
#endif
#endif

#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>

#ifdef __WIN32__
/* windows doesn't have ucontext.h, but it does have a fiber API */
#include <windows.h>
#else
#include <ucontext.h>
#endif

#include "list.h"
#include "pulp.h"

#define PULP_USECS_PER_SEC	1000000ULL
#define PULP_USECS_PER_MSEC	1000ULL
#define PULP_FIBER_ALLOC_NUM	32

#ifdef __MACH__
#include <signal.h>
#define PULP_FIBER_STACK_SIZE	SIGSTKSZ
#else
#define PULP_FIBER_STACK_SIZE	(16 * 1024)
#endif

typedef struct pulp_context_t {
#ifdef __WIN32__
	LPVOID		win32_fiber;
#else
	ucontext_t	ucontext;
	char		stack[PULP_FIBER_STACK_SIZE];
#endif
} pulp_context_t;

typedef struct pulp_fiber_t {
	list_head_t	fibers;			/* node on free/idle/run/sleep list */

	union {
		struct {
			pulp_usec_t	alarm;
			pulp_mailbox_t	*mailbox;	/* non-NULL if fiber receives mail */
		} sleep;
	} state;

	pulp_context_t	context;
	thunk_t		*thunk;
} pulp_fiber_t;

typedef struct pulp_fiber_alloc_t {
	list_head_t	allocs;
	unsigned	n_fibers;
	pulp_fiber_t	fibers[];
} pulp_fiber_alloc_t;

typedef struct pulp_t {
	pulp_fiber_t	*current;	/* currently running fiber */
	pulp_usec_t	now;		/* current time updated @ schedule via expire_alarms() */
	int		exited:1;	/* set when pulp_exit() triggered switch to caller */

	struct {
		list_head_t	free;
		list_head_t	idle;
		list_head_t	run;
		list_head_t	sleep;
	} fibers;

	list_head_t	allocs;		/* list of pulp_fiber_alloc_t */

	pulp_context_t	trampoline_context;
	pulp_context_t	caller_context;
} pulp_t;


static void pulp_schedule(pulp_t *pulp);


/* allocate more free fibers */
static int more_fibers(pulp_t *pulp)
{
	pulp_fiber_alloc_t	*alloc;
	unsigned		i;

	assert(pulp);

	alloc = calloc(1, sizeof(pulp_fiber_alloc_t) + sizeof(pulp_fiber_t) * PULP_FIBER_ALLOC_NUM);
	if (!alloc)
		return -ENOMEM;

	alloc->n_fibers = PULP_FIBER_ALLOC_NUM;	/* TODO: maybe grow this? */

	for (i = 0; i < alloc->n_fibers; i++) {
		pulp_fiber_t	*fiber = &alloc->fibers[i];

		list_add_tail(&fiber->fibers, &pulp->fibers.free);
	}

	list_add(&alloc->allocs, &pulp->allocs);

	return 0;
}


/* put the current fiber on a list @ tail */
/* note pulp->current is left intact, it's pulp_schedule()'s responsibility
 * to replace it.
 */
static inline void put_current_fiber(pulp_t *pulp, list_head_t *where)
{
	assert(pulp);
	assert(where);
	assert(pulp->current);

	list_add(&pulp->current->fibers, where);
}


/* set the current fiber from the first entry on the list at head */
static inline void set_current_fiber(pulp_t *pulp, list_head_t *head)
{
	assert(pulp);
	assert(head);
	assert(!list_empty(head));

	pulp->current = list_entry(head->next, pulp_fiber_t, fibers);
	list_del(&pulp->current->fibers);
}


#ifdef __WIN32__

/* win32 fibers can't return or the program exits, so their functions need to
 * run from a trampoline function, and there's only one pointer supplied so now
 * I need the fiber's thunk saved in the fiber structure, and I find the fiber
 * to run via pulp->current.
 *
 * Note this helper is only used with thunk/pulp_fiber_new() contexts.
 */
static void win32_trampoline(void *ptr)
{
	pulp_t	*pulp = ptr;

	assert(pulp);
	assert(pulp->current);
	assert(pulp->current->thunk);

	(void) thunk_dispatch(pulp->current->thunk);
	SwitchToFiber(pulp->trampoline_context.win32_fiber);
}


/* setup a context to run func w/ptr */
static void setup_context(pulp_t *pulp, pulp_context_t *context, void *func, void *ptr)
{
	thunk_t	*thunk = ptr;

	assert(pulp);
	assert(context);
	assert(func);

	if (thunk && thunk->dispatch == func)	/* yuck */
		context->win32_fiber = CreateFiber(PULP_FIBER_STACK_SIZE, win32_trampoline, pulp);
	else
		context->win32_fiber = CreateFiber(PULP_FIBER_STACK_SIZE, func, ptr);
}


/* swaps the new supplied context with the executing context */
/* the current context is saved in old */
static void swap_context(pulp_context_t *old, pulp_context_t *new)
{
	assert(old);
	assert(new);

	if (old == new)
		return;

	SwitchToFiber(new->win32_fiber);
}


/* destroy the fiber */
static void destroy_context(pulp_context_t *context)
{
	assert(context);

	DeleteFiber(context->win32_fiber);
}

#else

/* setup a context to run func w/ptr */
static void setup_context(pulp_t *pulp, pulp_context_t *context, void *func, void *ptr)
{
	assert(pulp);
	assert(context);
	assert(func);

	getcontext(&context->ucontext);
	context->ucontext.uc_stack.ss_sp = context->stack;
	context->ucontext.uc_stack.ss_size = sizeof(context->stack);
	context->ucontext.uc_link = &pulp->trampoline_context.ucontext;
	makecontext(&context->ucontext, func, 1, ptr);	/* FIXME: technically, this is only supposed to take int arguments */
}


/* swaps the new supplied context with the executing context */
/* the current context is saved in old */
static void swap_context(pulp_context_t *old, pulp_context_t *new)
{
	assert(old);
	assert(new);

	if (old == new)
		return;

	swapcontext(&old->ucontext, &new->ucontext);
}


/* ucontext has nothing to destroy */
static void destroy_context(pulp_context_t *context)
{
}

#endif
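

/* A rough roadmap of how the primitives above get used by the scheduler below
 * (descriptive summary only, no new behavior):
 *
 *	pulp_tick():	 swaps caller_context -> trampoline_context
 *	trampoline():	 reclaims any just-exited fiber, then calls pulp_schedule()
 *	pulp_schedule(): swaps to the next fiber on the run queue, or back to
 *			 caller_context once the run queue is drained/pulp_exit()ed
 *	fiber sleeps:	 its context is swapped out via pulp_schedule()
 *	fiber returns:	 uc_link (or win32_trampoline) re-enters trampoline_context
 */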


/* Infinitely schedules fibers, this gets its own context in
 * pulp->trampoline_context.
 *
 * On ucontext systems, this is what pulp_tick() and uc_link swap to.
 *
 * On win32, there's a win32_trampoline helper which wraps the fiber's
 * function, entering pulp->trampoline_context when the fiber's function
 * returns, like ucontext does for me with the uc_link member.
 *
 * It's necessary that fiber cleanup occur in a separate context, since you
 * can't destroy the calling context.
 *
 * Note that though this is coded as an infinite loop, pulp_schedule()'s
 * default target is pulp->caller_context. When the run queue is drained, the
 * caller context is swapped with the trampoline, and pulp_tick() returns.
 */
static void trampoline(pulp_t *pulp)
{
	assert(pulp);

	/* note any time the trampoline is entered with a non-NULL
	 * pulp->current, it's assumed to be an exited fiber situation @
	 * pulp->current. So anything swapping to trampoline_context outside
	 * of the fiber cleanup path must either set pulp->current to NULL or
	 * be a fiber return/destroy path.
	 */
	for (;;) {
		if (pulp->current) {
			destroy_context(&pulp->current->context);
			put_current_fiber(pulp, &pulp->fibers.free);
			pulp->current = NULL;
		}

		pulp_schedule(pulp);
	}
}


/* return time of day in microseconds */
static pulp_usec_t now(void)
{
	struct timeval	ts_now;
	pulp_usec_t	now;

	gettimeofday(&ts_now, NULL);
	now = ts_now.tv_sec * PULP_USECS_PER_SEC;
	now += ts_now.tv_usec;

	return now;
}


/* initialize the pulp library */
/* Call this prior to calling any other pulp functions.
 * Returns 0 on success, negative value on error.
 */
int pulp_init(void)
{
#ifdef __WIN32__
	if (!ConvertThreadToFiber(NULL))
		return -1;
#endif
	return 0;
}


/* create a new pulp scheduler instance */
pulp_t * pulp_new(void)
{
	pulp_t	*pulp;

	pulp = calloc(1, sizeof(pulp_t));
	if (!pulp)
		return NULL;

	INIT_LIST_HEAD(&pulp->fibers.free);
	INIT_LIST_HEAD(&pulp->fibers.idle);
	INIT_LIST_HEAD(&pulp->fibers.run);
	INIT_LIST_HEAD(&pulp->fibers.sleep);
	INIT_LIST_HEAD(&pulp->allocs);

	if (more_fibers(pulp) < 0) {
		free(pulp);
		return NULL;
	}

	setup_context(pulp, &pulp->trampoline_context, trampoline, pulp);

	pulp->now = now();

	return pulp;
}


/* reset a pulp scheduler instance for reuse */
void pulp_reset(pulp_t *pulp)
{
	pulp_fiber_t	*f;

	assert(pulp);

	pulp->exited = 0;
	pulp->now = now();

	/* XXX: Note the idle queue isn't currently utilized */
	list_for_each_entry(f, &pulp->fibers.idle, fibers)
		destroy_context(&f->context);
	list_splice_init(&pulp->fibers.idle, &pulp->fibers.free);

	/* The run queue is drained per-tick, but newly created fibers
	 * with 0 delay go directly on the run queue. So it's possible
	 * there will be some there outside a tick.
	 */
	list_for_each_entry(f, &pulp->fibers.run, fibers)
		destroy_context(&f->context);
	list_splice_init(&pulp->fibers.run, &pulp->fibers.free);

	list_for_each_entry(f, &pulp->fibers.sleep, fibers)
		destroy_context(&f->context);
	list_splice_init(&pulp->fibers.sleep, &pulp->fibers.free);
}


/* free a pulp scheduler instance */
void pulp_free(pulp_t *pulp)
{
	pulp_fiber_alloc_t	*alloc, *_alloc;

	assert(pulp);

	pulp_reset(pulp);
	list_for_each_entry_safe(alloc, _alloc, &pulp->allocs, allocs)
		free(alloc);

	free(pulp);
}


/* move any expired sleeps from pulp->fibers.sleep to run */
static void expire_alarms(pulp_t *pulp)
{
	pulp_fiber_t	*f, *_f;

	assert(pulp);

	pulp->now = now();

	/* TODO: use a priority queue */
	list_for_each_entry_safe(f, _f, &pulp->fibers.sleep, fibers) {
		if (pulp->now >= f->state.sleep.alarm)
			list_move_tail(&f->fibers, &pulp->fibers.run);
	}
}


/* schedule and switch to the next fiber */
static void pulp_schedule(pulp_t *pulp)
{
	pulp_context_t	*target_context = &pulp->caller_context;
	pulp_context_t	*source_context = &pulp->trampoline_context;
	pulp_fiber_t	*current;

	assert(pulp);

	current = pulp->current;
	pulp->current = NULL;

	if (!list_empty(&pulp->fibers.run) && !pulp->exited) {
		set_current_fiber(pulp, &pulp->fibers.run);
		target_context = &pulp->current->context;
	}

	if (current)
		source_context = &current->context;

	swap_context(source_context, target_context);
}


/* Tick a pulp scheduler - runs fibers until all are idle/sleeping.
 *
 * An estimate of how much time may pass before the next tick should occur is
 * stored in next_tick_delay_us (if non-NULL).
 *
 * If pulp_exit() is called by a fiber, or no more fibers exist, the return
 * value is -1, and next_tick_delay_us is ignored.
 *
 * If all fibers are idle, the return value is 0, and next_tick_delay_us is
 * ignored.
 *
 * If any fibers are sleeping, the return value is 1, and next_tick_delay_us is
 * useful.
 */
int pulp_tick(pulp_t *pulp, unsigned *next_tick_delay_us)
{
	assert(pulp);
	assert(!pulp->current);

	expire_alarms(pulp);
#ifdef __WIN32__
	/* TODO: maybe just move to win32 swap_context()? */
	pulp->caller_context.win32_fiber = GetCurrentFiber();
#endif
	swap_context(&pulp->caller_context, &pulp->trampoline_context);

	if (pulp->exited)
		return -1;

	/* every tick drains the run queue currently */
	assert(list_empty(&pulp->fibers.run));

	if (!list_empty(&pulp->fibers.sleep)) {
		/* TODO: get delay from the sleep queue when it's a priority queue */
		if (next_tick_delay_us)
			*next_tick_delay_us = 333;

		return 1;
	}

	if (!list_empty(&pulp->fibers.idle))
		return 0;

	return -1;
}
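

/* Illustrative sketch of driving a scheduler via pulp_tick() from an existing
 * event loop instead of using pulp_run() (hypothetical caller, not part of
 * this file; the wait_for_events*() calls stand in for whatever poll/select
 * wrapper the caller already has):
 *
 *	unsigned	delay_us;
 *	int		r;
 *
 *	while ((r = pulp_tick(p, &delay_us)) >= 0) {
 *		if (r == 1)
 *			wait_for_events_or_timeout(delay_us);	// sleeping fibers pending
 *		else
 *			wait_for_events();			// everything idle
 *	}
 */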


/* run a pulp scheduler until pulp_exit() is called or all fibers return */
void pulp_run(pulp_t *pulp)
{
	for (;;) {
		unsigned	delay = 1000000;

		switch (pulp_tick(pulp, &delay)) {
		case 0:
			/* XXX: everything idle is a terminally wedged state in pulp_run(),
			 * unless I suppose a signal handler changes a fiber state.
			 */
			assert(0);
			/* or fall-through to a sleep on NDEBUG */
		case 1: {
			/* TODO: get the minimum sleep from the priority queue when
			 * implemented, for now we spin a bit.
			 */
#ifdef __WIN32__
			Sleep(10);	/* FIXME, note this function is just for testing anyways. */
#else
			struct timespec	ts_delay = { 0, delay * 1000 };

			nanosleep(&ts_delay, NULL);
#endif
			break;
		}

		case -1:
			return;
		}
	}
}


/* exit a pulp scheduler from within a fiber */
/* this causes pulp_tick() to return immediately. */
void pulp_exit(pulp_t *pulp)
{
	assert(pulp);
	assert(pulp->current);

	pulp->exited = 1;
	swap_context(&pulp->current->context, &pulp->trampoline_context);
}


/* find a free fiber - this returns the next free fiber but does not
 * modify its free state, it remains on the free list at return.
 */
static pulp_fiber_t * find_free_fiber(pulp_t *pulp)
{
	assert(pulp);

	if (list_empty(&pulp->fibers.free) && more_fibers(pulp) < 0)
		return NULL;

	return list_entry(pulp->fibers.free.next, pulp_fiber_t, fibers);
}


/* run a new fiber on a pulp scheduler after delay_ms milliseconds (0 for immediately).
 * returns the created fiber in case the caller wants to
 * retain the ability to operate on it. Fibers are automatically
 * reclaimed on return so this generally isn't necessary.
 */
pulp_fiber_t * pulp_fiber_new(pulp_t *pulp, unsigned delay_ms, thunk_t *thunk)
{
	pulp_fiber_t	*fiber;

	assert(pulp);
	assert(thunk);

	fiber = find_free_fiber(pulp);
	if (!fiber)
		return NULL;

	setup_context(pulp, &fiber->context, thunk->dispatch, thunk);

	if (!delay_ms) {
		list_move_tail(&fiber->fibers, &pulp->fibers.run);
	} else {
		fiber->state.sleep.alarm = pulp->now + delay_ms * PULP_USECS_PER_MSEC;
		list_move_tail(&fiber->fibers, &pulp->fibers.sleep);
	}

	fiber->thunk = thunk;

	return fiber;
}


/* return the current fiber */
pulp_fiber_t * pulp_self(pulp_t *pulp)
{
	return pulp->current;
}


/* sleep for the supplied number of microseconds (not public) */
/* if mailbox is non-NULL it may be used to receive mail while sleeping via pulp_fiber_get_mailslot() */
static void pulp_usleep(pulp_t *pulp, unsigned useconds, pulp_mailbox_t *mailbox)
{
	assert(pulp);
	assert(pulp->current);

	pulp->current->state.sleep.mailbox = mailbox;
	if (mailbox)
		mailbox->count = 0;
	pulp->current->state.sleep.alarm = pulp->now + useconds;
	put_current_fiber(pulp, &pulp->fibers.sleep);
	pulp_schedule(pulp);
}


/* sleep for the supplied number of milliseconds */
/* if mailbox is non-NULL it may be used to receive mail while sleeping via pulp_fiber_get_mailslot() */
void pulp_msleep(pulp_t *pulp, unsigned milliseconds, pulp_mailbox_t *mailbox)
{
	pulp_usleep(pulp, milliseconds * PULP_USECS_PER_MSEC, mailbox);
}


/* sleep for the supplied number of seconds */
/* if mailbox is non-NULL it may be used to receive mail while sleeping via pulp_fiber_get_mailslot() */
void pulp_sleep(pulp_t *pulp, unsigned seconds, pulp_mailbox_t *mailbox)
{
	pulp_usleep(pulp, seconds * PULP_USECS_PER_SEC, mailbox);
}
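

/* Fibers typically use these sleeps as yields between chunks of work. A fiber
 * body might look like this sketch (illustrative only; p and do_some_work()
 * are hypothetical, the latter standing in for whatever the fiber actually does):
 *
 *	for (;;) {
 *		if (do_some_work() < 0)
 *			break;			// returning frees the fiber
 *
 *		pulp_msleep(p, 1, NULL);	// yield, resume on a tick ~1ms later
 *	}
 */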


/* return 'now' from the scheduler */
/* this is a convenience/optimization for fibers which need the current
 * time - the scheduler already caches it, so make it available.
 */
pulp_usec_t pulp_now(pulp_t *pulp)
{
	assert(pulp);

	return pulp->now;
}


/* Get a mailslot in a destination fiber's mailbox.
 * If the fiber has no mailbox, -ENOENT is returned.
 * If the mailbox is full, -ENOSPC is returned.
 * On success 0 is returned and the mailslot pointer is stored @ *res_mailslot.
 *
 * The mailslot can only be treated as valid until the calling fiber sleeps,
 * after sleeping the receiving fiber may execute, then free, or clear and
 * reuse its mailbox.
 *
 * This interface returns a pointer to the slot rather than accepting something
 * like a void * value to store at the slot to allow a greater variety of data
 * passing models. If your fibers are creating mailboxes having slots filled
 * with pointers to valid memory, then the sender may dereference the
 * mailslot's pointer to the space for storing the message rather than having
 * to allocate space for passing messages larger than a pointer.
 *
 * Of course, if your scenario is simple enough, you can always simply write to
 * the mailslot's void * worth of space. There's also the possibility of not
 * writing anything at all; if the only thing needed is a basic signal - the
 * mailbox count will be advanced whether you do anything with the mailslot or
 * not. It's up to you to define the contract for your fibers to agree on.
 */
int pulp_fiber_get_mailslot(pulp_fiber_t *fiber, void ***res_mailslot)
{
	assert(fiber);
	assert(res_mailslot);

	/* XXX: this doesn't currently implement any blocking - if the mailbox
	 * is full or the fiber isn't receiving mail (no mailbox), we simply return
	 * failure. It seems trivial to make this fiber go to sleep when the
	 * mailbox is full, then retry when it comes back, though. We'll see if
	 * that's desirable (reliable delivery). What I'm expecting is that mailboxes
	 * should just be sized to never experience blocking.
	 */
	/* TODO: note that this also doesn't currently wake the fiber or otherwise
	 * enter the scheduler.
	 * I left that out, as I suspect it may be desirable to just add a separate
	 * wake function. The current use case of this library uses tiny sleeps
	 * more or less as yields so it's just not an issue yet.
	 */
	/* TODO: nothing is sanity checked at this time, I assume the supplied fiber
	 * is valid and sleeping.
	 */
	if (!fiber->state.sleep.mailbox)
		return -ENOENT;

	if (fiber->state.sleep.mailbox->count >= fiber->state.sleep.mailbox->size)
		return -ENOSPC;

	*res_mailslot = &fiber->state.sleep.mailbox->slots[fiber->state.sleep.mailbox->count++];

	return 0;
}


/* TODO: interfaces for idling/waking fibers */
/* TODO: interfaces for synchronization across fibers */
/* all these are left to be implemented as their needs arise */
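

/* Illustrative sketch of one contract fibers could agree on for the mailslot
 * interface above (not part of this file; pulp_mailbox_t's count/size/slots
 * members come from pulp.h, everything else here is hypothetical):
 *
 * Receiver fiber - sleeps with a mailbox attached, then drains it on wake:
 *	void		*slots[8] = { NULL };
 *	pulp_mailbox_t	mbox = { .size = 8, .slots = slots };
 *
 *	pulp_msleep(p, 100, &mbox);
 *	for (unsigned i = 0; i < mbox.count; i++)
 *		handle_message(mbox.slots[i]);
 *
 * Sender fiber - the receiver must be sleeping on its mailbox at this point:
 *	void	**slot;
 *
 *	if (!pulp_fiber_get_mailslot(receiver, &slot))
 *		*slot = msg;	// or leave the slot untouched and treat the count as a signal
 */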