Diffstat (limited to 'src')
-rw-r--r--  src/iou.c  171
-rw-r--r--  src/iou.h  1
2 files changed, 162 insertions(+), 10 deletions(-)
diff --git a/src/iou.c b/src/iou.c
index a654a66..1ce9af1 100644
--- a/src/iou.c
+++ b/src/iou.c
@@ -16,6 +16,7 @@
#include <assert.h>
#include <liburing.h>
+#include <pthread.h>
#include <stdlib.h>
#include <stddef.h>
@@ -26,11 +27,30 @@
typedef struct _iou_op_t _iou_op_t;
typedef struct iou_ops_t iou_ops_t;
+typedef struct iou_thread_t {
+ pthread_t pthread;
+ iou_t *iou;
+} iou_thread_t;
+
typedef struct iou_t {
struct io_uring ring;
- unsigned n_issued, n_queued, n_submitted;
+ unsigned n_issued, /* SQEs allocated, but no data set */
+ n_queued, /* SQEs allocated+data set */
+ n_submitted, /* SQEs submitted for processing and not yet "seen" for completion */
+ n_async; /* async ops created and not yet completed by iou_run() */
+ _iou_op_t *processed; /* async work processed but waiting for completion */
+ pthread_mutex_t processed_mutex;/* serializes processed list accesses */
unsigned quit:1;
iou_ops_t *ops;
+
+ struct {
+ _iou_op_t *head, *tail;
+ pthread_cond_t cond;
+ pthread_mutex_t mutex; /* serializes {head,tail} */
+ } async;
+
+ int n_threads;
+ iou_thread_t threads[];
} iou_t;
/* private container of the public iou_op_t */
@@ -39,7 +59,15 @@ struct _iou_op_t {
int (*cb)(void *cb_data);
void *cb_data;
- _iou_op_t *free_next;
+
+ /* this is added for the iou_async()-submitted threaded processing,
+ * which does add some bloat to all ops and might warrant creating a
+ * distinct async op type with its own allocation/free lists... TODO
+ */
+ int (*async_cb)(void *async_cb_data);
+ void *async_cb_data;
+
+ _iou_op_t *next;
iou_ops_t *container;
};
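
The TODO in the comment above suggests splitting async work out into a distinct op type so regular ops don't carry the extra pointers. A hypothetical sketch of what that split might look like; nothing below exists in this commit and all names are invented:

typedef struct _iou_async_op_t {
	int	(*async_cb)(void *async_cb_data);	/* CPU-bound work, runs on a worker thread */
	void	*async_cb_data;
	int	(*cb)(void *cb_data);			/* completion, runs in iou_run() */
	void	*cb_data;
	struct _iou_async_op_t	*next;			/* async queue / processed list linkage */
} _iou_async_op_t;

Regular ops would then drop async_cb/async_cb_data and shrink back to their pre-commit size, at the cost of a second allocation/free list.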
@@ -84,7 +112,7 @@ static _iou_op_t * ops_get(iou_ops_t **ops)
t->count = next_count;
for (int i = 0; i < next_count; i++) {
t->ops[i].container = t;
- t->ops[i].free_next = t->free;
+ t->ops[i].next = t->free;
t->free = &t->ops[i];
}
@@ -93,8 +121,8 @@ static _iou_op_t * ops_get(iou_ops_t **ops)
}
_op = t->free;
- t->free = _op->free_next;
- _op->free_next = NULL;
+ t->free = _op->next;
+ _op->next = NULL;
return _op;
}
@@ -105,18 +133,60 @@ static void ops_put(_iou_op_t *_op)
assert(_op);
assert(_op->container);
- _op->free_next = _op->container->free;
+ _op->next = _op->container->free;
_op->container->free = _op;
}
+/* CPU-bound async work thread, created @ iou_new(), processes iou_async() callbacks */
+static void * iou_thread(iou_thread_t *thread)
+{
+ iou_t *iou;
+
+ assert(thread);
+ assert(thread->iou);
+
+ iou = thread->iou;
+
+ for (;;) {
+ _iou_op_t *_op;
+
+ pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock, &iou->async.mutex);
+ pthread_mutex_lock(&iou->async.mutex);
+ while (!iou->async.head)
+ pthread_cond_wait(&iou->async.cond, &iou->async.mutex);
+
+ _op = iou->async.head;
+ iou->async.head = _op->next;
+ if (!_op->next)
+ iou->async.tail = NULL;
+ pthread_cleanup_pop(1);
+
+ if (_op->async_cb(_op->async_cb_data) < 0)
+ abort(); /* XXX treating errors here as fatal disasters for now; an assert() here would vanish under NDEBUG */
+
+ /* now the work is handed back to iou_run() via the processed list, which
+ * iou_run() polls after io_uring submissions but before waiting for IO completions.
+ */
+ pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock, &iou->processed_mutex);
+ pthread_mutex_lock(&iou->processed_mutex);
+ /* FIXME: this should probably get added to a processed_tail to roughly preserve the completion order */
+ _op->next = iou->processed;
+ iou->processed = _op;
+ pthread_cleanup_pop(1);
+ }
+
+ pthread_exit(NULL);
+}
+
+
iou_t * iou_new(unsigned entries)
{
+ int n_threads = 2; /* TODO: size according to # of cores probed @ runtime */
iou_t *iou;
assert(entries);
- iou = calloc(1, sizeof(*iou));
+ iou = calloc(1, sizeof(*iou) + sizeof(iou->threads[0]) * n_threads);
if (!iou)
return NULL;
@@ -125,6 +195,16 @@ iou_t * iou_new(unsigned entries)
return NULL;
}
+ /* TODO: handle failures */
+ pthread_mutex_init(&iou->processed_mutex, NULL);
+ pthread_cond_init(&iou->async.cond, NULL);
+ pthread_mutex_init(&iou->async.mutex, NULL);
+ for (int i = 0; i < n_threads; i++) {
+ iou->threads[i].iou = iou;
+ pthread_create(&iou->threads[i].pthread, NULL, (void *(*)(void *))iou_thread, &iou->threads[i]);
+ }
+ iou->n_threads = n_threads;
+
return iou;
}
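
The hardcoded n_threads = 2 above could be sized per its TODO by probing the online CPU count at runtime. A minimal sketch assuming POSIX; probe_n_threads is an invented name, not part of this commit:

#include <unistd.h>

/* hypothetical helper: probe online CPUs at runtime via sysconf(),
 * falling back to the current hardcoded 2 when probing fails.
 */
static int probe_n_threads(void)
{
	long	n = sysconf(_SC_NPROCESSORS_ONLN);

	return n > 0 ? (int)n : 2;
}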
@@ -134,6 +214,12 @@ iou_t * iou_free(iou_t *iou)
if (iou) {
iou_ops_t *t;
+ for (int i = 0; i < iou->n_threads; i++)
+ pthread_cancel(iou->threads[i].pthread);
+
+ for (int i = 0; i < iou->n_threads; i++)
+ pthread_join(iou->threads[i].pthread, NULL);
+
while ((t = iou->ops)) {
iou->ops = t->next;
free(t);
@@ -252,17 +338,42 @@ int iou_run(iou_t *iou)
{
assert(iou);
- while (!iou->quit && (iou->n_queued + iou->n_submitted)) {
+ while (!iou->quit && (iou->n_queued + iou->n_submitted + iou->n_async)) {
struct io_uring_cqe *cqe;
_iou_op_t *_op;
int r;
- /* if there's more queued submissions, submit them. */
+ /* flush any queued iou_ops to get those IO balls rolling. */
r = iou_flush(iou);
if (r < 0)
return r;
- /* wait for and process a completion */
+ /* complete any processed async work */
+ while (iou->n_async) {
+
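+ /* detach the whole processed list under the lock, then run completions unlocked */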
+ pthread_mutex_lock(&iou->processed_mutex);
+ _op = iou->processed;
+ iou->processed = NULL;
+ pthread_mutex_unlock(&iou->processed_mutex);
+
+ while (_op) {
+ _iou_op_t *next = _op->next;
+
+ iou->n_async--;
+
+ r = _op->cb(_op->cb_data);
+ if (r < 0)
+ return r;
+
+ ops_put(_op);
+ _op = next;
+ }
+ }
+
+ if (!iou->n_submitted)
+ continue;
+
+ /* wait for and process an IO completion */
r = io_uring_wait_cqe(&iou->ring, &cqe);
if (r < 0)
return r;
@@ -327,3 +438,43 @@ struct io_uring * iou_ring(iou_t *iou)
return &iou->ring;
}
+
+
+/* create an async CPU-bound work unit which runs async_cb(async_cb_data) on a worker
+ * thread, then invokes completion_cb(completion_cb_data) from iou_run() once the
+ * worker has finished.
+ */
+int iou_async(iou_t *iou, int (*async_cb)(void *async_cb_data), void *async_cb_data, int (*completion_cb)(void *completion_cb_data), void *completion_cb_data)
+{
+ _iou_op_t *_op;
+
+ assert(iou);
+ assert(async_cb);
+ assert(completion_cb);
+
+ /* reuse iou_op_t to encapsulate the async work; no sqe is allocated here, the op
+ * just rides the async queue to a worker thread and returns via the processed
+ * list for completion in iou_run().
+ */
+ _op = ops_get(&iou->ops);
+ if (!_op)
+ return -ENOMEM;
+
+ _op->cb = completion_cb;
+ _op->cb_data = completion_cb_data;
+ _op->async_cb = async_cb;
+ _op->async_cb_data = async_cb_data;
+ _op->next = NULL;
+
+ iou->n_async++;
+
+ pthread_mutex_lock(&iou->async.mutex);
+ if (iou->async.tail)
+ iou->async.tail->next = _op;
+ else
+ iou->async.head = _op;
+ iou->async.tail = _op;
+
+ pthread_cond_signal(&iou->async.cond);
+ pthread_mutex_unlock(&iou->async.mutex);
+
+ return 0;
+}
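
For context, a hypothetical caller-side sketch of the new API; my_work, my_done, and my_ctx are invented names and the loop body is a stand-in for real CPU-bound work:

#include <stdio.h>
#include "iou.h"

struct my_ctx {
	unsigned long	sum;
};

/* runs on one of iou's worker threads */
static int my_work(void *p)
{
	struct my_ctx	*ctx = p;

	for (unsigned long i = 0; i < 100000000UL; i++)
		ctx->sum += i;

	return 0;
}

/* runs back on the iou_run() caller's thread */
static int my_done(void *p)
{
	struct my_ctx	*ctx = p;

	printf("sum=%lu\n", ctx->sum);

	return 0;
}

int main(void)
{
	struct my_ctx	ctx = {0};
	iou_t		*iou = iou_new(8);

	if (!iou)
		return 1;

	if (iou_async(iou, my_work, &ctx, my_done, &ctx) < 0)
		return 1;

	if (iou_run(iou) < 0)
		return 1;

	iou = iou_free(iou);

	return 0;
}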
diff --git a/src/iou.h b/src/iou.h
index 6609e81..77ddf97 100644
--- a/src/iou.h
+++ b/src/iou.h
@@ -36,5 +36,6 @@ int iou_run(iou_t *iou);
int iou_quit(iou_t *iou);
int iou_resize(iou_t *iou, unsigned entries);
struct io_uring * iou_ring(iou_t *iou);
+int iou_async(iou_t *iou, int (*async_cb)(void *async_cb_data), void *async_cb_data, int (*completion_cb)(void *completion_cb_data), void *completion_cb_data);
#endif