/* * Copyright (C) 2018 Vito Caputo - * * This program is free software: you can redistribute it and/or modify it * under the terms of the GNU General Public License version 3 as published * by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ /* pulp implements a very simple cooperative multi-tasking scheduler * with zero consideration for IO-wait states. This only concerns itself * with the switching of lightweight CPU-bound fibers which may either * exit, sleep for a specified duration, or go idle indefinitely with * the possibility of being awoken by other fibers via the pulp api. * * No consideration for SMP/threads has been made, no locking or atomic * stuff is in effect here. You can run a pulp scheduler per-thread to * scale to SMP, but migrating/stealing fibers across the per-thread schedulers * is not supported at this time. * */ #include #include #include #include #include #include #include #include "list.h" #include "pulp.h" /* for when this needs to support windows: * https://www.codeproject.com/Tips/4225/Unix-ucontext-t-Operations-on-Windows-Platforms */ #define PULP_USECS_PER_SEC 1000000ULL #define PULP_USECS_PER_MSEC 1000ULL #define PULP_FIBER_STACK_SIZE (16 * 1024) #define PULP_FIBER_ALLOC_NUM 32 typedef struct pulp_context_t { ucontext_t ucontext; char stack[PULP_FIBER_STACK_SIZE]; } pulp_context_t; typedef struct pulp_fiber_t { list_head_t fibers; /* node on free/idle/run/sleep list */ union { struct { pulp_usec_t alarm; } sleep; } state; pulp_context_t context; } pulp_fiber_t; typedef struct pulp_fiber_alloc_t { list_head_t allocs; unsigned n_fibers; pulp_fiber_t fibers[]; } pulp_fiber_alloc_t; typedef struct pulp_t { pulp_fiber_t *current; /* currently running fiber */ pulp_usec_t now; /* current time updated @ schedule via expire_alarms() */ int exited:1; /* set when pulp_exit() triggered switch to caller */ struct { list_head_t free; list_head_t idle; list_head_t run; list_head_t sleep; } fibers; list_head_t allocs; /* list of pulp_fiber_alloc_t */ pulp_context_t schedule_context; pulp_context_t caller_context; } pulp_t; static void pulp_schedule(pulp_t *pulp); /* allocate more free fibers */ static int more_fibers(pulp_t *pulp) { pulp_fiber_alloc_t *alloc; unsigned i; assert(pulp); alloc = calloc(1, sizeof(pulp_fiber_alloc_t) + sizeof(pulp_fiber_t) * PULP_FIBER_ALLOC_NUM); if (!alloc) return -ENOMEM; alloc->n_fibers = PULP_FIBER_ALLOC_NUM; /* TODO: maybe grow this? */ for (i = 0; i < alloc->n_fibers; i++) { pulp_fiber_t *fiber = &alloc->fibers[i]; list_add_tail(&fiber->fibers, &pulp->fibers.free); } list_add(&alloc->allocs, &pulp->allocs); return 0; } /* put the current fiber on a list @ tail */ /* note pulp->current is left intact, it's pulp_schedule()'s responsibility * to replace it. */ static inline void put_current_fiber(pulp_t *pulp, list_head_t *where) { assert(pulp); assert(where); assert(pulp->current); list_add(&pulp->current->fibers, where); } /* set the current fiber from the first entry on the list at head */ static inline void set_current_fiber(pulp_t *pulp, list_head_t *head) { assert(pulp); assert(head); assert(!list_empty(head)); pulp->current = list_entry(head->next, pulp_fiber_t, fibers); list_del(&pulp->current->fibers); } /* setup a context to run func w/ptr */ static void setup_context(pulp_t *pulp, pulp_context_t *context, void *func, void *ptr) { assert(pulp); assert(context); assert(func); getcontext(&context->ucontext); context->ucontext.uc_stack.ss_sp = context->stack; context->ucontext.uc_stack.ss_size = sizeof(context->stack); context->ucontext.uc_link = &pulp->schedule_context.ucontext; makecontext(&context->ucontext, func, 1, ptr); } /* sets the supplied context as the executing context */ static void enter_context(pulp_context_t *context) { assert(context); setcontext(&context->ucontext); } /* swaps the new supplied context with the executing context */ /* the current context is saved in old */ static void swap_context(pulp_context_t *old, pulp_context_t *new) { assert(old); assert(new); if (old == new) return; swapcontext(&old->ucontext, &new->ucontext); } /* handle a return out of a fiber, equivalent to the fiber exiting */ static void schedule_context(pulp_t *pulp) { assert(pulp); if (pulp->current) put_current_fiber(pulp, &pulp->fibers.free); pulp_schedule(pulp); } /* return time of day in microseconds */ static pulp_usec_t now(void) { struct timeval ts_now; pulp_usec_t now; gettimeofday(&ts_now, NULL); now = ts_now.tv_sec * PULP_USECS_PER_SEC; now += ts_now.tv_usec; return now; } /* create a new pulp scheduler instance */ pulp_t * pulp_new(void) { pulp_t *pulp; pulp = calloc(1, sizeof(pulp_t)); if (!pulp) return NULL; INIT_LIST_HEAD(&pulp->fibers.free); INIT_LIST_HEAD(&pulp->fibers.idle); INIT_LIST_HEAD(&pulp->fibers.run); INIT_LIST_HEAD(&pulp->fibers.sleep); INIT_LIST_HEAD(&pulp->allocs); if (more_fibers(pulp) < 0) { free(pulp); return NULL; } setup_context(pulp, &pulp->schedule_context, schedule_context, pulp); pulp->now = now(); return pulp; } /* free a pulp scheduler instance */ void pulp_free(pulp_t *pulp) { pulp_fiber_alloc_t *alloc, *_alloc; assert(pulp); list_for_each_entry_safe(alloc, _alloc, &pulp->allocs, allocs) free(alloc); free(pulp); } /* move any expired sleeps from pulp->fibers.sleep to run */ static void expire_alarms(pulp_t *pulp) { pulp_fiber_t *f, *_f; assert(pulp); pulp->now = now(); /* TODO: use a priority queue */ list_for_each_entry_safe(f, _f, &pulp->fibers.sleep, fibers) { if (pulp->now >= f->state.sleep.alarm) list_move_tail(&f->fibers, &pulp->fibers.run); } } /* schedule and switch to the next fiber */ static void pulp_schedule(pulp_t *pulp) { pulp_context_t *target_context = &pulp->caller_context; pulp_fiber_t *current; assert(pulp); current = pulp->current; pulp->current = NULL; if (!list_empty(&pulp->fibers.run)) { set_current_fiber(pulp, &pulp->fibers.run); target_context = &pulp->current->context; } if (current) swap_context(¤t->context, target_context); else enter_context(target_context); } /* Tick a pulp scheduler - runs fibers until all are idle/sleeping. * An estimate of how much time may pass before the next tick should occur is stored in next_tick_delay_us. * If pulp_exit() is called by a fiber, or no more fibers exist, the return value is -1, and next_tick_delay_us is ignored. * If all fibers are idle, the return value is 0, and next_tick_delay_us is ignored. * If any fibers are sleeping, the return value is 1, and next_tick_delay_us is useful. */ int pulp_tick(pulp_t *pulp, unsigned *next_tick_delay_us) { assert(pulp); expire_alarms(pulp); swap_context(&pulp->caller_context, &pulp->schedule_context); if (pulp->exited) return -1; /* every tick drains the run queue currently */ assert(list_empty(&pulp->fibers.run)); if (!list_empty(&pulp->fibers.sleep)) { /* TODO: get delay from the sleep queue when it's a priority queue */ *next_tick_delay_us = 333; return 1; } if (!list_empty(&pulp->fibers.idle)) return 0; return -1; } /* run a pulp scheduler until pulp_exit() is called or all fibers return */ void pulp_run(pulp_t *pulp) { for (;;) { unsigned delay = 1000000; switch (pulp_tick(pulp, &delay)) { case 0: /* XXX: everything idle is a terminally wedged state in pulp_run(), * unless I suppose a signal handler changes a fiber state. */ assert(0); /* or fall-through to a sleep on NDEBUG */ case 1: { struct timespec ts_delay = { 0, delay * 1000 }; /* TODO: get the minimum sleep from the priority queue when implemented, for now we spin a bit. */ nanosleep(&ts_delay, NULL); break; } case -1: return; } } } /* exit a pulp scheduler from within a fiber */ /* this causes pulp_tick() to return immediately. */ void pulp_exit(pulp_t *pulp) { assert(pulp); pulp->exited = 1; enter_context(&pulp->caller_context); } /* find a free fiber - this returns the next free fiber but does not * modify its free state, it remains on the free list at return. */ static pulp_fiber_t * find_free_fiber(pulp_t *pulp) { assert(pulp); if (list_empty(&pulp->fibers.free) && more_fibers(pulp) < 0) return NULL; return list_entry(pulp->fibers.free.next, pulp_fiber_t, fibers); } /* run a new fiber on a pulp scheduler after delay_ms milliseconds (0 for immediately). * returns the created fiber in case the caller wants to * retain the ability to operate on it. Fibers are automatically * reclaimed on return so this generally isn't necessary. */ pulp_fiber_t * pulp_fiber_new(pulp_t *pulp, unsigned delay_ms, thunk_t *thunk) { pulp_fiber_t *fiber; assert(pulp); assert(thunk); fiber = find_free_fiber(pulp); if (!fiber) return NULL; setup_context(pulp, &fiber->context, thunk->dispatch, thunk); if (!delay_ms) { list_move_tail(&fiber->fibers, &pulp->fibers.run); } else { fiber->state.sleep.alarm = pulp->now + delay_ms * PULP_USECS_PER_MSEC; list_move_tail(&fiber->fibers, &pulp->fibers.sleep); } return fiber; } /* sleep for the supplied number of microseconds (not public) */ static void pulp_usleep(pulp_t *pulp, unsigned useconds) { assert(pulp); pulp->current->state.sleep.alarm = pulp->now + useconds; put_current_fiber(pulp, &pulp->fibers.sleep); pulp_schedule(pulp); } /* sleep for the supplied number of milliseconds */ void pulp_msleep(pulp_t *pulp, unsigned milliseconds) { assert(pulp); return pulp_usleep(pulp, milliseconds * PULP_USECS_PER_MSEC); } /* sleep for the supplied number of seconds */ void pulp_sleep(pulp_t *pulp, unsigned seconds) { assert(pulp); return pulp_usleep(pulp, seconds * PULP_USECS_PER_SEC); } /* return 'now' from the scheduler */ /* this is a convenience/optimization for fibers which need the current * time - the scheduler already caches it, so make it available. */ pulp_usec_t pulp_now(pulp_t *pulp) { return pulp->now; } /* TODO: interfaces for idling/waking fibers */ /* TODO: interfaces for synchronization across fibers */ /* all these are left to be implemented as their needs arise */