diff options
-rw-r--r-- | configure.ac | 6 | ||||
-rw-r--r-- | m4/ax_pthread.m4 | 332 | ||||
-rw-r--r-- | src/iou.c | 171 | ||||
-rw-r--r-- | src/iou.h | 1 |
4 files changed, 500 insertions, 10 deletions
diff --git a/configure.ac b/configure.ac index cfe6dca..db389ef 100644 --- a/configure.ac +++ b/configure.ac @@ -1,5 +1,6 @@ AC_INIT([libiou], [0.1], [vcaputo@pengaru.com]) AM_INIT_AUTOMAKE([-Wall -Werror foreign]) +AC_CONFIG_MACRO_DIRS([m4]) AC_PROG_CC AM_PROG_CC_C_O AM_PROG_AR @@ -8,6 +9,11 @@ AM_SILENT_RULES([yes]) CFLAGS="$CFLAGS -Wall" +AX_PTHREAD +LIBS="$PTHREAD_LIBS $LIBS" +CFLAGS="$CFLAGS $PTHREAD_CFLAGS" +CC="$PTHREAD_CC" + dnl Check for liburing PKG_CHECK_MODULES(URING, liburing) CFLAGS="$CFLAGS $URING_CFLAGS" diff --git a/m4/ax_pthread.m4 b/m4/ax_pthread.m4 new file mode 100644 index 0000000..d383ad5 --- /dev/null +++ b/m4/ax_pthread.m4 @@ -0,0 +1,332 @@ +# =========================================================================== +# http://www.gnu.org/software/autoconf-archive/ax_pthread.html +# =========================================================================== +# +# SYNOPSIS +# +# AX_PTHREAD([ACTION-IF-FOUND[, ACTION-IF-NOT-FOUND]]) +# +# DESCRIPTION +# +# This macro figures out how to build C programs using POSIX threads. It +# sets the PTHREAD_LIBS output variable to the threads library and linker +# flags, and the PTHREAD_CFLAGS output variable to any special C compiler +# flags that are needed. (The user can also force certain compiler +# flags/libs to be tested by setting these environment variables.) +# +# Also sets PTHREAD_CC to any special C compiler that is needed for +# multi-threaded programs (defaults to the value of CC otherwise). (This +# is necessary on AIX to use the special cc_r compiler alias.) +# +# NOTE: You are assumed to not only compile your program with these flags, +# but also link it with them as well. e.g. you should link with +# $PTHREAD_CC $CFLAGS $PTHREAD_CFLAGS $LDFLAGS ... 
$PTHREAD_LIBS $LIBS +# +# If you are only building threads programs, you may wish to use these +# variables in your default LIBS, CFLAGS, and CC: +# +# LIBS="$PTHREAD_LIBS $LIBS" +# CFLAGS="$CFLAGS $PTHREAD_CFLAGS" +# CC="$PTHREAD_CC" +# +# In addition, if the PTHREAD_CREATE_JOINABLE thread-attribute constant +# has a nonstandard name, defines PTHREAD_CREATE_JOINABLE to that name +# (e.g. PTHREAD_CREATE_UNDETACHED on AIX). +# +# Also HAVE_PTHREAD_PRIO_INHERIT is defined if pthread is found and the +# PTHREAD_PRIO_INHERIT symbol is defined when compiling with +# PTHREAD_CFLAGS. +# +# ACTION-IF-FOUND is a list of shell commands to run if a threads library +# is found, and ACTION-IF-NOT-FOUND is a list of commands to run it if it +# is not found. If ACTION-IF-FOUND is not specified, the default action +# will define HAVE_PTHREAD. +# +# Please let the authors know if this macro fails on any platform, or if +# you have any other suggestions or comments. This macro was based on work +# by SGJ on autoconf scripts for FFTW (http://www.fftw.org/) (with help +# from M. Frigo), as well as ac_pthread and hb_pthread macros posted by +# Alejandro Forero Cuervo to the autoconf macro repository. We are also +# grateful for the helpful feedback of numerous users. +# +# Updated for Autoconf 2.68 by Daniel Richard G. +# +# LICENSE +# +# Copyright (c) 2008 Steven G. Johnson <stevenj@alum.mit.edu> +# Copyright (c) 2011 Daniel Richard G. <skunk@iSKUNK.ORG> +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +# Public License for more details. 
+# +# You should have received a copy of the GNU General Public License along +# with this program. If not, see <http://www.gnu.org/licenses/>. +# +# As a special exception, the respective Autoconf Macro's copyright owner +# gives unlimited permission to copy, distribute and modify the configure +# scripts that are the output of Autoconf when processing the Macro. You +# need not follow the terms of the GNU General Public License when using +# or distributing such scripts, even though portions of the text of the +# Macro appear in them. The GNU General Public License (GPL) does govern +# all other use of the material that constitutes the Autoconf Macro. +# +# This special exception to the GPL applies to versions of the Autoconf +# Macro released by the Autoconf Archive. When you make and distribute a +# modified version of the Autoconf Macro, you may extend this special +# exception to the GPL to apply to your modified version as well. + +#serial 21 + +AU_ALIAS([ACX_PTHREAD], [AX_PTHREAD]) +AC_DEFUN([AX_PTHREAD], [ +AC_REQUIRE([AC_CANONICAL_HOST]) +AC_LANG_PUSH([C]) +ax_pthread_ok=no + +# We used to check for pthread.h first, but this fails if pthread.h +# requires special compiler flags (e.g. on True64 or Sequent). +# It gets checked for in the link test anyway. 
+ +# First of all, check if the user has set any of the PTHREAD_LIBS, +# etcetera environment variables, and if threads linking works using +# them: +if test x"$PTHREAD_LIBS$PTHREAD_CFLAGS" != x; then + save_CFLAGS="$CFLAGS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + save_LIBS="$LIBS" + LIBS="$PTHREAD_LIBS $LIBS" + AC_MSG_CHECKING([for pthread_join in LIBS=$PTHREAD_LIBS with CFLAGS=$PTHREAD_CFLAGS]) + AC_TRY_LINK_FUNC([pthread_join], [ax_pthread_ok=yes]) + AC_MSG_RESULT([$ax_pthread_ok]) + if test x"$ax_pthread_ok" = xno; then + PTHREAD_LIBS="" + PTHREAD_CFLAGS="" + fi + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" +fi + +# We must check for the threads library under a number of different +# names; the ordering is very important because some systems +# (e.g. DEC) have both -lpthread and -lpthreads, where one of the +# libraries is broken (non-POSIX). + +# Create a list of thread flags to try. Items starting with a "-" are +# C compiler flags, and other items are library names, except for "none" +# which indicates that we try without any flags at all, and "pthread-config" +# which is a program returning the flags for the Pth emulation library. + +ax_pthread_flags="pthreads none -Kthread -kthread lthread -pthread -pthreads -mthreads pthread --thread-safe -mt pthread-config" + +# The ordering *is* (sometimes) important. 
Some notes on the +# individual items follow: + +# pthreads: AIX (must check this before -lpthread) +# none: in case threads are in libc; should be tried before -Kthread and +# other compiler flags to prevent continual compiler warnings +# -Kthread: Sequent (threads in libc, but -Kthread needed for pthread.h) +# -kthread: FreeBSD kernel threads (preferred to -pthread since SMP-able) +# lthread: LinuxThreads port on FreeBSD (also preferred to -pthread) +# -pthread: Linux/gcc (kernel threads), BSD/gcc (userland threads) +# -pthreads: Solaris/gcc +# -mthreads: Mingw32/gcc, Lynx/gcc +# -mt: Sun Workshop C (may only link SunOS threads [-lthread], but it +# doesn't hurt to check since this sometimes defines pthreads too; +# also defines -D_REENTRANT) +# ... -mt is also the pthreads flag for HP/aCC +# pthread: Linux, etcetera +# --thread-safe: KAI C++ +# pthread-config: use pthread-config program (for GNU Pth library) + +case ${host_os} in + solaris*) + + # On Solaris (at least, for some versions), libc contains stubbed + # (non-functional) versions of the pthreads routines, so link-based + # tests will erroneously succeed. (We need to link with -pthreads/-mt/ + # -lpthread.) (The stubs are missing pthread_cleanup_push, or rather + # a function called by this macro, so we could check for that, but + # who knows whether they'll stub that too in a future libc.) So, + # we'll just look for -pthreads and -lpthread first: + + ax_pthread_flags="-pthreads pthread -mt -pthread $ax_pthread_flags" + ;; + + darwin*) + ax_pthread_flags="-pthread $ax_pthread_flags" + ;; +esac + +# Clang doesn't consider unrecognized options an error unless we specify +# -Werror. We throw in some extra Clang-specific options to ensure that +# this doesn't happen for GCC, which also accepts -Werror. 
+ +AC_MSG_CHECKING([if compiler needs -Werror to reject unknown flags]) +save_CFLAGS="$CFLAGS" +ax_pthread_extra_flags="-Werror" +CFLAGS="$CFLAGS $ax_pthread_extra_flags -Wunknown-warning-option -Wsizeof-array-argument" +AC_COMPILE_IFELSE([AC_LANG_PROGRAM([int foo(void);],[foo()])], + [AC_MSG_RESULT([yes])], + [ax_pthread_extra_flags= + AC_MSG_RESULT([no])]) +CFLAGS="$save_CFLAGS" + +if test x"$ax_pthread_ok" = xno; then +for flag in $ax_pthread_flags; do + + case $flag in + none) + AC_MSG_CHECKING([whether pthreads work without any flags]) + ;; + + -*) + AC_MSG_CHECKING([whether pthreads work with $flag]) + PTHREAD_CFLAGS="$flag" + ;; + + pthread-config) + AC_CHECK_PROG([ax_pthread_config], [pthread-config], [yes], [no]) + if test x"$ax_pthread_config" = xno; then continue; fi + PTHREAD_CFLAGS="`pthread-config --cflags`" + PTHREAD_LIBS="`pthread-config --ldflags` `pthread-config --libs`" + ;; + + *) + AC_MSG_CHECKING([for the pthreads library -l$flag]) + PTHREAD_LIBS="-l$flag" + ;; + esac + + save_LIBS="$LIBS" + save_CFLAGS="$CFLAGS" + LIBS="$PTHREAD_LIBS $LIBS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS $ax_pthread_extra_flags" + + # Check for various functions. We must include pthread.h, + # since some functions may be macros. (On the Sequent, we + # need a special flag -Kthread to make this header compile.) + # We check for pthread_join because it is in -lpthread on IRIX + # while pthread_create is in libc. We check for pthread_attr_init + # due to DEC craziness with -lpthreads. We check for + # pthread_cleanup_push because it is one of the few pthread + # functions on Solaris that doesn't have a non-functional libc stub. + # We try pthread_create on general principles. 
+ AC_LINK_IFELSE([AC_LANG_PROGRAM([#include <pthread.h> + static void routine(void *a) { a = 0; } + static void *start_routine(void *a) { return a; }], + [pthread_t th; pthread_attr_t attr; + pthread_create(&th, 0, start_routine, 0); + pthread_join(th, 0); + pthread_attr_init(&attr); + pthread_cleanup_push(routine, 0); + pthread_cleanup_pop(0) /* ; */])], + [ax_pthread_ok=yes], + []) + + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" + + AC_MSG_RESULT([$ax_pthread_ok]) + if test "x$ax_pthread_ok" = xyes; then + break; + fi + + PTHREAD_LIBS="" + PTHREAD_CFLAGS="" +done +fi + +# Various other checks: +if test "x$ax_pthread_ok" = xyes; then + save_LIBS="$LIBS" + LIBS="$PTHREAD_LIBS $LIBS" + save_CFLAGS="$CFLAGS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + + # Detect AIX lossage: JOINABLE attribute is called UNDETACHED. + AC_MSG_CHECKING([for joinable pthread attribute]) + attr_name=unknown + for attr in PTHREAD_CREATE_JOINABLE PTHREAD_CREATE_UNDETACHED; do + AC_LINK_IFELSE([AC_LANG_PROGRAM([#include <pthread.h>], + [int attr = $attr; return attr /* ; */])], + [attr_name=$attr; break], + []) + done + AC_MSG_RESULT([$attr_name]) + if test "$attr_name" != PTHREAD_CREATE_JOINABLE; then + AC_DEFINE_UNQUOTED([PTHREAD_CREATE_JOINABLE], [$attr_name], + [Define to necessary symbol if this constant + uses a non-standard name on your system.]) + fi + + AC_MSG_CHECKING([if more special flags are required for pthreads]) + flag=no + case ${host_os} in + aix* | freebsd* | darwin*) flag="-D_THREAD_SAFE";; + osf* | hpux*) flag="-D_REENTRANT";; + solaris*) + if test "$GCC" = "yes"; then + flag="-D_REENTRANT" + else + # TODO: What about Clang on Solaris? 
+            flag="-mt -D_REENTRANT"
+        fi
+        ;;
+        esac
+        AC_MSG_RESULT([$flag])
+        if test "x$flag" != xno; then
+            PTHREAD_CFLAGS="$flag $PTHREAD_CFLAGS"
+        fi
+
+        AC_CACHE_CHECK([for PTHREAD_PRIO_INHERIT],
+            [ax_cv_PTHREAD_PRIO_INHERIT], [
+                AC_LINK_IFELSE([AC_LANG_PROGRAM([[#include <pthread.h>]],
+                                                [[int i = PTHREAD_PRIO_INHERIT;]])],
+                    [ax_cv_PTHREAD_PRIO_INHERIT=yes],
+                    [ax_cv_PTHREAD_PRIO_INHERIT=no])
+            ])
+        AS_IF([test "x$ax_cv_PTHREAD_PRIO_INHERIT" = "xyes"],
+            [AC_DEFINE([HAVE_PTHREAD_PRIO_INHERIT], [1], [Have PTHREAD_PRIO_INHERIT.])])
+
+        LIBS="$save_LIBS"
+        CFLAGS="$save_CFLAGS"
+
+        # More AIX lossage: compile with *_r variant
+        if test "x$GCC" != xyes; then
+            case $host_os in
+                aix*)
+                AS_CASE(["x/$CC"],
+                  [x*/c89|x*/c89_128|x*/c99|x*/c99_128|x*/cc|x*/cc128|x*/xlc|x*/xlc_v6|x*/xlc128|x*/xlc128_v6],
+                  [#handle absolute path differently from PATH based program lookup
+                   AS_CASE(["x$CC"],
+                     [x/*],
+                     [AS_IF([AS_EXECUTABLE_P([${CC}_r])],[PTHREAD_CC="${CC}_r"])],
+                     [AC_CHECK_PROGS([PTHREAD_CC],[${CC}_r],[$CC])])])
+                ;;
+            esac
+        fi
+fi
+
+test -n "$PTHREAD_CC" || PTHREAD_CC="$CC"
+
+AC_SUBST([PTHREAD_LIBS])
+AC_SUBST([PTHREAD_CFLAGS])
+AC_SUBST([PTHREAD_CC])
+
+# Finally, execute ACTION-IF-FOUND/ACTION-IF-NOT-FOUND:
+if test x"$ax_pthread_ok" = xyes; then
+        ifelse([$1],,[AC_DEFINE([HAVE_PTHREAD],[1],[Define if you have POSIX threads libraries and header files.])],[$1])
+        :
+else
+        ax_pthread_ok=no
+        $2
+fi
+AC_LANG_POP
+])dnl AX_PTHREAD
@@ -16,6 +16,7 @@

 #include <assert.h>
 #include <liburing.h>
+#include <pthread.h>
 #include <stdlib.h>
 #include <stddef.h>

@@ -26,11 +27,30 @@
 typedef struct _iou_op_t _iou_op_t;
 typedef struct iou_ops_t iou_ops_t;

+typedef struct iou_thread_t {
+	pthread_t		pthread;
+	iou_t			*iou;
+} iou_thread_t;
+
 typedef struct iou_t {
 	struct io_uring		ring;
-	unsigned		n_issued, n_queued, n_submitted;
+	unsigned		n_issued,	/* SQEs allocated, but no data set */
+				n_queued,	/* SQEs allocated+data set */
+				n_submitted,	/* SQEs submitted for processing and not yet "seen" for completion */
+				n_async;	/* async ops created and not yet completed by iou_run() */
+	_iou_op_t		*processed;	/* async work processed but waiting for completion */
+	pthread_mutex_t		processed_mutex;/* serializes processed list accesses */
 	unsigned		quit:1;
 	iou_ops_t		*ops;
+
+	struct {
+		_iou_op_t	*head, *tail;
+		pthread_cond_t	cond;
+		pthread_mutex_t	mutex;	/* serializes {head,tail} */
+	} async;
+
+	int			n_threads;
+	iou_thread_t		threads[];
 } iou_t;

 /* private container of the public iou_op_t */
@@ -39,7 +59,15 @@ struct _iou_op_t {

 	int		(*cb)(void *cb_data);
 	void		*cb_data;
-	_iou_op_t	*free_next;
+
+	/* this is added for the iou_async()-submitted threaded processing,
+	 * which does add some bloat to all ops and might warrant creating a
+	 * distinct async op type with its own allocation/free lists... TODO
+	 */
+	int		(*async_cb)(void *async_cb_data);
+	void		*async_cb_data;
+
+	_iou_op_t	*next;
 	iou_ops_t	*container;
 };

@@ -84,7 +112,7 @@ static _iou_op_t * ops_get(iou_ops_t **ops)
 		t->count = next_count;
 		for (int i = 0; i < next_count; i++) {
 			t->ops[i].container = t;
-			t->ops[i].free_next = t->free;
+			t->ops[i].next = t->free;
 			t->free = &t->ops[i];
 		}

@@ -93,8 +121,8 @@ static _iou_op_t * ops_get(iou_ops_t **ops)
 	}

 	_op = t->free;
-	t->free = _op->free_next;
-	_op->free_next = NULL;
+	t->free = _op->next;
+	_op->next = NULL;

 	return _op;
 }
@@ -105,18 +133,60 @@ static void ops_put(_iou_op_t *_op)
 	assert(_op);
 	assert(_op->container);

-	_op->free_next = _op->container->free;
+	_op->next = _op->container->free;
 	_op->container->free = _op;
 }


+/* CPU-bound async work thread, created @ iou_new(): dequeues iou_async() ops from iou->async, runs their async_cb, then hands them to iou_run() via iou->processed; loops until pthread_cancel()ed @ iou_free() */
+static void * iou_thread(iou_thread_t *thread)
+{
+	iou_t	*iou;
+
+	assert(thread);
+	assert(thread->iou);
+
+	iou = thread->iou;
+
+	for (;;) {
+		_iou_op_t	*_op;
+
+		pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock, &iou->async.mutex);
+		pthread_mutex_lock(&iou->async.mutex);
+		while (!iou->async.head)
+			pthread_cond_wait(&iou->async.cond, &iou->async.mutex);
+
+		_op = iou->async.head;
+		iou->async.head = _op->next;
+		if (!_op->next)
+			iou->async.tail = NULL;
+		pthread_cleanup_pop(1);
+
+		if (_op->async_cb(_op->async_cb_data) < 0)	/* XXX treating errors here as fatal disasters for now; */
+			abort();				/* not assert()ed, as NDEBUG would compile the call itself out */
+		/* now the work is handed back to iou_run() via iou->processed, which
+		 * iou_run() polls after io_uring submissions but before waiting for io completions.
+		 */
+		pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock, &iou->processed_mutex);
+		pthread_mutex_lock(&iou->processed_mutex);
+		/* FIXME: this should probably get added to a processed_tail to roughly preserve the completion order */
+		_op->next = iou->processed;
+		iou->processed = _op;
+		pthread_cleanup_pop(1);
+	}
+
+	pthread_exit(NULL);
+}
+
+
 iou_t * iou_new(unsigned entries)
 {
+	int	n_threads = 2; /* TODO: size according to number of cores probed @ runtime */
 	iou_t	*iou;

 	assert(entries);

-	iou = calloc(1, sizeof(*iou));
+	iou = calloc(1, sizeof(*iou) + sizeof(iou->threads[0]) * n_threads);
 	if (!iou)
 		return NULL;

@@ -125,6 +195,16 @@ iou_t * iou_new(unsigned entries)
 		return NULL;
 	}

+	/* TODO: handle failures */
+	pthread_mutex_init(&iou->processed_mutex, NULL);
+	pthread_cond_init(&iou->async.cond, NULL);
+	pthread_mutex_init(&iou->async.mutex, NULL);
+	for (int i = 0; i < n_threads; i++) {
+		iou->threads[i].iou = iou;
+		pthread_create(&iou->threads[i].pthread, NULL, (void *(*)(void *))iou_thread, &iou->threads[i]);
+	}
+	iou->n_threads = n_threads;
+
 	return iou;
 }

@@ -134,6 +214,12 @@ iou_t * iou_free(iou_t *iou)
 	if (iou) {
 		iou_ops_t	*t;

+		for (int i = 0; i < iou->n_threads; i++)
+			pthread_cancel(iou->threads[i].pthread);
+
+		for (int i = 0; i < iou->n_threads; i++)
+			pthread_join(iou->threads[i].pthread, NULL);
+
 		while ((t = iou->ops)) {
 			iou->ops = t->next;
 			free(t);
@@ -252,17 +338,42 @@ int iou_run(iou_t *iou)
 {
 	assert(iou);

-	while (!iou->quit && (iou->n_queued + iou->n_submitted)) {
+	while (!iou->quit && (iou->n_queued + iou->n_submitted + iou->n_async)) {
 		struct io_uring_cqe	*cqe;
 		_iou_op_t		*_op;
 		int			r;

-		/* if there's more queued submissions, submit them. */
+		/* flush any queued iou_ops to get those IO balls rolling. */
 		r = iou_flush(iou);
 		if (r < 0)
 			return r;

-		/* wait for and process a completion */
+		/* complete any processed async work */
+		while (iou->n_async) {
+
+			pthread_mutex_lock(&iou->processed_mutex);
+			_op = iou->processed;
+			iou->processed = NULL;
+			pthread_mutex_unlock(&iou->processed_mutex);
+
+			while (_op) {
+				_iou_op_t	*next = _op->next;
+
+				iou->n_async--;
+
+				r = _op->cb(_op->cb_data);
+				if (r < 0)
+					return r;
+
+				ops_put(_op);
+				_op = next;
+			}
+		}
+
+		if (!iou->n_submitted)
+			continue;
+
+		/* wait for and process an IO completion */
 		r = io_uring_wait_cqe(&iou->ring, &cqe);
 		if (r < 0)
 			return r;
@@ -327,3 +438,43 @@ struct io_uring * iou_ring(iou_t *iou)

 	return &iou->ring;
 }
+
+
+/* create an async CPU-bound work unit: async_cb(async_cb_data) runs on a worker thread, and once it returns iou_run() calls completion_cb(completion_cb_data).
+ * returns 0 on success, -ENOMEM if no op could be allocated.  NOTE(review): no IORING_OP_NOP sqe is submitted for this in the code shown, and n_async is bumped without a lock - presumably callers must be on the iou_run() thread; TODO confirm.
+ */
+int iou_async(iou_t *iou, int (*async_cb)(void *async_cb_data), void *async_cb_data, int (*completion_cb)(void *completion_cb_data), void *completion_cb_data)
+{
+	_iou_op_t	*_op;
+
+	assert(iou);
+	assert(async_cb);
+	assert(completion_cb);
+
+	/* reuse iou_op_t to encapsulate async work, eventually it gets submitted with an sqe for the NOP but
+	 * the sqe isn't allocated+submitted until after the CPU-bound work is completed from the iou_thread.
+	 */
+	_op = ops_get(&iou->ops);
+	if (!_op)
+		return -ENOMEM;
+
+	_op->cb = completion_cb;
+	_op->cb_data = completion_cb_data;
+	_op->async_cb = async_cb;
+	_op->async_cb_data = async_cb_data;
+	_op->next = NULL;
+
+	iou->n_async++;
+
+	pthread_mutex_lock(&iou->async.mutex);
+	if (iou->async.tail)
+		iou->async.tail->next = _op;
+	else
+		iou->async.head = _op;
+	iou->async.tail = _op;
+
+	pthread_cond_signal(&iou->async.cond);
+	pthread_mutex_unlock(&iou->async.mutex);
+
+	return 0;
+}
@@ -36,5 +36,6 @@ int iou_run(iou_t *iou);
 int iou_quit(iou_t *iou);
 int iou_resize(iou_t *iou, unsigned entries);
 struct io_uring * iou_ring(iou_t *iou);
+int iou_async(iou_t *iou, int (*async_cb)(void *async_cb_data), void *async_cb_data, int (*completion_cb)(void *completion_cb_data), void *completion_cb_data);

 #endif