summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/iou.c245
-rw-r--r--src/iou.h39
2 files changed, 284 insertions, 0 deletions
diff --git a/src/iou.c b/src/iou.c
new file mode 100644
index 0000000..6a07853
--- /dev/null
+++ b/src/iou.c
@@ -0,0 +1,245 @@
+/*
+ * Copyright (C) 2020 - Vito Caputo - <vcaputo@pengaru.com>
+ *
+ * This program is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 3 as published
+ * by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <assert.h>
+#include <liburing.h>
+#include <stdlib.h>
+#include <stddef.h>
+
+#include "iou.h"
+
+/* iou is a minimal IO-oriented async callback scheduler built atop io_uring */
+
typedef struct iou_t {
	struct io_uring ring;
	/* op accounting across the op lifecycle:
	 * n_issued    - SQEs handed out via iou_op_new() but not yet queued
	 * n_queued    - ops given a callback via iou_op_queue(), awaiting submit
	 * n_submitted - ops submitted to the kernel, awaiting completion
	 */
	unsigned n_issued, n_queued, n_submitted;
	unsigned quit:1;	/* set by iou_quit(); makes iou_run() return gracefully */
} iou_t;
+
/* private container of the public iou_op_t */
typedef struct _iou_op_t {
	iou_op_t public;	/* must be first so CONTAINER_OF can recover us from &public */

	int (*cb)(void *cb_data);	/* completion callback; a negative return aborts iou_run() */
	void *cb_data;			/* opaque pointer passed verbatim to cb */
} _iou_op_t;
+
+
#ifndef CONTAINER_OF
/* Recover a pointer to the enclosing `type` from a pointer to its `member`. */
#define CONTAINER_OF(ptr, type, member) \
(type *)((char *)(ptr) - offsetof(type, member))
#endif
+
+
+iou_t * iou_new(unsigned entries)
+{
+ iou_t *iou;
+
+ assert(entries);
+
+ iou = calloc(1, sizeof(*iou));
+ if (!iou)
+ return NULL;
+
+ if (io_uring_queue_init(entries, &iou->ring, 0) < 0) {
+ free(iou);
+ return NULL;
+ }
+
+ return iou;
+}
+
+
+iou_t * iou_free(iou_t *iou)
+{
+ if (iou) {
+ io_uring_queue_exit(&iou->ring);
+ free(iou);
+ }
+
+ return NULL;
+}
+
+
+/* allocates an op, which wraps an io_uring sqe, result, and associated closure */
+/* when an op completes, its result member is populated from the cqe, and its
+ * closure supplied when queued is called. If the closure must access the cqe,
+ * it should bind the op by the submission time, then it may access the
+ * op->cqe member.
+ */
+iou_op_t * iou_op_new(iou_t *iou)
+{
+ _iou_op_t *new;
+
+ assert(iou);
+
+ new = calloc(1, sizeof(*new));
+ if (!new)
+ return NULL;
+
+ new->public.sqe = io_uring_get_sqe(&iou->ring);
+ if (!new->public.sqe) {
+ free(new);
+ return NULL;
+ }
+
+ iou->n_issued++;
+
+ return &new->public;
+}
+
+
+/* finalizes op for submission, assigning cb+cb_data to op,
+ * and assigning op to op->sqe->user_data.
+ *
+ * op shouldn't be manipulated again after submission, cb_data
+ * must be valid at least until op completes and calles cb.
+ *
+ * if cb returns a negative value it's treated as a hard error ending and
+ * propagating out of iou_run().
+ */
+void iou_op_queue(iou_t *iou, iou_op_t *op, int (*cb)(void *cb_data), void *cb_data)
+{
+ _iou_op_t *_op;
+
+ assert(iou);
+ assert(op);
+ assert(cb);
+ assert(iou->n_issued);
+
+ _op = CONTAINER_OF(op, _iou_op_t, public);
+
+ _op->cb = cb;
+ _op->cb_data = cb_data;
+
+ io_uring_sqe_set_data(op->sqe, _op);
+
+ iou->n_issued--;
+ iou->n_queued++;
+}
+
+
+/* If there's more queued submissions, submit them to io_uring.
+ *
+ * XXX: This is made public under the assumption that there's utility in a
+ * synchronous flushing of the SQ. I originally added it in an attempt to pin
+ * the dirfd supplied with a batch of relative OPENAT requests, so the dirfd
+ * could be closed immediately rather than having to refcount the dependent
+ * dirfd in the caller for deferred closing until the dependent ops finish. At
+ * the time it didn't work - the OPENATs failed with EBADFD because of the
+ * immediate dirfd closing, even with the flush before the close. But
+ * discussion on the io-uring mailing list left me optimistic it'd get fixed
+ * eventually, so keeping it public. There might already be other operations
+ * where flushing is effective, I haven't dug into the kernel implementation.
+ */
+int iou_flush(iou_t *iou)
+{
+ int r;
+
+ assert(iou);
+
+ if (!iou->n_queued)
+ return 0;
+
+ /* defend against any issued but non-queued SQEs at this point,
+ * since io_uring_submit() would submit those but they're incomplete.
+ */
+ assert(!iou->n_issued);
+
+ r = io_uring_submit(&iou->ring);
+ if (r < 0)
+ return r;
+
+ assert(r <= iou->n_queued);
+
+ iou->n_queued -= r;
+ iou->n_submitted += r;
+
+ return 0;
+}
+
+
+/* run an iou scheduler until quit, or an error occurs */
+/* returns -errno on error, 0 on a graceful finish */
+/* XXX: note if no op has been queued to kickstart things, this is effectively a noop. */
+int iou_run(iou_t *iou)
+{
+ assert(iou);
+
+ while (!iou->quit && (iou->n_queued + iou->n_submitted)) {
+ struct io_uring_cqe *cqe;
+ _iou_op_t *_op;
+ int r;
+
+ /* if there's more queued submissions, submit them. */
+ r = iou_flush(iou);
+ if (r < 0)
+ return r;
+
+ /* wait for and process a completion */
+ r = io_uring_wait_cqe(&iou->ring, &cqe);
+ if (r < 0)
+ return r;
+
+ _op = io_uring_cqe_get_data(cqe);
+ _op->public.result = cqe->res;
+
+ io_uring_cqe_seen(&iou->ring, cqe);
+ iou->n_submitted--;
+
+ r = _op->cb(_op->cb_data);
+ if (r < 0)
+ return r;
+
+ free(_op);
+ }
+
+ return 0;
+}
+
+
+/* Inform the supplied iou instance to quit its event loop gracefully */
+int iou_quit(iou_t *iou)
+{
+ assert(iou);
+
+ iou->quit = 1;
+
+ return 0;
+}
+
+
+/* Resize the underlying io_uring to entries.
+ *
+ * This can only be done when there's nothing queued or submitted, since
+ * it's essentially tearing down and recreating everything including the
+ * queues containing such entries.
+ *
+ * The intended purpose of this is sometimes you start out with a safe
+ * minimum of queue depth for a bootstrap process, but in the course of
+ * bootstrap you learn necessary depths to accomodate subsequent operations
+ * and need to resize accordingly.
+ */
+int iou_resize(iou_t *iou, unsigned entries)
+{
+ assert(iou);
+ assert(entries);
+ assert(!(iou->n_issued + iou->n_queued + iou->n_submitted));
+
+ io_uring_queue_exit(&iou->ring);
+
+ return io_uring_queue_init(entries, &iou->ring, 0);
+}
diff --git a/src/iou.h b/src/iou.h
new file mode 100644
index 0000000..0c7c5c2
--- /dev/null
+++ b/src/iou.h
@@ -0,0 +1,39 @@
+/*
+ * Copyright (C) 2020 - Vito Caputo - <vcaputo@pengaru.com>
+ *
+ * This program is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 3 as published
+ * by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
#ifndef _IOU_H
#define _IOU_H

#include <liburing.h>

/* Opaque handle to an iou scheduler instance. */
typedef struct iou_t iou_t;

/* Public view of a pending operation: prepare `sqe` before queueing;
 * `result` is populated from the completion's res field before the
 * queued callback runs.
 */
typedef struct iou_op_t {
	struct io_uring_sqe *sqe;
	int result;
} iou_op_t;


/* Create a scheduler with a ring of depth `entries`; NULL on failure. */
iou_t * iou_new(unsigned entries);
/* Destroy an instance (NULL-safe); always returns NULL. */
iou_t * iou_free(iou_t *iou);
/* Allocate an op wrapping a fresh SQE; NULL on OOM or full SQ. */
iou_op_t * iou_op_new(iou_t *iou);
/* Bind cb+cb_data to op and mark it ready for submission. */
void iou_op_queue(iou_t *iou, iou_op_t *op, int (*cb)(void *cb_data), void *cb_data);
/* Submit queued ops to the kernel; 0 on success, -errno on failure. */
int iou_flush(iou_t *iou);
/* Run the event loop until quit or error; 0 on graceful finish, -errno on error. */
int iou_run(iou_t *iou);
/* Request a graceful end of iou_run(); always returns 0. */
int iou_quit(iou_t *iou);
/* Recreate the ring at a new depth; only valid with no ops outstanding. */
int iou_resize(iou_t *iou, unsigned entries);

#endif
© All Rights Reserved