summaryrefslogtreecommitdiff
path: root/src/journals.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/journals.c')
-rw-r--r--src/journals.c280
1 files changed, 170 insertions, 110 deletions
diff --git a/src/journals.c b/src/journals.c
index f2df53e..89cb130 100644
--- a/src/journals.c
+++ b/src/journals.c
@@ -44,6 +44,121 @@ typedef struct journals_t {
} journals_t;
+/* helper for bumping buf down to the bottom of journal->lru_{head,tail},
+ * this should probably just pull in some linked list header like the kernel's
+ * list.h, open coded slop for now
+ */
+static void buf_used(journal_t *journal, journal_buf_t *buf)
+{
+ assert(journal);
+ assert(buf);
+
+ if (journal->lru_tail == buf)
+ return;
+
+ if (buf->lru_prev)
+ buf->lru_prev->lru_next = buf->lru_next;
+ else
+ journal->lru_head = buf->lru_next;
+
+ buf->lru_next->lru_prev = buf->lru_prev;
+ buf->lru_next = NULL;
+
+ journal->lru_tail->lru_next = buf;
+ buf->lru_prev = journal->lru_tail;
+ journal->lru_tail = buf;
+}
+
+
+THUNK_DEFINE_STATIC(buf_got_read, iou_t *, iou, iou_op_t *, op, void *, dest, uint64_t, offset, uint64_t, length, journal_buf_t *, buf, thunk_t *, closure)
+{
+ assert(iou);
+ assert(op);
+ assert(closure);
+ assert(dest || !buf);
+
+ if (op->result < 0)
+ return op->result;
+
+ if (op->result < length)
+ return -EINVAL;
+
+ if (buf) {
+ memcpy(dest, buf->data, length);
+ buf->length = op->result;
+ buf->offset = offset;
+ buf->valid = 1;
+ }
+
+ return thunk_end(thunk_dispatch(closure));
+}
+
+
+/* read size bytes from offset offset in journal to dest, dispatch closure when done.
+ * for reads <= JOURNAL_BUF_SIZE in size, a per-journal cache of
+ * JOURNAL_BUF_CNT*JOURNAL_BUF_SIZE buffers is maintained and if the data is
+ * present it's simply copied from there before dispatching closure.
+ */
+int journal_read(iou_t *iou, journal_t *journal, uint64_t offset, uint64_t length, void *dest, thunk_t *closure)
+{
+ journal_buf_t *buf;
+ iou_op_t *op;
+
+ assert(iou);
+ assert(journal);
+ assert(length);
+ assert(dest);
+ assert(closure);
+
+ if (length <= JOURNAL_BUF_SIZE) {
+ /* small enough to fit, look in the buffers */
+
+ for (int i = 0; i < JOURNAL_BUF_CNT; i++) {
+ journal_buf_t *buf = &journal->bufs[i];
+
+ if (!buf->valid)
+ continue;
+
+ if (offset >= buf->offset && offset + length <= buf->offset + buf->length) {
+ buf_used(journal, buf);
+ memcpy(dest, &buf->data[offset - buf->offset], length);
+
+ thunk_dispatch(closure);
+
+ return 0;
+ }
+ }
+
+ /* buffer fits, but wasn't found, read it into the "fixed" lru buf,
+ * buf_got_read() will then copy out of the buf into dest when loaded.
+ */
+ buf = journal->lru_head;
+ buf_used(journal, buf);
+ buf->valid = 0;
+
+ op = iou_op_new(iou);
+ if (!op)
+ return -ENOMEM;
+
+ io_uring_prep_read_fixed(op->sqe, journal->idx, buf->data, JOURNAL_BUF_SIZE, offset, buf->idx);
+ op->sqe->flags = IOSQE_FIXED_FILE;
+ op_queue(iou, op, THUNK(buf_got_read(iou, op, dest, offset, length, buf, closure)));
+ } else {
+ /* buffer doesn't fit, plain unbuffered read it into provided dest (assumed to be non-fixed) */
+
+ op = iou_op_new(iou);
+ if (!op)
+ return -ENOMEM;
+
+ io_uring_prep_read(op->sqe, journal->idx, dest, length, offset);
+ op->sqe->flags = IOSQE_FIXED_FILE;
+ op_queue(iou, op, THUNK(buf_got_read(iou, op, NULL, offset, length, NULL, closure)));
+ }
+
+ return 0;
+}
+
+
/* an open on journal->name was attempted, result in op->result.
* bump *journals->n_opened, when it matches *journals->n_journals, dispatch closure
*/
@@ -67,6 +182,17 @@ THUNK_DEFINE_STATIC(opened_journal, iou_t *, iou, iou_op_t *, op, journals_t *,
fprintf(stderr, "Permission denied opening \"%s\", ignoring\n", journal->name);
journal->fd = -1;
} else {
+
+ /* setup journal->lru_{head,tail} bufs list */
+ journal->lru_head = journal->lru_tail = &journal->bufs[0];
+ for (int i = 1; i < JOURNAL_BUF_CNT; i++) {
+ journal_buf_t *buf = &journal->bufs[i];
+
+ journal->lru_tail->lru_next = buf;
+ buf->lru_prev = journal->lru_tail;
+ journal->lru_tail = buf;
+ }
+
journal->fd = op->result;
}
@@ -89,6 +215,32 @@ THUNK_DEFINE_STATIC(opened_journal, iou_t *, iou, iou_op_t *, op, journals_t *,
if (r < 0)
return r;
+ {
+ struct iovec *bufs;
+ int n_bufs = 0;
+
+ bufs = malloc(sizeof(*bufs) * JOURNAL_BUF_CNT * journals->n_journals);
+ if (!bufs)
+ return -ENOMEM;
+
+ for (int i = 0; i < journals->n_journals; i++) {
+ /* "register" buffers with io_uring for a perf boost */
+
+
+ for (int j = 0; j < JOURNAL_BUF_CNT; j++) {
+ journal_buf_t *buf = &journals->journals[i].bufs[j];
+
+ bufs[n_bufs].iov_base = buf->data;
+ bufs[n_bufs].iov_len = JOURNAL_BUF_SIZE;
+ buf->idx = n_bufs++;
+ }
+ }
+
+ r = io_uring_register_buffers(iou_ring(iou), bufs, n_bufs);
+ if (r < 0)
+ return r;
+ }
+
free(fds);
return thunk_end(thunk_dispatch(closure));
@@ -232,7 +384,7 @@ THUNK_DEFINE(journals_open, iou_t *, iou, char **, machid, int, flags, journals_
-THUNK_DEFINE_STATIC(got_iter_object_header, iou_t *, iou, iou_op_t *, op, journal_t *, journal, uint64_t *, iter_offset, ObjectHeader *, iter_object_header, thunk_t *, closure)
+THUNK_DEFINE_STATIC(got_iter_object_header, iou_t *, iou, journal_t *, journal, uint64_t *, iter_offset, ObjectHeader *, iter_object_header, thunk_t *, closure)
{
assert(iou);
assert(journal);
@@ -240,12 +392,6 @@ THUNK_DEFINE_STATIC(got_iter_object_header, iou_t *, iou, iou_op_t *, op, journa
assert(iter_object_header);
assert(closure);
- if (op->result < 0)
- return op->result;
-
- if (op->result != sizeof(ObjectHeader))
- return -EINVAL;
-
iter_object_header->size = le64toh(iter_object_header->size);
return thunk_end(thunk_dispatch(closure));
@@ -273,8 +419,6 @@ THUNK_DEFINE_STATIC(got_iter_object_header, iou_t *, iou, iou_op_t *, op, journa
*/
THUNK_DEFINE(journal_iter_next_object, iou_t *, iou, journal_t **, journal, Header *, header, uint64_t *, iter_offset, ObjectHeader *, iter_object_header, thunk_t *, closure)
{
- iou_op_t *op;
-
assert(iou);
assert(journal);
assert(iter_offset);
@@ -299,15 +443,8 @@ THUNK_DEFINE(journal_iter_next_object, iou_t *, iou, journal_t **, journal, Head
return thunk_dispatch(closure);
}
- op = iou_op_new(iou);
- if (!op)
- return -ENOMEM;
-
- io_uring_prep_read(op->sqe, (*journal)->idx, iter_object_header, sizeof(ObjectHeader), *iter_offset);
- op->sqe->flags = IOSQE_FIXED_FILE;
- op_queue(iou, op, THUNK(got_iter_object_header(iou, op, *journal, iter_offset, iter_object_header, closure)));
-
- return 0;
+ return journal_read(iou, *journal, *iter_offset, sizeof(ObjectHeader), iter_object_header, THUNK(
+ got_iter_object_header(iou, *journal, iter_offset, iter_object_header, closure)));
}
@@ -345,7 +482,7 @@ THUNK_DEFINE(journal_iter_objects, iou_t *, iou, journal_t **, journal, Header *
}
-THUNK_DEFINE_STATIC(got_hash_table_iter_object_header, iou_t *, iou, iou_op_t *, op, journal_t *, journal, HashItem *, hash_table, uint64_t, nbuckets, uint64_t *, iter_bucket, uint64_t *, iter_offset, HashedObjectHeader *, iter_object_header, size_t, iter_object_size, thunk_t *, closure)
+THUNK_DEFINE_STATIC(got_hash_table_iter_object_header, iou_t *, iou, journal_t *, journal, HashItem *, hash_table, uint64_t, nbuckets, uint64_t *, iter_bucket, uint64_t *, iter_offset, HashedObjectHeader *, iter_object_header, size_t, iter_object_size, thunk_t *, closure)
{
assert(iou);
assert(journal);
@@ -355,12 +492,6 @@ THUNK_DEFINE_STATIC(got_hash_table_iter_object_header, iou_t *, iou, iou_op_t *,
assert(iter_object_header);
assert(closure);
- if (op->result < 0)
- return op->result;
-
- if (op->result != iter_object_size)
- return -EINVAL;
-
iter_object_header->object.size = le64toh(iter_object_header->object.size);
iter_object_header->hash = le64toh(iter_object_header->hash);
iter_object_header->next_hash_offset = le64toh(iter_object_header->next_hash_offset);
@@ -424,7 +555,6 @@ THUNK_DEFINE_STATIC(got_hash_table_iter_object_header, iou_t *, iou, iou_op_t *,
THUNK_DEFINE(journal_hash_table_iter_next_object, iou_t *, iou, journal_t **, journal, HashItem **, hash_table, uint64_t *, hash_table_size, uint64_t *, iter_bucket, uint64_t *, iter_offset, HashedObjectHeader *, iter_object_header, size_t, iter_object_size, thunk_t *, closure)
{
size_t nbuckets;
- iou_op_t *op;
assert(iou);
assert(journal);
@@ -457,15 +587,8 @@ THUNK_DEFINE(journal_hash_table_iter_next_object, iou_t *, iou, journal_t **, jo
} while (!(*iter_offset));
}
- op = iou_op_new(iou);
- if (!op)
- return -ENOMEM;
-
- io_uring_prep_read(op->sqe, (*journal)->idx, iter_object_header, iter_object_size, *iter_offset);
- op->sqe->flags = IOSQE_FIXED_FILE;
- op_queue(iou, op, THUNK(got_hash_table_iter_object_header(iou, op, *journal, *hash_table, *hash_table_size, iter_bucket, iter_offset, iter_object_header, iter_object_size, closure)));
-
- return 0;
+ return journal_read(iou, *journal, *iter_offset, iter_object_size, iter_object_header, THUNK(
+ got_hash_table_iter_object_header(iou, *journal, *hash_table, *hash_table_size, iter_bucket, iter_offset, iter_object_header, iter_object_size, closure)));
}
@@ -508,20 +631,13 @@ THUNK_DEFINE(journal_hash_table_for_each, iou_t *, iou, journal_t **, journal, H
}
-THUNK_DEFINE_STATIC(got_hashtable, iou_t *, iou, iou_op_t *, op, HashItem *, table, uint64_t, size, HashItem **, res_hash_table, thunk_t *, closure)
+THUNK_DEFINE_STATIC(got_hashtable, iou_t *, iou, HashItem *, table, uint64_t, size, HashItem **, res_hash_table, thunk_t *, closure)
{
assert(iou);
- assert(op);
assert(table);
assert(res_hash_table);
assert(closure);
- if (op->result < 0)
- return op->result;
-
- if (op->result != size)
- return -EINVAL;
-
for (uint64_t i = 0; i < size / sizeof(HashItem); i++) {
table[i].head_hash_offset = le64toh(table[i].head_hash_offset);
table[i].tail_hash_offset = le64toh(table[i].tail_hash_offset);
@@ -539,7 +655,6 @@ THUNK_DEFINE_STATIC(got_hashtable, iou_t *, iou, iou_op_t *, op, HashItem *, tab
*/
THUNK_DEFINE(journal_get_hash_table, iou_t *, iou, journal_t **, journal, uint64_t *, hash_table_offset, uint64_t *, hash_table_size, HashItem **, res_hash_table, thunk_t *, closure)
{
- iou_op_t *op;
HashItem *table;
assert(iou);
@@ -549,37 +664,23 @@ THUNK_DEFINE(journal_get_hash_table, iou_t *, iou, journal_t **, journal, uint64
assert(res_hash_table);
assert(closure);
- op = iou_op_new(iou);
- if (!op)
- return -ENOMEM;
-
table = malloc(*hash_table_size);
if (!table)
return -ENOMEM;
- io_uring_prep_read(op->sqe, (*journal)->idx, table, *hash_table_size, *hash_table_offset);
- op->sqe->flags = IOSQE_FIXED_FILE;
- op_queue(iou, op, THUNK(got_hashtable(iou, op, table, *hash_table_size, res_hash_table, closure)));
-
- return 0;
+ return journal_read(iou, *journal, *hash_table_offset, *hash_table_size, table, THUNK(
+ got_hashtable(iou, table, *hash_table_size, res_hash_table, closure)));
}
/* Validate and prepare journal header loaded via journal_get_header @ header, dispatch closure. */
-THUNK_DEFINE_STATIC(got_header, iou_t *, iou, iou_op_t *, op, journal_t *, journal, Header *, header, thunk_t *, closure)
+THUNK_DEFINE_STATIC(got_header, iou_t *, iou, journal_t *, journal, Header *, header, thunk_t *, closure)
{
assert(iou);
- assert(op);
assert(journal);
assert(header);
assert(closure);
- if (op->result < 0)
- return op->result;
-
- if (op->result < sizeof(*header))
- return -EINVAL;
-
header->compatible_flags = le32toh(header->compatible_flags);
header->incompatible_flags = le32toh(header->incompatible_flags);
header->header_size = le64toh(header->header_size);
@@ -614,38 +715,22 @@ THUNK_DEFINE_STATIC(got_header, iou_t *, iou, iou_op_t *, op, journal_t *, journ
*/
THUNK_DEFINE(journal_get_header, iou_t *, iou, journal_t **, journal, Header *, header, thunk_t *, closure)
{
- iou_op_t *op;
-
assert(iou);
assert(journal);
assert(closure);
- op = iou_op_new(iou);
- if (!op)
- return -ENOMEM;
-
- io_uring_prep_read(op->sqe, (*journal)->idx, header, sizeof(*header), 0);
- op->sqe->flags = IOSQE_FIXED_FILE;
- op_queue(iou, op, THUNK(got_header(iou, op, *journal, header, closure)));
-
- return 0;
+ return journal_read(iou, *journal, 0, sizeof(*header), header, THUNK(
+ got_header(iou, *journal, header, closure)));
}
/* Validate and prepare object header loaded via journal_get_object_header @ object_header, dispatch closure. */
-THUNK_DEFINE_STATIC(got_object_header, iou_t *, iou, iou_op_t *, op, ObjectHeader *, object_header, thunk_t *, closure)
+THUNK_DEFINE_STATIC(got_object_header, iou_t *, iou, ObjectHeader *, object_header, thunk_t *, closure)
{
assert(iou);
- assert(op);
assert(object_header);
assert(closure);
- if (op->result < 0)
- return op->result;
-
- if (op->result < sizeof(*object_header))
- return -EINVAL;
-
object_header->size = le64toh(object_header->size);
/* TODO: validation/sanity checks? */
@@ -658,42 +743,26 @@ THUNK_DEFINE_STATIC(got_object_header, iou_t *, iou, iou_op_t *, op, ObjectHeade
*/
THUNK_DEFINE(journal_get_object_header, iou_t *, iou, journal_t **, journal, uint64_t *, offset, ObjectHeader *, object_header, thunk_t *, closure)
{
- iou_op_t *op;
-
assert(iou);
assert(journal);
assert(offset);
assert(object_header);
assert(closure);
- op = iou_op_new(iou);
- if (!op)
- return -ENOMEM;
-
- io_uring_prep_read(op->sqe, (*journal)->idx, object_header, sizeof(*object_header), *offset);
- op->sqe->flags = IOSQE_FIXED_FILE;
- op_queue(iou, op, THUNK(got_object_header(iou, op, object_header, closure)));
-
- return 0;
+ return journal_read(iou, *journal, *offset, sizeof(*object_header), object_header, THUNK(
+ got_object_header(iou, object_header, closure)));
}
#define OBJECT_N_ITEMS(_o) \
((_o.object.size - offsetof(typeof(_o), items)) / sizeof(*_o.items))
/* Validate and prepare object loaded via journal_get_object @ object, dispatch closure. */
-THUNK_DEFINE_STATIC(got_object, iou_t *, iou, iou_op_t *, op, uint64_t, size, Object *, object, thunk_t *, closure)
+THUNK_DEFINE_STATIC(got_object, iou_t *, iou, uint64_t, size, Object *, object, thunk_t *, closure)
{
assert(iou);
- assert(op);
assert(object);
assert(closure);
- if (op->result < 0)
- return op->result;
-
- if (op->result != size)
- return -EINVAL;
-
object->object.size = le64toh(object->object.size);
/* TODO: validation/sanity checks? */
@@ -771,8 +840,6 @@ THUNK_DEFINE_STATIC(got_object, iou_t *, iou, iou_op_t *, op, uint64_t, size, Ob
*/
THUNK_DEFINE(journal_get_object, iou_t *, iou, journal_t **, journal, uint64_t *, offset, uint64_t *, size, Object **, object, thunk_t *, closure)
{
- iou_op_t *op;
-
assert(iou);
assert(journal);
assert(offset);
@@ -780,15 +847,8 @@ THUNK_DEFINE(journal_get_object, iou_t *, iou, journal_t **, journal, uint64_t *
assert(object && *object);
assert(closure);
- op = iou_op_new(iou);
- if (!op)
- return -ENOMEM;
-
- io_uring_prep_read(op->sqe, (*journal)->idx, *object, *size, *offset);
- op->sqe->flags = IOSQE_FIXED_FILE;
- op_queue(iou, op, THUNK(got_object(iou, op, *size, *object, closure)));
-
- return 0;
+ return journal_read(iou, *journal, *offset, *size, *object, THUNK(
+ got_object(iou, *size, *object, closure)));
}
© All Rights Reserved