diff options
Diffstat (limited to 'src/journals.c')
-rw-r--r-- | src/journals.c | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/src/journals.c b/src/journals.c index 8211b6f..05c14f0 100644 --- a/src/journals.c +++ b/src/journals.c @@ -18,6 +18,7 @@ #include <dirent.h> #include <fcntl.h> #include <liburing.h> +#include <stddef.h> #include <stdio.h> #include <string.h> #include <sys/stat.h> @@ -689,6 +690,147 @@ THUNK_DEFINE(journal_get_object_header, iou_t *, iou, journal_t **, journal, uin return 0; } +#define OBJECT_N_ITEMS(_o) \ + ((_o.object.size - offsetof(typeof(_o), items)) / sizeof(*_o.items)) + +/* Validate and prepare object loaded via journal_get_object @ object, dispatch closure. */ +THUNK_DEFINE_STATIC(got_object, iou_t *, iou, iou_op_t *, op, uint64_t, size, Object *, object, thunk_t *, closure) +{ + assert(iou); + assert(op); + assert(object); + assert(closure); + + if (op->result < 0) + return op->result; + + if (op->result != size) + return -EINVAL; + + object->object.size = le64toh(object->object.size); + + /* TODO: validation/sanity checks? */ + switch (object->object.type) { + case OBJECT_DATA: + object->data.hashed.hash = le64toh(object->data.hashed.hash); + object->data.hashed.next_hash_offset = le64toh(object->data.hashed.next_hash_offset); + object->data.next_field_offset = le64toh(object->data.next_field_offset); + object->data.entry_offset = le64toh(object->data.entry_offset); + object->data.entry_array_offset = le64toh(object->data.entry_array_offset); + object->data.n_entries = le64toh(object->data.n_entries); + break; + + case OBJECT_FIELD: + object->field.hashed.hash = le64toh(object->field.hashed.hash); + object->field.hashed.next_hash_offset = le64toh(object->field.hashed.next_hash_offset); + object->field.head_data_offset = le64toh(object->field.head_data_offset); + break; + + case OBJECT_ENTRY: + object->entry.seqnum = le64toh(object->entry.seqnum); + object->entry.realtime = le64toh(object->entry.realtime); + object->entry.monotonic = le64toh(object->entry.monotonic); + //object->entry.boot_id + object->entry.xor_hash = le64toh(object->entry.xor_hash); + for (uint64_t i = 0, n_items = OBJECT_N_ITEMS(object->entry); i < n_items; i++) { + object->entry.items[i].object_offset = le64toh(object->entry.items[i].object_offset); + object->entry.items[i].hash; + } + break; + + case OBJECT_DATA_HASH_TABLE: + case OBJECT_FIELD_HASH_TABLE: + for (uint64_t i = 0, n_items = OBJECT_N_ITEMS(object->hash_table); i < n_items; i++) { + object->hash_table.items[i].head_hash_offset = le64toh(object->hash_table.items[i].head_hash_offset); + object->hash_table.items[i].tail_hash_offset = le64toh(object->hash_table.items[i].tail_hash_offset); + } + break; + + case OBJECT_ENTRY_ARRAY: + object->entry_array.next_entry_array_offset = le64toh(object->entry_array.next_entry_array_offset); + for (uint64_t i = 0, n_items = OBJECT_N_ITEMS(object->entry_array); i < n_items; i++) + object->entry_array.items[i] = le64toh(object->entry_array.items[i]); + break; + + case OBJECT_TAG: + object->tag.seqnum = le64toh(object->tag.seqnum); + object->tag.epoch = le64toh(object->tag.epoch); + break; + + default: + /* XXX: should probably just ignore unknown types instead, + * but the idea here is to let callers safely assume loaded objects + * have been fully validated and byteswapped as needed. + */ + assert(0); + } + + return thunk_dispatch(closure); +} + + +/* Queue IO on iou for loading an entire object of size *size from *journal @ offset *offset, into *object + * which must already be allocated. + * Registers closure for dispatch on the io when completed. + * + * Note this doesn't allocate space for the object and requires the size be already known, it is the bare + * minimum object loading into pre-allocated space when the size is known, which performs the necessary + * le64toh() swapping of object-specific members before calling the supplied closure. + * + * It's expected that the caller must already retrieve the object's header in a separate step before + * calling this to load the entirety of the object, since the header is needed first to know the size + * for allocating the full object and then loading its contents. Another heavier helper will be provided + * for doing both the header load followed by the entire object load in one convenient step. + */ +THUNK_DEFINE(journal_get_object, iou_t *, iou, journal_t **, journal, uint64_t *, offset, uint64_t *, size, Object **, object, thunk_t *, closure) +{ + iou_op_t *op; + + assert(iou); + assert(journal); + assert(offset); + assert(size); + assert(object && *object); + assert(closure); + + op = iou_op_new(iou); + if (!op) + return -ENOMEM; + + io_uring_prep_read(op->sqe, (*journal)->idx, *object, *size, *offset); + op->sqe->flags = IOSQE_FIXED_FILE; + op_queue(iou, op, THUNK(got_object(iou, op, *size, *object, closure))); + + return 0; +} + + +THUNK_DEFINE_STATIC(get_object_full_got_header, iou_t *, iou, journal_t **, journal, uint64_t *, offset, ObjectHeader *, object_header, Object **, object, thunk_t *, closure) +{ + Object *o; + + o = malloc(object_header->size); + if (!o) + return -ENOMEM; + + *object = o; + + return journal_get_object(iou, journal, offset, &object_header->size, object, closure); +} + + +/* Queue IO on iou for loading an object header into *object_header, which must already be allocated, + * registering a closure to then allocate space for the full object @ *object and queueing IO for loading + * the full object into that space, with closure registered for dispatch once the full object is loaded. + * + * This will leave a newly allocated and populated object @ *object, ready for use. + */ +THUNK_DEFINE(journal_get_object_full, iou_t *, iou, journal_t **, journal, uint64_t *, offset, ObjectHeader *, object_header, Object **, object, thunk_t *, closure) +{ + return journal_get_object_header(iou, journal, offset, object_header, THUNK( + get_object_full_got_header(iou, journal, offset, object_header, object, closure))); +} + /* for every open journal in *journals, store the journal in *journal_iter and dispatch closure */ /* closure must expect to be dispatched multiple times; once per journal, and will be freed once at end */ |