summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--configure.ac6
-rw-r--r--m4/ax_pthread.m4332
-rw-r--r--src/iou.c171
-rw-r--r--src/iou.h1
4 files changed, 500 insertions, 10 deletions
diff --git a/configure.ac b/configure.ac
index cfe6dca..db389ef 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1,5 +1,6 @@
AC_INIT([libiou], [0.1], [vcaputo@pengaru.com])
AM_INIT_AUTOMAKE([-Wall -Werror foreign])
+AC_CONFIG_MACRO_DIRS([m4])
AC_PROG_CC
AM_PROG_CC_C_O
AM_PROG_AR
@@ -8,6 +9,11 @@ AM_SILENT_RULES([yes])
CFLAGS="$CFLAGS -Wall"
+AX_PTHREAD
+LIBS="$PTHREAD_LIBS $LIBS"
+CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+CC="$PTHREAD_CC"
+
dnl Check for liburing
PKG_CHECK_MODULES(URING, liburing)
CFLAGS="$CFLAGS $URING_CFLAGS"
diff --git a/m4/ax_pthread.m4 b/m4/ax_pthread.m4
new file mode 100644
index 0000000..d383ad5
--- /dev/null
+++ b/m4/ax_pthread.m4
@@ -0,0 +1,332 @@
+# ===========================================================================
+# http://www.gnu.org/software/autoconf-archive/ax_pthread.html
+# ===========================================================================
+#
+# SYNOPSIS
+#
+# AX_PTHREAD([ACTION-IF-FOUND[, ACTION-IF-NOT-FOUND]])
+#
+# DESCRIPTION
+#
+# This macro figures out how to build C programs using POSIX threads. It
+# sets the PTHREAD_LIBS output variable to the threads library and linker
+# flags, and the PTHREAD_CFLAGS output variable to any special C compiler
+# flags that are needed. (The user can also force certain compiler
+# flags/libs to be tested by setting these environment variables.)
+#
+# Also sets PTHREAD_CC to any special C compiler that is needed for
+# multi-threaded programs (defaults to the value of CC otherwise). (This
+# is necessary on AIX to use the special cc_r compiler alias.)
+#
+# NOTE: You are assumed to not only compile your program with these flags,
+# but also link it with them as well. e.g. you should link with
+# $PTHREAD_CC $CFLAGS $PTHREAD_CFLAGS $LDFLAGS ... $PTHREAD_LIBS $LIBS
+#
+# If you are only building threads programs, you may wish to use these
+# variables in your default LIBS, CFLAGS, and CC:
+#
+# LIBS="$PTHREAD_LIBS $LIBS"
+# CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+# CC="$PTHREAD_CC"
+#
+# In addition, if the PTHREAD_CREATE_JOINABLE thread-attribute constant
+# has a nonstandard name, defines PTHREAD_CREATE_JOINABLE to that name
+# (e.g. PTHREAD_CREATE_UNDETACHED on AIX).
+#
+# Also HAVE_PTHREAD_PRIO_INHERIT is defined if pthread is found and the
+# PTHREAD_PRIO_INHERIT symbol is defined when compiling with
+# PTHREAD_CFLAGS.
+#
+# ACTION-IF-FOUND is a list of shell commands to run if a threads library
+# is found, and ACTION-IF-NOT-FOUND is a list of commands to run it if it
+# is not found. If ACTION-IF-FOUND is not specified, the default action
+# will define HAVE_PTHREAD.
+#
+# Please let the authors know if this macro fails on any platform, or if
+# you have any other suggestions or comments. This macro was based on work
+# by SGJ on autoconf scripts for FFTW (http://www.fftw.org/) (with help
+# from M. Frigo), as well as ac_pthread and hb_pthread macros posted by
+# Alejandro Forero Cuervo to the autoconf macro repository. We are also
+# grateful for the helpful feedback of numerous users.
+#
+# Updated for Autoconf 2.68 by Daniel Richard G.
+#
+# LICENSE
+#
+# Copyright (c) 2008 Steven G. Johnson <stevenj@alum.mit.edu>
+# Copyright (c) 2011 Daniel Richard G. <skunk@iSKUNK.ORG>
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# As a special exception, the respective Autoconf Macro's copyright owner
+# gives unlimited permission to copy, distribute and modify the configure
+# scripts that are the output of Autoconf when processing the Macro. You
+# need not follow the terms of the GNU General Public License when using
+# or distributing such scripts, even though portions of the text of the
+# Macro appear in them. The GNU General Public License (GPL) does govern
+# all other use of the material that constitutes the Autoconf Macro.
+#
+# This special exception to the GPL applies to versions of the Autoconf
+# Macro released by the Autoconf Archive. When you make and distribute a
+# modified version of the Autoconf Macro, you may extend this special
+# exception to the GPL to apply to your modified version as well.
+
+#serial 21
+
+AU_ALIAS([ACX_PTHREAD], [AX_PTHREAD])
+AC_DEFUN([AX_PTHREAD], [
+AC_REQUIRE([AC_CANONICAL_HOST])
+AC_LANG_PUSH([C])
+ax_pthread_ok=no
+
+# We used to check for pthread.h first, but this fails if pthread.h
+# requires special compiler flags (e.g. on True64 or Sequent).
+# It gets checked for in the link test anyway.
+
+# First of all, check if the user has set any of the PTHREAD_LIBS,
+# etcetera environment variables, and if threads linking works using
+# them:
+if test x"$PTHREAD_LIBS$PTHREAD_CFLAGS" != x; then
+ save_CFLAGS="$CFLAGS"
+ CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+ save_LIBS="$LIBS"
+ LIBS="$PTHREAD_LIBS $LIBS"
+ AC_MSG_CHECKING([for pthread_join in LIBS=$PTHREAD_LIBS with CFLAGS=$PTHREAD_CFLAGS])
+ AC_TRY_LINK_FUNC([pthread_join], [ax_pthread_ok=yes])
+ AC_MSG_RESULT([$ax_pthread_ok])
+ if test x"$ax_pthread_ok" = xno; then
+ PTHREAD_LIBS=""
+ PTHREAD_CFLAGS=""
+ fi
+ LIBS="$save_LIBS"
+ CFLAGS="$save_CFLAGS"
+fi
+
+# We must check for the threads library under a number of different
+# names; the ordering is very important because some systems
+# (e.g. DEC) have both -lpthread and -lpthreads, where one of the
+# libraries is broken (non-POSIX).
+
+# Create a list of thread flags to try. Items starting with a "-" are
+# C compiler flags, and other items are library names, except for "none"
+# which indicates that we try without any flags at all, and "pthread-config"
+# which is a program returning the flags for the Pth emulation library.
+
+ax_pthread_flags="pthreads none -Kthread -kthread lthread -pthread -pthreads -mthreads pthread --thread-safe -mt pthread-config"
+
+# The ordering *is* (sometimes) important. Some notes on the
+# individual items follow:
+
+# pthreads: AIX (must check this before -lpthread)
+# none: in case threads are in libc; should be tried before -Kthread and
+# other compiler flags to prevent continual compiler warnings
+# -Kthread: Sequent (threads in libc, but -Kthread needed for pthread.h)
+# -kthread: FreeBSD kernel threads (preferred to -pthread since SMP-able)
+# lthread: LinuxThreads port on FreeBSD (also preferred to -pthread)
+# -pthread: Linux/gcc (kernel threads), BSD/gcc (userland threads)
+# -pthreads: Solaris/gcc
+# -mthreads: Mingw32/gcc, Lynx/gcc
+# -mt: Sun Workshop C (may only link SunOS threads [-lthread], but it
+# doesn't hurt to check since this sometimes defines pthreads too;
+# also defines -D_REENTRANT)
+# ... -mt is also the pthreads flag for HP/aCC
+# pthread: Linux, etcetera
+# --thread-safe: KAI C++
+# pthread-config: use pthread-config program (for GNU Pth library)
+
+case ${host_os} in
+ solaris*)
+
+ # On Solaris (at least, for some versions), libc contains stubbed
+ # (non-functional) versions of the pthreads routines, so link-based
+ # tests will erroneously succeed. (We need to link with -pthreads/-mt/
+ # -lpthread.) (The stubs are missing pthread_cleanup_push, or rather
+ # a function called by this macro, so we could check for that, but
+ # who knows whether they'll stub that too in a future libc.) So,
+ # we'll just look for -pthreads and -lpthread first:
+
+ ax_pthread_flags="-pthreads pthread -mt -pthread $ax_pthread_flags"
+ ;;
+
+ darwin*)
+ ax_pthread_flags="-pthread $ax_pthread_flags"
+ ;;
+esac
+
+# Clang doesn't consider unrecognized options an error unless we specify
+# -Werror. We throw in some extra Clang-specific options to ensure that
+# this doesn't happen for GCC, which also accepts -Werror.
+
+AC_MSG_CHECKING([if compiler needs -Werror to reject unknown flags])
+save_CFLAGS="$CFLAGS"
+ax_pthread_extra_flags="-Werror"
+CFLAGS="$CFLAGS $ax_pthread_extra_flags -Wunknown-warning-option -Wsizeof-array-argument"
+AC_COMPILE_IFELSE([AC_LANG_PROGRAM([int foo(void);],[foo()])],
+ [AC_MSG_RESULT([yes])],
+ [ax_pthread_extra_flags=
+ AC_MSG_RESULT([no])])
+CFLAGS="$save_CFLAGS"
+
+if test x"$ax_pthread_ok" = xno; then
+for flag in $ax_pthread_flags; do
+
+ case $flag in
+ none)
+ AC_MSG_CHECKING([whether pthreads work without any flags])
+ ;;
+
+ -*)
+ AC_MSG_CHECKING([whether pthreads work with $flag])
+ PTHREAD_CFLAGS="$flag"
+ ;;
+
+ pthread-config)
+ AC_CHECK_PROG([ax_pthread_config], [pthread-config], [yes], [no])
+ if test x"$ax_pthread_config" = xno; then continue; fi
+ PTHREAD_CFLAGS="`pthread-config --cflags`"
+ PTHREAD_LIBS="`pthread-config --ldflags` `pthread-config --libs`"
+ ;;
+
+ *)
+ AC_MSG_CHECKING([for the pthreads library -l$flag])
+ PTHREAD_LIBS="-l$flag"
+ ;;
+ esac
+
+ save_LIBS="$LIBS"
+ save_CFLAGS="$CFLAGS"
+ LIBS="$PTHREAD_LIBS $LIBS"
+ CFLAGS="$CFLAGS $PTHREAD_CFLAGS $ax_pthread_extra_flags"
+
+ # Check for various functions. We must include pthread.h,
+ # since some functions may be macros. (On the Sequent, we
+ # need a special flag -Kthread to make this header compile.)
+ # We check for pthread_join because it is in -lpthread on IRIX
+ # while pthread_create is in libc. We check for pthread_attr_init
+ # due to DEC craziness with -lpthreads. We check for
+ # pthread_cleanup_push because it is one of the few pthread
+ # functions on Solaris that doesn't have a non-functional libc stub.
+ # We try pthread_create on general principles.
+ AC_LINK_IFELSE([AC_LANG_PROGRAM([#include <pthread.h>
+ static void routine(void *a) { a = 0; }
+ static void *start_routine(void *a) { return a; }],
+ [pthread_t th; pthread_attr_t attr;
+ pthread_create(&th, 0, start_routine, 0);
+ pthread_join(th, 0);
+ pthread_attr_init(&attr);
+ pthread_cleanup_push(routine, 0);
+ pthread_cleanup_pop(0) /* ; */])],
+ [ax_pthread_ok=yes],
+ [])
+
+ LIBS="$save_LIBS"
+ CFLAGS="$save_CFLAGS"
+
+ AC_MSG_RESULT([$ax_pthread_ok])
+ if test "x$ax_pthread_ok" = xyes; then
+ break;
+ fi
+
+ PTHREAD_LIBS=""
+ PTHREAD_CFLAGS=""
+done
+fi
+
+# Various other checks:
+if test "x$ax_pthread_ok" = xyes; then
+ save_LIBS="$LIBS"
+ LIBS="$PTHREAD_LIBS $LIBS"
+ save_CFLAGS="$CFLAGS"
+ CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+
+ # Detect AIX lossage: JOINABLE attribute is called UNDETACHED.
+ AC_MSG_CHECKING([for joinable pthread attribute])
+ attr_name=unknown
+ for attr in PTHREAD_CREATE_JOINABLE PTHREAD_CREATE_UNDETACHED; do
+ AC_LINK_IFELSE([AC_LANG_PROGRAM([#include <pthread.h>],
+ [int attr = $attr; return attr /* ; */])],
+ [attr_name=$attr; break],
+ [])
+ done
+ AC_MSG_RESULT([$attr_name])
+ if test "$attr_name" != PTHREAD_CREATE_JOINABLE; then
+ AC_DEFINE_UNQUOTED([PTHREAD_CREATE_JOINABLE], [$attr_name],
+ [Define to necessary symbol if this constant
+ uses a non-standard name on your system.])
+ fi
+
+ AC_MSG_CHECKING([if more special flags are required for pthreads])
+ flag=no
+ case ${host_os} in
+ aix* | freebsd* | darwin*) flag="-D_THREAD_SAFE";;
+ osf* | hpux*) flag="-D_REENTRANT";;
+ solaris*)
+ if test "$GCC" = "yes"; then
+ flag="-D_REENTRANT"
+ else
+ # TODO: What about Clang on Solaris?
+ flag="-mt -D_REENTRANT"
+ fi
+ ;;
+ esac
+ AC_MSG_RESULT([$flag])
+ if test "x$flag" != xno; then
+ PTHREAD_CFLAGS="$flag $PTHREAD_CFLAGS"
+ fi
+
+ AC_CACHE_CHECK([for PTHREAD_PRIO_INHERIT],
+ [ax_cv_PTHREAD_PRIO_INHERIT], [
+ AC_LINK_IFELSE([AC_LANG_PROGRAM([[#include <pthread.h>]],
+ [[int i = PTHREAD_PRIO_INHERIT;]])],
+ [ax_cv_PTHREAD_PRIO_INHERIT=yes],
+ [ax_cv_PTHREAD_PRIO_INHERIT=no])
+ ])
+ AS_IF([test "x$ax_cv_PTHREAD_PRIO_INHERIT" = "xyes"],
+ [AC_DEFINE([HAVE_PTHREAD_PRIO_INHERIT], [1], [Have PTHREAD_PRIO_INHERIT.])])
+
+ LIBS="$save_LIBS"
+ CFLAGS="$save_CFLAGS"
+
+ # More AIX lossage: compile with *_r variant
+ if test "x$GCC" != xyes; then
+ case $host_os in
+ aix*)
+ AS_CASE(["x/$CC"],
+ [x*/c89|x*/c89_128|x*/c99|x*/c99_128|x*/cc|x*/cc128|x*/xlc|x*/xlc_v6|x*/xlc128|x*/xlc128_v6],
+ [#handle absolute path differently from PATH based program lookup
+ AS_CASE(["x$CC"],
+ [x/*],
+ [AS_IF([AS_EXECUTABLE_P([${CC}_r])],[PTHREAD_CC="${CC}_r"])],
+ [AC_CHECK_PROGS([PTHREAD_CC],[${CC}_r],[$CC])])])
+ ;;
+ esac
+ fi
+fi
+
+test -n "$PTHREAD_CC" || PTHREAD_CC="$CC"
+
+AC_SUBST([PTHREAD_LIBS])
+AC_SUBST([PTHREAD_CFLAGS])
+AC_SUBST([PTHREAD_CC])
+
+# Finally, execute ACTION-IF-FOUND/ACTION-IF-NOT-FOUND:
+if test x"$ax_pthread_ok" = xyes; then
+ ifelse([$1],,[AC_DEFINE([HAVE_PTHREAD],[1],[Define if you have POSIX threads libraries and header files.])],[$1])
+ :
+else
+ ax_pthread_ok=no
+ $2
+fi
+AC_LANG_POP
+])dnl AX_PTHREAD
diff --git a/src/iou.c b/src/iou.c
index a654a66..1ce9af1 100644
--- a/src/iou.c
+++ b/src/iou.c
@@ -16,6 +16,7 @@
#include <assert.h>
#include <liburing.h>
+#include <pthread.h>
#include <stdlib.h>
#include <stddef.h>
@@ -26,11 +27,30 @@
typedef struct _iou_op_t _iou_op_t;
typedef struct iou_ops_t iou_ops_t;
+typedef struct iou_thread_t {
+ pthread_t pthread;
+ iou_t *iou;
+} iou_thread_t;
+
typedef struct iou_t {
struct io_uring ring;
- unsigned n_issued, n_queued, n_submitted;
+ unsigned n_issued, /* SQEs allocated, but no data set */
+ n_queued, /* SQEs allocated+data set */
+ n_submitted, /* SQEs submitted for processing and not yet "seen" for completion */
+ n_async; /* async ops created and not yet completed by iou_run() */
+ _iou_op_t *processed; /* async work processed but waiting for completion */
+ pthread_mutex_t processed_mutex;/* serializes processed list accesses */
unsigned quit:1;
iou_ops_t *ops;
+
+ struct {
+ _iou_op_t *head, *tail;
+ pthread_cond_t cond;
+ pthread_mutex_t mutex; /* serializes {head,tail} */
+ } async;
+
+ int n_threads;
+ iou_thread_t threads[];
} iou_t;
/* private container of the public iou_op_t */
@@ -39,7 +59,15 @@ struct _iou_op_t {
int (*cb)(void *cb_data);
void *cb_data;
- _iou_op_t *free_next;
+
+ /* this is added for the iou_async()-submitted threaded processing,
+ * which does add some bloat to all ops and might warrant creating a
+ * distinct async op type with its own allocation/free lists... TODO
+ */
+ int (*async_cb)(void *async_cb_data);
+ void *async_cb_data;
+
+ _iou_op_t *next;
iou_ops_t *container;
};
@@ -84,7 +112,7 @@ static _iou_op_t * ops_get(iou_ops_t **ops)
t->count = next_count;
for (int i = 0; i < next_count; i++) {
t->ops[i].container = t;
- t->ops[i].free_next = t->free;
+ t->ops[i].next = t->free;
t->free = &t->ops[i];
}
@@ -93,8 +121,8 @@ static _iou_op_t * ops_get(iou_ops_t **ops)
}
_op = t->free;
- t->free = _op->free_next;
- _op->free_next = NULL;
+ t->free = _op->next;
+ _op->next = NULL;
return _op;
}
@@ -105,18 +133,60 @@ static void ops_put(_iou_op_t *_op)
assert(_op);
assert(_op->container);
- _op->free_next = _op->container->free;
+ _op->next = _op->container->free;
_op->container->free = _op;
}
+/* CPU-bound async work thread, created @ iou_new(), process iou_async() callbacks */
+static void * iou_thread(iou_thread_t *thread)
+{
+ iou_t *iou;
+
+ assert(thread);
+ assert(thread->iou);
+
+ iou = thread->iou;
+
+ for (;;) {
+ _iou_op_t *_op;
+
+ pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock, &iou->async.mutex);
+ pthread_mutex_lock(&iou->async.mutex);
+ while (!iou->async.head)
+ pthread_cond_wait(&iou->async.cond, &iou->async.mutex);
+
+ _op = iou->async.head;
+ iou->async.head = _op->next;
+ if (!_op->next)
+ iou->async.tail = NULL;
+ pthread_cleanup_pop(1);
+
+ assert(_op->async_cb(_op->async_cb_data) >= 0); /* XXX treating errors here as fatal disasters for now */
+
+ /* now the work is handed back to iou_run() via processed_list, which
+ * iou_run() polls after io_uring submissions but before waiting for io completions.
+ */
+ pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock, &iou->processed_mutex);
+ pthread_mutex_lock(&iou->processed_mutex);
+ /* FIXME: this should probably get added to a processed_tail to roughly preserve the completion order */
+ _op->next = iou->processed;
+ iou->processed = _op;
+ pthread_cleanup_pop(1);
+ }
+
+ pthread_exit(NULL);
+}
+
+
iou_t * iou_new(unsigned entries)
{
+ int n_threads = 2; /* TODO: size according to of cores probed @ runtime */
iou_t *iou;
assert(entries);
- iou = calloc(1, sizeof(*iou));
+ iou = calloc(1, sizeof(*iou) + sizeof(iou->threads[0]) * n_threads);
if (!iou)
return NULL;
@@ -125,6 +195,16 @@ iou_t * iou_new(unsigned entries)
return NULL;
}
+ /* TODO: handle failures */
+ pthread_mutex_init(&iou->processed_mutex, NULL);
+ pthread_cond_init(&iou->async.cond, NULL);
+ pthread_mutex_init(&iou->async.mutex, NULL);
+ for (int i = 0; i < n_threads; i++) {
+ iou->threads[i].iou = iou;
+ pthread_create(&iou->threads[i].pthread, NULL, (void *(*)(void *))iou_thread, &iou->threads[i]);
+ }
+ iou->n_threads = n_threads;
+
return iou;
}
@@ -134,6 +214,12 @@ iou_t * iou_free(iou_t *iou)
if (iou) {
iou_ops_t *t;
+ for (int i = 0; i < iou->n_threads; i++)
+ pthread_cancel(iou->threads[i].pthread);
+
+ for (int i = 0; i < iou->n_threads; i++)
+ pthread_join(iou->threads[i].pthread, NULL);
+
while ((t = iou->ops)) {
iou->ops = t->next;
free(t);
@@ -252,17 +338,42 @@ int iou_run(iou_t *iou)
{
assert(iou);
- while (!iou->quit && (iou->n_queued + iou->n_submitted)) {
+ while (!iou->quit && (iou->n_queued + iou->n_submitted + iou->n_async)) {
struct io_uring_cqe *cqe;
_iou_op_t *_op;
int r;
- /* if there's more queued submissions, submit them. */
+ /* flush any queued iou_ops to get those IO balls rolling. */
r = iou_flush(iou);
if (r < 0)
return r;
- /* wait for and process a completion */
+ /* complete any processed async work */
+ while (iou->n_async) {
+
+ pthread_mutex_lock(&iou->processed_mutex);
+ _op = iou->processed;
+ iou->processed = NULL;
+ pthread_mutex_unlock(&iou->processed_mutex);
+
+ while (_op) {
+ _iou_op_t *next = _op->next;
+
+ iou->n_async--;
+
+ r = _op->cb(_op->cb_data);
+ if (r < 0)
+ return r;
+
+ ops_put(_op);
+ _op = next;
+ }
+ }
+
+ if (!iou->n_submitted)
+ continue;
+
+ /* wait for and process an IO completion */
r = io_uring_wait_cqe(&iou->ring, &cqe);
if (r < 0)
return r;
@@ -327,3 +438,43 @@ struct io_uring * iou_ring(iou_t *iou)
return &iou->ring;
}
+
+
+/* create an async CPU-bound work unit which runs async_sb on a worker thread, eventually submitting an IORING_OP_NOP iou_op w/completion_cb+completion_cb_data when
+ * async_cb(async_cb_data) returns from the thread.
+ */
+int iou_async(iou_t *iou, int (*async_cb)(void *async_cb_data), void *async_cb_data, int (*completion_cb)(void *completion_cb_data), void *completion_cb_data)
+{
+ _iou_op_t *_op;
+
+ assert(iou);
+ assert(async_cb);
+ assert(completion_cb);
+
+ /* reuse iou_op_t to encapsulate async work, eventually it gets submitted with an sqe for the NOP but
+ * the sqe isn't allocated+submitted until after the CPU-bound work is completed from the iou_thread.
+ */
+ _op = ops_get(&iou->ops);
+ if (!_op)
+ return -ENOMEM;
+
+ _op->cb = completion_cb;
+ _op->cb_data = completion_cb_data;
+ _op->async_cb = async_cb;
+ _op->async_cb_data = async_cb_data;
+ _op->next = NULL;
+
+ iou->n_async++;
+
+ pthread_mutex_lock(&iou->async.mutex);
+ if (iou->async.tail)
+ iou->async.tail->next = _op;
+ else
+ iou->async.head = _op;
+ iou->async.tail = _op;
+
+ pthread_cond_signal(&iou->async.cond);
+ pthread_mutex_unlock(&iou->async.mutex);
+
+ return 0;
+}
diff --git a/src/iou.h b/src/iou.h
index 6609e81..77ddf97 100644
--- a/src/iou.h
+++ b/src/iou.h
@@ -36,5 +36,6 @@ int iou_run(iou_t *iou);
int iou_quit(iou_t *iou);
int iou_resize(iou_t *iou, unsigned entries);
struct io_uring * iou_ring(iou_t *iou);
+int iou_async(iou_t *iou, int (*async_cb)(void *async_cb_data), void *async_cb_data, int (*completion_cb)(void *completion_cb_data), void *completion_cb_data);
#endif
© All Rights Reserved