Diffstat (limited to 'src')

 -rw-r--r--  src/iou.c | 171
 -rw-r--r--  src/iou.h |   1

 2 files changed, 162 insertions, 10 deletions
diff --git a/src/iou.c b/src/iou.c
--- a/src/iou.c
+++ b/src/iou.c
@@ -16,6 +16,7 @@
 
 #include <assert.h>
 #include <liburing.h>
+#include <pthread.h>
 #include <stdlib.h>
 #include <stddef.h>
 
@@ -26,11 +27,30 @@
 typedef struct _iou_op_t _iou_op_t;
 typedef struct iou_ops_t iou_ops_t;
 
+typedef struct iou_thread_t {
+	pthread_t	pthread;
+	iou_t		*iou;
+} iou_thread_t;
+
 typedef struct iou_t {
 	struct io_uring	ring;
-	unsigned	n_issued, n_queued, n_submitted;
+	unsigned	n_issued,	/* SQEs allocated, but no data set */
+			n_queued,	/* SQEs allocated+data set */
+			n_submitted,	/* SQEs submitted for processing and not yet "seen" for completion */
+			n_async;	/* async ops created and not yet completed by iou_run() */
+	_iou_op_t	*processed;	/* async work processed but waiting for completion */
+	pthread_mutex_t	processed_mutex;	/* serializes processed list accesses */
 	unsigned	quit:1;
 	iou_ops_t	*ops;
+
+	struct {
+		_iou_op_t	*head, *tail;
+		pthread_cond_t	cond;
+		pthread_mutex_t	mutex;	/* serializes {head,tail} */
+	} async;
+
+	int		n_threads;
+	iou_thread_t	threads[];
 } iou_t;
 
 /* private container of the public iou_op_t */
@@ -39,7 +59,15 @@ struct _iou_op_t {
 
 	int	(*cb)(void *cb_data);
 	void	*cb_data;
-	_iou_op_t	*free_next;
+
+	/* this is added for the iou_async()-submitted threaded processing,
+	 * which does add some bloat to all ops and might warrant creating a
+	 * distinct async op type with its own allocation/free lists... TODO
+	 */
+	int	(*async_cb)(void *async_cb_data);
+	void	*async_cb_data;
+
+	_iou_op_t	*next;
 	iou_ops_t	*container;
 };
 
@@ -84,7 +112,7 @@ static _iou_op_t * ops_get(iou_ops_t **ops)
 		t->count = next_count;
 		for (int i = 0; i < next_count; i++) {
 			t->ops[i].container = t;
-			t->ops[i].free_next = t->free;
+			t->ops[i].next = t->free;
 			t->free = &t->ops[i];
 		}
 
@@ -93,8 +121,8 @@ static _iou_op_t * ops_get(iou_ops_t **ops)
 	}
 
 	_op = t->free;
-	t->free = _op->free_next;
-	_op->free_next = NULL;
+	t->free = _op->next;
+	_op->next = NULL;
 
 	return _op;
 }
@@ -105,18 +133,60 @@ static void ops_put(_iou_op_t *_op)
 	assert(_op);
 	assert(_op->container);
 
-	_op->free_next = _op->container->free;
+	_op->next = _op->container->free;
 	_op->container->free = _op;
 }
 
+/* CPU-bound async work thread, created @ iou_new(), processes iou_async() callbacks */
+static void * iou_thread(iou_thread_t *thread)
+{
+	iou_t	*iou;
+
+	assert(thread);
+	assert(thread->iou);
+
+	iou = thread->iou;
+
+	for (;;) {
+		_iou_op_t	*_op;
+
+		pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock, &iou->async.mutex);
+		pthread_mutex_lock(&iou->async.mutex);
+		while (!iou->async.head)
+			pthread_cond_wait(&iou->async.cond, &iou->async.mutex);
+
+		_op = iou->async.head;
+		iou->async.head = _op->next;
+		if (!_op->next)
+			iou->async.tail = NULL;
+		pthread_cleanup_pop(1);
+
+		assert(_op->async_cb(_op->async_cb_data) >= 0);	/* XXX treating errors here as fatal disasters for now */
+
+		/* now the work is handed back to iou_run() via the processed list,
+		 * which iou_run() polls after io_uring submissions but before
+		 * waiting for IO completions.
+		 */
+		pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock, &iou->processed_mutex);
+		pthread_mutex_lock(&iou->processed_mutex);
+		/* FIXME: this should probably get added at a processed tail to roughly preserve the completion order */
+		_op->next = iou->processed;
+		iou->processed = _op;
+		pthread_cleanup_pop(1);
+	}
+
+	pthread_exit(NULL);
+}
+
+
 iou_t * iou_new(unsigned entries)
 {
+	int	n_threads = 2;	/* TODO: size according to # of cores probed @ runtime */
 	iou_t	*iou;
 
 	assert(entries);
 
-	iou = calloc(1, sizeof(*iou));
+	iou = calloc(1, sizeof(*iou) + sizeof(iou->threads[0]) * n_threads);
 	if (!iou)
 		return NULL;
 
@@ -125,6 +195,16 @@ iou_t * iou_new(unsigned entries)
 		return NULL;
 	}
 
+	/* TODO: handle failures */
+	pthread_mutex_init(&iou->processed_mutex, NULL);
+	pthread_cond_init(&iou->async.cond, NULL);
+	pthread_mutex_init(&iou->async.mutex, NULL);
+	for (int i = 0; i < n_threads; i++) {
+		iou->threads[i].iou = iou;
+		pthread_create(&iou->threads[i].pthread, NULL, (void *(*)(void *))iou_thread, &iou->threads[i]);
+	}
+	iou->n_threads = n_threads;
+
 	return iou;
 }
 
@@ -134,6 +214,12 @@ iou_t * iou_free(iou_t *iou)
 	if (iou) {
 		iou_ops_t	*t;
 
+		for (int i = 0; i < iou->n_threads; i++)
+			pthread_cancel(iou->threads[i].pthread);
+
+		for (int i = 0; i < iou->n_threads; i++)
+			pthread_join(iou->threads[i].pthread, NULL);
+
 		while ((t = iou->ops)) {
 			iou->ops = t->next;
 			free(t);
@@ -252,17 +338,42 @@ int iou_run(iou_t *iou)
 {
 	assert(iou);
 
-	while (!iou->quit && (iou->n_queued + iou->n_submitted)) {
+	while (!iou->quit && (iou->n_queued + iou->n_submitted + iou->n_async)) {
 		struct io_uring_cqe	*cqe;
 		_iou_op_t		*_op;
 		int			r;
 
-		/* if there's more queued submissions, submit them. */
+		/* flush any queued iou_ops to get those IO balls rolling. */
 		r = iou_flush(iou);
 		if (r < 0)
 			return r;
 
-		/* wait for and process a completion */
+		/* complete any processed async work */
+		while (iou->n_async) {
+			pthread_mutex_lock(&iou->processed_mutex);
+			_op = iou->processed;
+			iou->processed = NULL;
+			pthread_mutex_unlock(&iou->processed_mutex);
+
+			while (_op) {
+				_iou_op_t	*next = _op->next;
+
+				iou->n_async--;
+
+				r = _op->cb(_op->cb_data);
+				if (r < 0)
+					return r;
+
+				ops_put(_op);
+				_op = next;
+			}
+		}
+
+		if (!iou->n_submitted)
+			continue;
+
+		/* wait for and process an IO completion */
 		r = io_uring_wait_cqe(&iou->ring, &cqe);
 		if (r < 0)
 			return r;
@@ -327,3 +438,43 @@ struct io_uring * iou_ring(iou_t *iou)
 
 	return &iou->ring;
 }
+
+
+/* create an async CPU-bound work unit which runs async_cb on a worker thread,
+ * eventually submitting an IORING_OP_NOP iou_op w/completion_cb+completion_cb_data
+ * when async_cb(async_cb_data) returns from the thread.
+ */
+int iou_async(iou_t *iou, int (*async_cb)(void *async_cb_data), void *async_cb_data, int (*completion_cb)(void *completion_cb_data), void *completion_cb_data)
+{
+	_iou_op_t	*_op;
+
+	assert(iou);
+	assert(async_cb);
+	assert(completion_cb);
+
+	/* reuse iou_op_t to encapsulate async work; eventually it gets submitted
+	 * with an sqe for the NOP, but the sqe isn't allocated+submitted until
+	 * after the CPU-bound work has completed from the iou_thread.
+	 */
+	_op = ops_get(&iou->ops);
+	if (!_op)
+		return -ENOMEM;
+
+	_op->cb = completion_cb;
+	_op->cb_data = completion_cb_data;
+	_op->async_cb = async_cb;
+	_op->async_cb_data = async_cb_data;
+	_op->next = NULL;
+
+	iou->n_async++;
+
+	pthread_mutex_lock(&iou->async.mutex);
+	if (iou->async.tail)
+		iou->async.tail->next = _op;
+	else
+		iou->async.head = _op;
+	iou->async.tail = _op;
+
+	pthread_cond_signal(&iou->async.cond);
+	pthread_mutex_unlock(&iou->async.mutex);
+
+	return 0;
+}
diff --git a/src/iou.h b/src/iou.h
--- a/src/iou.h
+++ b/src/iou.h
@@ -36,5 +36,6 @@ int iou_run(iou_t *iou);
 int iou_quit(iou_t *iou);
 int iou_resize(iou_t *iou, unsigned entries);
 struct io_uring * iou_ring(iou_t *iou);
+int iou_async(iou_t *iou, int (*async_cb)(void *async_cb_data), void *async_cb_data, int (*completion_cb)(void *completion_cb_data), void *completion_cb_data);
 
 #endif
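
For context, a minimal sketch of how a caller might drive the new API: checksum(), on_done(), and buf are hypothetical names invented for this illustration, not part of the commit. It relies only on functions visible in the diff and header (iou_new(), iou_async(), iou_run(), iou_free()), and on the new iou_run() loop condition including n_async, so iou_run() returns once the async work and its completion callback have drained, with no explicit iou_quit() needed.

	#include <stdio.h>
	#include <stdlib.h>

	#include "iou.h"

	static unsigned char	buf[1 << 20];	/* pretend this holds interesting data */

	/* CPU-bound work; iou runs this on one of its worker threads */
	static int checksum(void *arg)
	{
		unsigned	*sum = arg;

		for (size_t i = 0; i < sizeof(buf); i++)
			*sum += buf[i];

		return 0;
	}

	/* completion; iou runs this back on the iou_run() thread */
	static int on_done(void *arg)
	{
		unsigned	*sum = arg;

		printf("sum=%u\n", *sum);

		return 0;
	}

	int main(void)
	{
		unsigned	sum = 0;
		iou_t		*iou;

		iou = iou_new(8);
		if (!iou)
			return EXIT_FAILURE;

		if (iou_async(iou, checksum, &sum, on_done, &sum) < 0)
			return EXIT_FAILURE;

		/* returns once n_queued, n_submitted, and n_async all hit zero */
		if (iou_run(iou) < 0)
			return EXIT_FAILURE;

		iou_free(iou);

		return EXIT_SUCCESS;
	}

Note the handoff this sketch exercises: iou_async() only queues the op on the async list and signals a worker; the worker runs checksum() and moves the op to the processed list; iou_run() then drains that list and invokes on_done() on its own thread, so completion callbacks never race with worker threads.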