diff options
Diffstat (limited to 'src/journals.c')
-rw-r--r-- | src/journals.c | 280 |
1 files changed, 170 insertions, 110 deletions
diff --git a/src/journals.c b/src/journals.c index f2df53e..89cb130 100644 --- a/src/journals.c +++ b/src/journals.c @@ -44,6 +44,121 @@ typedef struct journals_t { } journals_t; +/* helper for bumping buf down to the bottom of journal->lru_{head,tail}, + * this should probably just pull in some linked list header like the kernel's + * list.h, open coded slop for now + */ +static void buf_used(journal_t *journal, journal_buf_t *buf) +{ + assert(journal); + assert(buf); + + if (journal->lru_tail == buf) + return; + + if (buf->lru_prev) + buf->lru_prev->lru_next = buf->lru_next; + else + journal->lru_head = buf->lru_next; + + buf->lru_next->lru_prev = buf->lru_prev; + buf->lru_next = NULL; + + journal->lru_tail->lru_next = buf; + buf->lru_prev = journal->lru_tail; + journal->lru_tail = buf; +} + + +THUNK_DEFINE_STATIC(buf_got_read, iou_t *, iou, iou_op_t *, op, void *, dest, uint64_t, offset, uint64_t, length, journal_buf_t *, buf, thunk_t *, closure) +{ + assert(iou); + assert(op); + assert(closure); + assert(dest || !buf); + + if (op->result < 0) + return op->result; + + if (op->result < length) + return -EINVAL; + + if (buf) { + memcpy(dest, buf->data, length); + buf->length = op->result; + buf->offset = offset; + buf->valid = 1; + } + + return thunk_end(thunk_dispatch(closure)); +} + + +/* read size bytes from offset offset in journal to dest, dispatch closure when done. + * for reads <= JOURNAL_BUF_SIZE in size, a per-journal cache of + * JOURNAL_BUF_CNT*JOURNAL_BUF_SIZE buffers is maintained and if the data is + * present it's simply copied from there before dispatching closure. + */ +int journal_read(iou_t *iou, journal_t *journal, uint64_t offset, uint64_t length, void *dest, thunk_t *closure) +{ + journal_buf_t *buf; + iou_op_t *op; + + assert(iou); + assert(journal); + assert(length); + assert(dest); + assert(closure); + + if (length <= JOURNAL_BUF_SIZE) { + /* small enough to fit, look in the buffers */ + + for (int i = 0; i < JOURNAL_BUF_CNT; i++) { + journal_buf_t *buf = &journal->bufs[i]; + + if (!buf->valid) + continue; + + if (offset >= buf->offset && offset + length <= buf->offset + buf->length) { + buf_used(journal, buf); + memcpy(dest, &buf->data[offset - buf->offset], length); + + thunk_dispatch(closure); + + return 0; + } + } + + /* buffer fits, but wasn't found, read it into the "fixed" lru buf, + * buf_got_read() will then copy out of the buf into dest when loaded. + */ + buf = journal->lru_head; + buf_used(journal, buf); + buf->valid = 0; + + op = iou_op_new(iou); + if (!op) + return -ENOMEM; + + io_uring_prep_read_fixed(op->sqe, journal->idx, buf->data, JOURNAL_BUF_SIZE, offset, buf->idx); + op->sqe->flags = IOSQE_FIXED_FILE; + op_queue(iou, op, THUNK(buf_got_read(iou, op, dest, offset, length, buf, closure))); + } else { + /* buffer doesn't fit, plain unbuffered read it into provided dest (assumed to be non-fixed) */ + + op = iou_op_new(iou); + if (!op) + return -ENOMEM; + + io_uring_prep_read(op->sqe, journal->idx, dest, length, offset); + op->sqe->flags = IOSQE_FIXED_FILE; + op_queue(iou, op, THUNK(buf_got_read(iou, op, NULL, offset, length, NULL, closure))); + } + + return 0; +} + + /* an open on journal->name was attempted, result in op->result. * bump *journals->n_opened, when it matches *journals->n_journals, dispatch closure */ @@ -67,6 +182,17 @@ THUNK_DEFINE_STATIC(opened_journal, iou_t *, iou, iou_op_t *, op, journals_t *, fprintf(stderr, "Permission denied opening \"%s\", ignoring\n", journal->name); journal->fd = -1; } else { + + /* setup journal->lru_{head,tail} bufs list */ + journal->lru_head = journal->lru_tail = &journal->bufs[0]; + for (int i = 1; i < JOURNAL_BUF_CNT; i++) { + journal_buf_t *buf = &journal->bufs[i]; + + journal->lru_tail->lru_next = buf; + buf->lru_prev = journal->lru_tail; + journal->lru_tail = buf; + } + journal->fd = op->result; } @@ -89,6 +215,32 @@ THUNK_DEFINE_STATIC(opened_journal, iou_t *, iou, iou_op_t *, op, journals_t *, if (r < 0) return r; + { + struct iovec *bufs; + int n_bufs = 0; + + bufs = malloc(sizeof(*bufs) * JOURNAL_BUF_CNT * journals->n_journals); + if (!bufs) + return -ENOMEM; + + for (int i = 0; i < journals->n_journals; i++) { + /* "register" buffers with io_uring for a perf boost */ + + + for (int j = 0; j < JOURNAL_BUF_CNT; j++) { + journal_buf_t *buf = &journals->journals[i].bufs[j]; + + bufs[n_bufs].iov_base = buf->data; + bufs[n_bufs].iov_len = JOURNAL_BUF_SIZE; + buf->idx = n_bufs++; + } + } + + r = io_uring_register_buffers(iou_ring(iou), bufs, n_bufs); + if (r < 0) + return r; + } + free(fds); return thunk_end(thunk_dispatch(closure)); @@ -232,7 +384,7 @@ THUNK_DEFINE(journals_open, iou_t *, iou, char **, machid, int, flags, journals_ -THUNK_DEFINE_STATIC(got_iter_object_header, iou_t *, iou, iou_op_t *, op, journal_t *, journal, uint64_t *, iter_offset, ObjectHeader *, iter_object_header, thunk_t *, closure) +THUNK_DEFINE_STATIC(got_iter_object_header, iou_t *, iou, journal_t *, journal, uint64_t *, iter_offset, ObjectHeader *, iter_object_header, thunk_t *, closure) { assert(iou); assert(journal); @@ -240,12 +392,6 @@ THUNK_DEFINE_STATIC(got_iter_object_header, iou_t *, iou, iou_op_t *, op, journa assert(iter_object_header); assert(closure); - if (op->result < 0) - return op->result; - - if (op->result != sizeof(ObjectHeader)) - return -EINVAL; - iter_object_header->size = le64toh(iter_object_header->size); return thunk_end(thunk_dispatch(closure)); @@ -273,8 +419,6 @@ THUNK_DEFINE_STATIC(got_iter_object_header, iou_t *, iou, iou_op_t *, op, journa */ THUNK_DEFINE(journal_iter_next_object, iou_t *, iou, journal_t **, journal, Header *, header, uint64_t *, iter_offset, ObjectHeader *, iter_object_header, thunk_t *, closure) { - iou_op_t *op; - assert(iou); assert(journal); assert(iter_offset); @@ -299,15 +443,8 @@ THUNK_DEFINE(journal_iter_next_object, iou_t *, iou, journal_t **, journal, Head return thunk_dispatch(closure); } - op = iou_op_new(iou); - if (!op) - return -ENOMEM; - - io_uring_prep_read(op->sqe, (*journal)->idx, iter_object_header, sizeof(ObjectHeader), *iter_offset); - op->sqe->flags = IOSQE_FIXED_FILE; - op_queue(iou, op, THUNK(got_iter_object_header(iou, op, *journal, iter_offset, iter_object_header, closure))); - - return 0; + return journal_read(iou, *journal, *iter_offset, sizeof(ObjectHeader), iter_object_header, THUNK( + got_iter_object_header(iou, *journal, iter_offset, iter_object_header, closure))); } @@ -345,7 +482,7 @@ THUNK_DEFINE(journal_iter_objects, iou_t *, iou, journal_t **, journal, Header * } -THUNK_DEFINE_STATIC(got_hash_table_iter_object_header, iou_t *, iou, iou_op_t *, op, journal_t *, journal, HashItem *, hash_table, uint64_t, nbuckets, uint64_t *, iter_bucket, uint64_t *, iter_offset, HashedObjectHeader *, iter_object_header, size_t, iter_object_size, thunk_t *, closure) +THUNK_DEFINE_STATIC(got_hash_table_iter_object_header, iou_t *, iou, journal_t *, journal, HashItem *, hash_table, uint64_t, nbuckets, uint64_t *, iter_bucket, uint64_t *, iter_offset, HashedObjectHeader *, iter_object_header, size_t, iter_object_size, thunk_t *, closure) { assert(iou); assert(journal); @@ -355,12 +492,6 @@ THUNK_DEFINE_STATIC(got_hash_table_iter_object_header, iou_t *, iou, iou_op_t *, assert(iter_object_header); assert(closure); - if (op->result < 0) - return op->result; - - if (op->result != iter_object_size) - return -EINVAL; - iter_object_header->object.size = le64toh(iter_object_header->object.size); iter_object_header->hash = le64toh(iter_object_header->hash); iter_object_header->next_hash_offset = le64toh(iter_object_header->next_hash_offset); @@ -424,7 +555,6 @@ THUNK_DEFINE_STATIC(got_hash_table_iter_object_header, iou_t *, iou, iou_op_t *, THUNK_DEFINE(journal_hash_table_iter_next_object, iou_t *, iou, journal_t **, journal, HashItem **, hash_table, uint64_t *, hash_table_size, uint64_t *, iter_bucket, uint64_t *, iter_offset, HashedObjectHeader *, iter_object_header, size_t, iter_object_size, thunk_t *, closure) { size_t nbuckets; - iou_op_t *op; assert(iou); assert(journal); @@ -457,15 +587,8 @@ THUNK_DEFINE(journal_hash_table_iter_next_object, iou_t *, iou, journal_t **, jo } while (!(*iter_offset)); } - op = iou_op_new(iou); - if (!op) - return -ENOMEM; - - io_uring_prep_read(op->sqe, (*journal)->idx, iter_object_header, iter_object_size, *iter_offset); - op->sqe->flags = IOSQE_FIXED_FILE; - op_queue(iou, op, THUNK(got_hash_table_iter_object_header(iou, op, *journal, *hash_table, *hash_table_size, iter_bucket, iter_offset, iter_object_header, iter_object_size, closure))); - - return 0; + return journal_read(iou, *journal, *iter_offset, iter_object_size, iter_object_header, THUNK( + got_hash_table_iter_object_header(iou, *journal, *hash_table, *hash_table_size, iter_bucket, iter_offset, iter_object_header, iter_object_size, closure))); } @@ -508,20 +631,13 @@ THUNK_DEFINE(journal_hash_table_for_each, iou_t *, iou, journal_t **, journal, H } -THUNK_DEFINE_STATIC(got_hashtable, iou_t *, iou, iou_op_t *, op, HashItem *, table, uint64_t, size, HashItem **, res_hash_table, thunk_t *, closure) +THUNK_DEFINE_STATIC(got_hashtable, iou_t *, iou, HashItem *, table, uint64_t, size, HashItem **, res_hash_table, thunk_t *, closure) { assert(iou); - assert(op); assert(table); assert(res_hash_table); assert(closure); - if (op->result < 0) - return op->result; - - if (op->result != size) - return -EINVAL; - for (uint64_t i = 0; i < size / sizeof(HashItem); i++) { table[i].head_hash_offset = le64toh(table[i].head_hash_offset); table[i].tail_hash_offset = le64toh(table[i].tail_hash_offset); @@ -539,7 +655,6 @@ THUNK_DEFINE_STATIC(got_hashtable, iou_t *, iou, iou_op_t *, op, HashItem *, tab */ THUNK_DEFINE(journal_get_hash_table, iou_t *, iou, journal_t **, journal, uint64_t *, hash_table_offset, uint64_t *, hash_table_size, HashItem **, res_hash_table, thunk_t *, closure) { - iou_op_t *op; HashItem *table; assert(iou); @@ -549,37 +664,23 @@ THUNK_DEFINE(journal_get_hash_table, iou_t *, iou, journal_t **, journal, uint64 assert(res_hash_table); assert(closure); - op = iou_op_new(iou); - if (!op) - return -ENOMEM; - table = malloc(*hash_table_size); if (!table) return -ENOMEM; - io_uring_prep_read(op->sqe, (*journal)->idx, table, *hash_table_size, *hash_table_offset); - op->sqe->flags = IOSQE_FIXED_FILE; - op_queue(iou, op, THUNK(got_hashtable(iou, op, table, *hash_table_size, res_hash_table, closure))); - - return 0; + return journal_read(iou, *journal, *hash_table_offset, *hash_table_size, table, THUNK( + got_hashtable(iou, table, *hash_table_size, res_hash_table, closure))); } /* Validate and prepare journal header loaded via journal_get_header @ header, dispatch closure. */ -THUNK_DEFINE_STATIC(got_header, iou_t *, iou, iou_op_t *, op, journal_t *, journal, Header *, header, thunk_t *, closure) +THUNK_DEFINE_STATIC(got_header, iou_t *, iou, journal_t *, journal, Header *, header, thunk_t *, closure) { assert(iou); - assert(op); assert(journal); assert(header); assert(closure); - if (op->result < 0) - return op->result; - - if (op->result < sizeof(*header)) - return -EINVAL; - header->compatible_flags = le32toh(header->compatible_flags); header->incompatible_flags = le32toh(header->incompatible_flags); header->header_size = le64toh(header->header_size); @@ -614,38 +715,22 @@ THUNK_DEFINE_STATIC(got_header, iou_t *, iou, iou_op_t *, op, journal_t *, journ */ THUNK_DEFINE(journal_get_header, iou_t *, iou, journal_t **, journal, Header *, header, thunk_t *, closure) { - iou_op_t *op; - assert(iou); assert(journal); assert(closure); - op = iou_op_new(iou); - if (!op) - return -ENOMEM; - - io_uring_prep_read(op->sqe, (*journal)->idx, header, sizeof(*header), 0); - op->sqe->flags = IOSQE_FIXED_FILE; - op_queue(iou, op, THUNK(got_header(iou, op, *journal, header, closure))); - - return 0; + return journal_read(iou, *journal, 0, sizeof(*header), header, THUNK( + got_header(iou, *journal, header, closure))); } /* Validate and prepare object header loaded via journal_get_object_header @ object_header, dispatch closure. */ -THUNK_DEFINE_STATIC(got_object_header, iou_t *, iou, iou_op_t *, op, ObjectHeader *, object_header, thunk_t *, closure) +THUNK_DEFINE_STATIC(got_object_header, iou_t *, iou, ObjectHeader *, object_header, thunk_t *, closure) { assert(iou); - assert(op); assert(object_header); assert(closure); - if (op->result < 0) - return op->result; - - if (op->result < sizeof(*object_header)) - return -EINVAL; - object_header->size = le64toh(object_header->size); /* TODO: validation/sanity checks? */ @@ -658,42 +743,26 @@ THUNK_DEFINE_STATIC(got_object_header, iou_t *, iou, iou_op_t *, op, ObjectHeade */ THUNK_DEFINE(journal_get_object_header, iou_t *, iou, journal_t **, journal, uint64_t *, offset, ObjectHeader *, object_header, thunk_t *, closure) { - iou_op_t *op; - assert(iou); assert(journal); assert(offset); assert(object_header); assert(closure); - op = iou_op_new(iou); - if (!op) - return -ENOMEM; - - io_uring_prep_read(op->sqe, (*journal)->idx, object_header, sizeof(*object_header), *offset); - op->sqe->flags = IOSQE_FIXED_FILE; - op_queue(iou, op, THUNK(got_object_header(iou, op, object_header, closure))); - - return 0; + return journal_read(iou, *journal, *offset, sizeof(*object_header), object_header, THUNK( + got_object_header(iou, object_header, closure))); } #define OBJECT_N_ITEMS(_o) \ ((_o.object.size - offsetof(typeof(_o), items)) / sizeof(*_o.items)) /* Validate and prepare object loaded via journal_get_object @ object, dispatch closure. */ -THUNK_DEFINE_STATIC(got_object, iou_t *, iou, iou_op_t *, op, uint64_t, size, Object *, object, thunk_t *, closure) +THUNK_DEFINE_STATIC(got_object, iou_t *, iou, uint64_t, size, Object *, object, thunk_t *, closure) { assert(iou); - assert(op); assert(object); assert(closure); - if (op->result < 0) - return op->result; - - if (op->result != size) - return -EINVAL; - object->object.size = le64toh(object->object.size); /* TODO: validation/sanity checks? */ @@ -771,8 +840,6 @@ THUNK_DEFINE_STATIC(got_object, iou_t *, iou, iou_op_t *, op, uint64_t, size, Ob */ THUNK_DEFINE(journal_get_object, iou_t *, iou, journal_t **, journal, uint64_t *, offset, uint64_t *, size, Object **, object, thunk_t *, closure) { - iou_op_t *op; - assert(iou); assert(journal); assert(offset); @@ -780,15 +847,8 @@ THUNK_DEFINE(journal_get_object, iou_t *, iou, journal_t **, journal, uint64_t * assert(object && *object); assert(closure); - op = iou_op_new(iou); - if (!op) - return -ENOMEM; - - io_uring_prep_read(op->sqe, (*journal)->idx, *object, *size, *offset); - op->sqe->flags = IOSQE_FIXED_FILE; - op_queue(iou, op, THUNK(got_object(iou, op, *size, *object, closure))); - - return 0; + return journal_read(iou, *journal, *offset, *size, *object, THUNK( + got_object(iou, *size, *object, closure))); } |