summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVito Caputo <vcaputo@pengaru.com>2021-08-27 19:22:51 -0700
committerVito Caputo <vcaputo@pengaru.com>2021-08-27 21:35:05 -0700
commite7d4b13994b5255eaef0135905b017ba357370c1 (patch)
treec58985faadbea38949b6d5d8267ef86fd93c3322
parent02aa67ba96680f278158385762c7d5a7964f179f (diff)
journals: implement rudimentary read buffers
This adds eight 8KiB "fixed" buffers per opened journal, recycled in a basic LRU fashion. Any read 8KiB or smaller passes through this cache, simply memcpy()d from the buffer when already resident, or upsized to an 8KiB read when absent, to then be memcpy()d out of the populated buffer when the read into the buffer completes. Any read larger than 8KiB bypasses the buffers to be read directly into the provided destination via iou as if the cache weren't present at all.
-rw-r--r--src/journals.c280
-rw-r--r--src/journals.h29
2 files changed, 193 insertions, 116 deletions
diff --git a/src/journals.c b/src/journals.c
index f2df53e..89cb130 100644
--- a/src/journals.c
+++ b/src/journals.c
@@ -44,6 +44,121 @@ typedef struct journals_t {
} journals_t;
+/* Mark buf as the most recently used buffer of journal by relinking it at
+ * journal->lru_tail, which leaves the least recently used buffer at
+ * journal->lru_head.  This is open-coded doubly linked list handling for now;
+ * a list.h-style header like the kernel's would be a better fit.
+ */
+static void buf_used(journal_t *journal, journal_buf_t *buf)
+{
+	journal_buf_t *before, *after;
+
+	assert(journal);
+	assert(buf);
+
+	/* already the most recently used, nothing to relink */
+	if (buf == journal->lru_tail)
+		return;
+
+	before = buf->lru_prev;
+	after = buf->lru_next;	/* expected non-NULL, buf isn't the tail */
+
+	/* unlink buf from its current position */
+	if (!before)
+		journal->lru_head = after;
+	else
+		before->lru_next = after;
+
+	after->lru_prev = before;
+
+	/* append buf behind the current tail */
+	buf->lru_next = NULL;
+	journal->lru_tail->lru_next = buf;
+	buf->lru_prev = journal->lru_tail;
+	journal->lru_tail = buf;
+}
+
+
+/* Completion handler for the reads queued by journal_read().
+ *
+ * Returns op->result when it's negative, and -EINVAL when fewer than length
+ * bytes were read.  When buf is non-NULL the read landed in buf->data: the
+ * first length bytes are copied to dest and buf is marked valid, describing
+ * op->result bytes starting at offset.  With a NULL buf nothing is copied here.
+ * On success closure is dispatched.
+ */
+THUNK_DEFINE_STATIC(buf_got_read, iou_t *, iou, iou_op_t *, op, void *, dest, uint64_t, offset, uint64_t, length, journal_buf_t *, buf, thunk_t *, closure)
+{
+	assert(iou);
+	assert(op);
+	assert(closure);
+	assert(dest || !buf);
+
+	if (op->result < 0)
+		return op->result;
+
+	/* short read, the requested range isn't fully available */
+	if (op->result < length)
+		return -EINVAL;
+
+	/* unbuffered read, nothing to copy or record */
+	if (!buf)
+		return thunk_end(thunk_dispatch(closure));
+
+	/* the buffer was filled starting at offset, so the caller's bytes are at its front */
+	memcpy(dest, buf->data, length);
+
+	/* record what the buffer holds so later reads may be served from it */
+	buf->offset = offset;
+	buf->length = op->result;
+	buf->valid = 1;
+
+	return thunk_end(thunk_dispatch(closure));
+}
+
+
+/* Read length bytes at offset in journal into dest, dispatching closure once
+ * dest has been populated.
+ *
+ * Reads <= JOURNAL_BUF_SIZE go through the per-journal cache of
+ * JOURNAL_BUF_CNT buffers: when a valid buffer covers the whole range it's
+ * copied out and closure is dispatched before returning.  Otherwise the least
+ * recently used buffer is refilled by a JOURNAL_BUF_SIZE read at offset, and
+ * buf_got_read() copies out of it into dest when that read completes.
+ * Larger reads bypass the cache and are read directly into dest.
+ *
+ * Returns -ENOMEM when no iou op could be allocated, the result of dispatching
+ * closure on a cache hit, or 0 once the read has been queued.
+ *
+ * NOTE(review): nothing visible here keeps a buffer with a read still in
+ * flight from being picked for refill again once JOURNAL_BUF_CNT further
+ * misses have cycled the LRU - confirm callers can't have that many
+ * outstanding small reads per journal.
+ */
+int journal_read(iou_t *iou, journal_t *journal, uint64_t offset, uint64_t length, void *dest, thunk_t *closure)
+{
+	journal_buf_t *victim;
+	iou_op_t *op;
+
+	assert(iou);
+	assert(journal);
+	assert(length);
+	assert(dest);
+	assert(closure);
+
+	if (length > JOURNAL_BUF_SIZE) {
+		/* doesn't fit a buffer, plain unbuffered read into the provided dest (assumed to be non-fixed) */
+
+		op = iou_op_new(iou);
+		if (!op)
+			return -ENOMEM;
+
+		io_uring_prep_read(op->sqe, journal->idx, dest, length, offset);
+		op->sqe->flags = IOSQE_FIXED_FILE;
+		op_queue(iou, op, THUNK(buf_got_read(iou, op, NULL, offset, length, NULL, closure)));
+
+		return 0;
+	}
+
+	/* small enough to fit, look in the buffers */
+	for (int i = 0; i < JOURNAL_BUF_CNT; i++) {
+		journal_buf_t *cached = &journal->bufs[i];
+		uint64_t skip;
+
+		if (!cached->valid || offset < cached->offset)
+			continue;
+
+		/* compare via subtraction, offset + length could wrap around */
+		skip = offset - cached->offset;
+		if (skip > cached->length || length > cached->length - skip)
+			continue;
+
+		buf_used(journal, cached);
+		memcpy(dest, &cached->data[skip], length);
+
+		/* propagate the closure's result instead of silently dropping it */
+		return thunk_dispatch(closure);
+	}
+
+	/* Fits but isn't resident.  Get the op before evicting anything so an
+	 * allocation failure doesn't cost a valid cache entry.
+	 */
+	op = iou_op_new(iou);
+	if (!op)
+		return -ENOMEM;
+
+	/* recycle the least recently used buffer, it stays invalid until
+	 * buf_got_read() has seen the read complete.
+	 */
+	victim = journal->lru_head;
+	buf_used(journal, victim);
+	victim->valid = 0;
+
+	/* always fill the whole "fixed" buffer so neighboring small reads can hit it */
+	io_uring_prep_read_fixed(op->sqe, journal->idx, victim->data, JOURNAL_BUF_SIZE, offset, victim->idx);
+	op->sqe->flags = IOSQE_FIXED_FILE;
+	op_queue(iou, op, THUNK(buf_got_read(iou, op, dest, offset, length, victim, closure)));
+
+	return 0;
+}
+
+
/* an open on journal->name was attempted, result in op->result.
* bump *journals->n_opened, when it matches *journals->n_journals, dispatch closure
*/
@@ -67,6 +182,17 @@ THUNK_DEFINE_STATIC(opened_journal, iou_t *, iou, iou_op_t *, op, journals_t *,
fprintf(stderr, "Permission denied opening \"%s\", ignoring\n", journal->name);
journal->fd = -1;
} else {
+
+ /* setup journal->lru_{head,tail} bufs list */
+ journal->lru_head = journal->lru_tail = &journal->bufs[0];
+ for (int i = 1; i < JOURNAL_BUF_CNT; i++) {
+ journal_buf_t *buf = &journal->bufs[i];
+
+ journal->lru_tail->lru_next = buf;
+ buf->lru_prev = journal->lru_tail;
+ journal->lru_tail = buf;
+ }
+
journal->fd = op->result;
}
@@ -89,6 +215,32 @@ THUNK_DEFINE_STATIC(opened_journal, iou_t *, iou, iou_op_t *, op, journals_t *,
if (r < 0)
return r;
+	{
+		/* "register" every journal's read buffers with io_uring for a perf
+		 * boost, each buffer remembers its slot in buf->idx for use with
+		 * io_uring_prep_read_fixed().
+		 */
+		struct iovec *bufs;
+		int n_bufs = 0;
+
+		bufs = malloc(sizeof(*bufs) * JOURNAL_BUF_CNT * journals->n_journals);
+		if (!bufs)
+			return -ENOMEM;
+
+		for (int i = 0; i < journals->n_journals; i++) {
+			for (int j = 0; j < JOURNAL_BUF_CNT; j++) {
+				journal_buf_t *buf = &journals->journals[i].bufs[j];
+
+				bufs[n_bufs].iov_base = buf->data;
+				bufs[n_bufs].iov_len = JOURNAL_BUF_SIZE;
+				buf->idx = n_bufs++;
+			}
+		}
+
+		r = io_uring_register_buffers(iou_ring(iou), bufs, n_bufs);
+
+		/* The iovec array only describes the buffers for the call above
+		 * and this scope holds the sole pointer to it, so release it on
+		 * both the success and failure paths rather than leaking it.
+		 */
+		free(bufs);
+
+		if (r < 0)
+			return r;
+	}
+
free(fds);
return thunk_end(thunk_dispatch(closure));
@@ -232,7 +384,7 @@ THUNK_DEFINE(journals_open, iou_t *, iou, char **, machid, int, flags, journals_
-THUNK_DEFINE_STATIC(got_iter_object_header, iou_t *, iou, iou_op_t *, op, journal_t *, journal, uint64_t *, iter_offset, ObjectHeader *, iter_object_header, thunk_t *, closure)
+THUNK_DEFINE_STATIC(got_iter_object_header, iou_t *, iou, journal_t *, journal, uint64_t *, iter_offset, ObjectHeader *, iter_object_header, thunk_t *, closure)
{
assert(iou);
assert(journal);
@@ -240,12 +392,6 @@ THUNK_DEFINE_STATIC(got_iter_object_header, iou_t *, iou, iou_op_t *, op, journa
assert(iter_object_header);
assert(closure);
- if (op->result < 0)
- return op->result;
-
- if (op->result != sizeof(ObjectHeader))
- return -EINVAL;
-
iter_object_header->size = le64toh(iter_object_header->size);
return thunk_end(thunk_dispatch(closure));
@@ -273,8 +419,6 @@ THUNK_DEFINE_STATIC(got_iter_object_header, iou_t *, iou, iou_op_t *, op, journa
*/
THUNK_DEFINE(journal_iter_next_object, iou_t *, iou, journal_t **, journal, Header *, header, uint64_t *, iter_offset, ObjectHeader *, iter_object_header, thunk_t *, closure)
{
- iou_op_t *op;
-
assert(iou);
assert(journal);
assert(iter_offset);
@@ -299,15 +443,8 @@ THUNK_DEFINE(journal_iter_next_object, iou_t *, iou, journal_t **, journal, Head
return thunk_dispatch(closure);
}
- op = iou_op_new(iou);
- if (!op)
- return -ENOMEM;
-
- io_uring_prep_read(op->sqe, (*journal)->idx, iter_object_header, sizeof(ObjectHeader), *iter_offset);
- op->sqe->flags = IOSQE_FIXED_FILE;
- op_queue(iou, op, THUNK(got_iter_object_header(iou, op, *journal, iter_offset, iter_object_header, closure)));
-
- return 0;
+ return journal_read(iou, *journal, *iter_offset, sizeof(ObjectHeader), iter_object_header, THUNK(
+ got_iter_object_header(iou, *journal, iter_offset, iter_object_header, closure)));
}
@@ -345,7 +482,7 @@ THUNK_DEFINE(journal_iter_objects, iou_t *, iou, journal_t **, journal, Header *
}
-THUNK_DEFINE_STATIC(got_hash_table_iter_object_header, iou_t *, iou, iou_op_t *, op, journal_t *, journal, HashItem *, hash_table, uint64_t, nbuckets, uint64_t *, iter_bucket, uint64_t *, iter_offset, HashedObjectHeader *, iter_object_header, size_t, iter_object_size, thunk_t *, closure)
+THUNK_DEFINE_STATIC(got_hash_table_iter_object_header, iou_t *, iou, journal_t *, journal, HashItem *, hash_table, uint64_t, nbuckets, uint64_t *, iter_bucket, uint64_t *, iter_offset, HashedObjectHeader *, iter_object_header, size_t, iter_object_size, thunk_t *, closure)
{
assert(iou);
assert(journal);
@@ -355,12 +492,6 @@ THUNK_DEFINE_STATIC(got_hash_table_iter_object_header, iou_t *, iou, iou_op_t *,
assert(iter_object_header);
assert(closure);
- if (op->result < 0)
- return op->result;
-
- if (op->result != iter_object_size)
- return -EINVAL;
-
iter_object_header->object.size = le64toh(iter_object_header->object.size);
iter_object_header->hash = le64toh(iter_object_header->hash);
iter_object_header->next_hash_offset = le64toh(iter_object_header->next_hash_offset);
@@ -424,7 +555,6 @@ THUNK_DEFINE_STATIC(got_hash_table_iter_object_header, iou_t *, iou, iou_op_t *,
THUNK_DEFINE(journal_hash_table_iter_next_object, iou_t *, iou, journal_t **, journal, HashItem **, hash_table, uint64_t *, hash_table_size, uint64_t *, iter_bucket, uint64_t *, iter_offset, HashedObjectHeader *, iter_object_header, size_t, iter_object_size, thunk_t *, closure)
{
size_t nbuckets;
- iou_op_t *op;
assert(iou);
assert(journal);
@@ -457,15 +587,8 @@ THUNK_DEFINE(journal_hash_table_iter_next_object, iou_t *, iou, journal_t **, jo
} while (!(*iter_offset));
}
- op = iou_op_new(iou);
- if (!op)
- return -ENOMEM;
-
- io_uring_prep_read(op->sqe, (*journal)->idx, iter_object_header, iter_object_size, *iter_offset);
- op->sqe->flags = IOSQE_FIXED_FILE;
- op_queue(iou, op, THUNK(got_hash_table_iter_object_header(iou, op, *journal, *hash_table, *hash_table_size, iter_bucket, iter_offset, iter_object_header, iter_object_size, closure)));
-
- return 0;
+ return journal_read(iou, *journal, *iter_offset, iter_object_size, iter_object_header, THUNK(
+ got_hash_table_iter_object_header(iou, *journal, *hash_table, *hash_table_size, iter_bucket, iter_offset, iter_object_header, iter_object_size, closure)));
}
@@ -508,20 +631,13 @@ THUNK_DEFINE(journal_hash_table_for_each, iou_t *, iou, journal_t **, journal, H
}
-THUNK_DEFINE_STATIC(got_hashtable, iou_t *, iou, iou_op_t *, op, HashItem *, table, uint64_t, size, HashItem **, res_hash_table, thunk_t *, closure)
+THUNK_DEFINE_STATIC(got_hashtable, iou_t *, iou, HashItem *, table, uint64_t, size, HashItem **, res_hash_table, thunk_t *, closure)
{
assert(iou);
- assert(op);
assert(table);
assert(res_hash_table);
assert(closure);
- if (op->result < 0)
- return op->result;
-
- if (op->result != size)
- return -EINVAL;
-
for (uint64_t i = 0; i < size / sizeof(HashItem); i++) {
table[i].head_hash_offset = le64toh(table[i].head_hash_offset);
table[i].tail_hash_offset = le64toh(table[i].tail_hash_offset);
@@ -539,7 +655,6 @@ THUNK_DEFINE_STATIC(got_hashtable, iou_t *, iou, iou_op_t *, op, HashItem *, tab
*/
THUNK_DEFINE(journal_get_hash_table, iou_t *, iou, journal_t **, journal, uint64_t *, hash_table_offset, uint64_t *, hash_table_size, HashItem **, res_hash_table, thunk_t *, closure)
{
- iou_op_t *op;
HashItem *table;
assert(iou);
@@ -549,37 +664,23 @@ THUNK_DEFINE(journal_get_hash_table, iou_t *, iou, journal_t **, journal, uint64
assert(res_hash_table);
assert(closure);
- op = iou_op_new(iou);
- if (!op)
- return -ENOMEM;
-
table = malloc(*hash_table_size);
if (!table)
return -ENOMEM;
- io_uring_prep_read(op->sqe, (*journal)->idx, table, *hash_table_size, *hash_table_offset);
- op->sqe->flags = IOSQE_FIXED_FILE;
- op_queue(iou, op, THUNK(got_hashtable(iou, op, table, *hash_table_size, res_hash_table, closure)));
-
- return 0;
+ return journal_read(iou, *journal, *hash_table_offset, *hash_table_size, table, THUNK(
+ got_hashtable(iou, table, *hash_table_size, res_hash_table, closure)));
}
/* Validate and prepare journal header loaded via journal_get_header @ header, dispatch closure. */
-THUNK_DEFINE_STATIC(got_header, iou_t *, iou, iou_op_t *, op, journal_t *, journal, Header *, header, thunk_t *, closure)
+THUNK_DEFINE_STATIC(got_header, iou_t *, iou, journal_t *, journal, Header *, header, thunk_t *, closure)
{
assert(iou);
- assert(op);
assert(journal);
assert(header);
assert(closure);
- if (op->result < 0)
- return op->result;
-
- if (op->result < sizeof(*header))
- return -EINVAL;
-
header->compatible_flags = le32toh(header->compatible_flags);
header->incompatible_flags = le32toh(header->incompatible_flags);
header->header_size = le64toh(header->header_size);
@@ -614,38 +715,22 @@ THUNK_DEFINE_STATIC(got_header, iou_t *, iou, iou_op_t *, op, journal_t *, journ
*/
THUNK_DEFINE(journal_get_header, iou_t *, iou, journal_t **, journal, Header *, header, thunk_t *, closure)
{
- iou_op_t *op;
-
assert(iou);
assert(journal);
assert(closure);
- op = iou_op_new(iou);
- if (!op)
- return -ENOMEM;
-
- io_uring_prep_read(op->sqe, (*journal)->idx, header, sizeof(*header), 0);
- op->sqe->flags = IOSQE_FIXED_FILE;
- op_queue(iou, op, THUNK(got_header(iou, op, *journal, header, closure)));
-
- return 0;
+ return journal_read(iou, *journal, 0, sizeof(*header), header, THUNK(
+ got_header(iou, *journal, header, closure)));
}
/* Validate and prepare object header loaded via journal_get_object_header @ object_header, dispatch closure. */
-THUNK_DEFINE_STATIC(got_object_header, iou_t *, iou, iou_op_t *, op, ObjectHeader *, object_header, thunk_t *, closure)
+THUNK_DEFINE_STATIC(got_object_header, iou_t *, iou, ObjectHeader *, object_header, thunk_t *, closure)
{
assert(iou);
- assert(op);
assert(object_header);
assert(closure);
- if (op->result < 0)
- return op->result;
-
- if (op->result < sizeof(*object_header))
- return -EINVAL;
-
object_header->size = le64toh(object_header->size);
/* TODO: validation/sanity checks? */
@@ -658,42 +743,26 @@ THUNK_DEFINE_STATIC(got_object_header, iou_t *, iou, iou_op_t *, op, ObjectHeade
*/
THUNK_DEFINE(journal_get_object_header, iou_t *, iou, journal_t **, journal, uint64_t *, offset, ObjectHeader *, object_header, thunk_t *, closure)
{
- iou_op_t *op;
-
assert(iou);
assert(journal);
assert(offset);
assert(object_header);
assert(closure);
- op = iou_op_new(iou);
- if (!op)
- return -ENOMEM;
-
- io_uring_prep_read(op->sqe, (*journal)->idx, object_header, sizeof(*object_header), *offset);
- op->sqe->flags = IOSQE_FIXED_FILE;
- op_queue(iou, op, THUNK(got_object_header(iou, op, object_header, closure)));
-
- return 0;
+ return journal_read(iou, *journal, *offset, sizeof(*object_header), object_header, THUNK(
+ got_object_header(iou, object_header, closure)));
}
#define OBJECT_N_ITEMS(_o) \
((_o.object.size - offsetof(typeof(_o), items)) / sizeof(*_o.items))
/* Validate and prepare object loaded via journal_get_object @ object, dispatch closure. */
-THUNK_DEFINE_STATIC(got_object, iou_t *, iou, iou_op_t *, op, uint64_t, size, Object *, object, thunk_t *, closure)
+THUNK_DEFINE_STATIC(got_object, iou_t *, iou, uint64_t, size, Object *, object, thunk_t *, closure)
{
assert(iou);
- assert(op);
assert(object);
assert(closure);
- if (op->result < 0)
- return op->result;
-
- if (op->result != size)
- return -EINVAL;
-
object->object.size = le64toh(object->object.size);
/* TODO: validation/sanity checks? */
@@ -771,8 +840,6 @@ THUNK_DEFINE_STATIC(got_object, iou_t *, iou, iou_op_t *, op, uint64_t, size, Ob
*/
THUNK_DEFINE(journal_get_object, iou_t *, iou, journal_t **, journal, uint64_t *, offset, uint64_t *, size, Object **, object, thunk_t *, closure)
{
- iou_op_t *op;
-
assert(iou);
assert(journal);
assert(offset);
@@ -780,15 +847,8 @@ THUNK_DEFINE(journal_get_object, iou_t *, iou, journal_t **, journal, uint64_t *
assert(object && *object);
assert(closure);
- op = iou_op_new(iou);
- if (!op)
- return -ENOMEM;
-
- io_uring_prep_read(op->sqe, (*journal)->idx, *object, *size, *offset);
- op->sqe->flags = IOSQE_FIXED_FILE;
- op_queue(iou, op, THUNK(got_object(iou, op, *size, *object, closure)));
-
- return 0;
+ return journal_read(iou, *journal, *offset, *size, *object, THUNK(
+ got_object(iou, *size, *object, closure)));
}
diff --git a/src/journals.h b/src/journals.h
index 84c7a0f..1327a5e 100644
--- a/src/journals.h
+++ b/src/journals.h
@@ -13,14 +13,29 @@
#include "upstream/journal-def.h"
-typedef struct iou_t iou_t;
-
-typedef struct journal_t {
- char *name;
- int fd, idx;
-} journal_t;
+/* TODO: the journal buf stuff should be made private to journals.c */
+#define JOURNAL_BUF_SIZE 8192
+#define JOURNAL_BUF_CNT 8
+typedef struct iou_t iou_t;
typedef struct journals_t journals_t;
+typedef struct journal_t journal_t;
+typedef struct journal_buf_t journal_buf_t;
+
+/* One JOURNAL_BUF_SIZE read buffer of a journal's cache. */
+struct journal_buf_t {
+	journal_buf_t	*lru_prev;	/* neighbor toward journal_t.lru_head */
+	journal_buf_t	*lru_next;	/* neighbor toward journal_t.lru_tail */
+	uint64_t	offset;		/* journal offset data[0] was read from */
+	uint64_t	length;		/* bytes the read placed in data */
+	unsigned	valid:1;	/* set once offset/length/data describe a completed read */
+	int		idx;		/* slot of data among the buffers registered with io_uring */
+	uint8_t		data[JOURNAL_BUF_SIZE];
+};
+
+/* An opened journal file together with its read buffer cache. */
+struct journal_t {
+	char		*name;		/* name used when opening, shown in diagnostics */
+	int		fd;		/* result of the open, -1 when the journal is being ignored */
+	int		idx;		/* file argument for reads flagged IOSQE_FIXED_FILE */
+	journal_buf_t	*lru_head;	/* least recently used buffer, next to be refilled */
+	journal_buf_t	*lru_tail;	/* most recently used buffer */
+	journal_buf_t	bufs[JOURNAL_BUF_CNT];
+};
THUNK_DECLARE(journals_open, iou_t *, iou, char **, machid, int, flags, journals_t **, journals, thunk_t *, closure);
THUNK_DECLARE(journal_get_header, iou_t *, iou, journal_t **, journal, Header *, header, thunk_t *, closure);
@@ -40,4 +55,6 @@ THUNK_DECLARE(journals_for_each, journals_t **, journals, journal_t **, journal_
const char * journal_object_type_str(ObjectType type);
const char * journal_state_str(JournalState state);
+int journal_read(iou_t *iou, journal_t *journal, uint64_t offset, uint64_t length, void *dest, thunk_t *closure);
+
#endif
© All Rights Reserved