path: root/src/journals.c
blob: c0d2b559c6576ff1a5435adcff119630cada0c93

/*
 *  Copyright (C) 2020 - Vito Caputo - <vcaputo@pengaru.com>
 *
 *  This program is free software: you can redistribute it and/or modify it
 *  under the terms of the GNU General Public License version 3 as published
 *  by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <dirent.h>
#include <fcntl.h>
#include <liburing.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <iou.h>
#include <thunk.h>

#include "journals.h"
#include "op.h"

#include "upstream/journal-def.h"


#define PERSISTENT_PATH	"/var/log/journal"


typedef struct journals_t {
	int		dirfd;
	size_t		n_journals, n_allocated, n_opened;
	journal_t	journals[];
} journals_t;


/* Helper for moving buf to the tail (most-recently-used end) of the
 * journal->lru_{head,tail} list.  This should probably just pull in a proper
 * linked-list header like the kernel's list.h; it's open-coded slop for now.
 */
static void buf_used(journal_t *journal, journal_buf_t *buf)
{
	assert(journal);
	assert(buf);

	if (journal->lru_tail == buf)
		return;

	if (buf->lru_prev)
		buf->lru_prev->lru_next = buf->lru_next;
	else
		journal->lru_head = buf->lru_next;

	buf->lru_next->lru_prev = buf->lru_prev;
	buf->lru_next = NULL;

	journal->lru_tail->lru_next = buf;
	buf->lru_prev = journal->lru_tail;
	journal->lru_tail = buf;
}


THUNK_DEFINE_STATIC(buf_got_read, iou_t *, iou, iou_op_t *, op, void *, dest, uint64_t, offset, uint64_t, length, journal_buf_t *, buf, thunk_t *, closure)
{
	assert(iou);
	assert(op);
	assert(closure);
	assert(dest || !buf);

	if (op->result < 0)
		return op->result;

	if (op->result < length)
		return -EINVAL;

	if (buf) {
		memcpy(dest, buf->data, length);
		buf->length = op->result;
		buf->offset = offset;
		buf->valid = 1;
	}

	return thunk_end(thunk_dispatch(closure));
}


/* Read length bytes at offset offset from journal into dest, dispatching closure when done.
 * For reads <= JOURNAL_BUF_SIZE in size, a per-journal cache of
 * JOURNAL_BUF_CNT*JOURNAL_BUF_SIZE buffers is maintained, and if the data is
 * present it's simply copied from there before dispatching closure.
 */
int journal_read(iou_t *iou, journal_t *journal, uint64_t offset, uint64_t length, void *dest, thunk_t *closure)
{
	journal_buf_t	*buf;
	iou_op_t	*op;

	assert(iou);
	assert(journal);
	assert(length);
	assert(dest);
	assert(closure);

	if (length <= JOURNAL_BUF_SIZE) {
		/* small enough to fit, look in the buffers */

		for (int i = 0; i < JOURNAL_BUF_CNT; i++) {
			journal_buf_t	*buf = &journal->bufs[i];

			if (!buf->valid)
				continue;

			if (offset >= buf->offset && offset + length <= buf->offset + buf->length) {
				buf_used(journal, buf);
				memcpy(dest, &buf->data[offset - buf->offset], length);

				return thunk_end(thunk_dispatch(closure));
			}
		}

		/* the read fits a buffer but wasn't cached; read it into the least-recently-used
		 * "fixed" buf, buf_got_read() will then copy out of the buf into dest once loaded.
		 */
		buf = journal->lru_head;
		buf_used(journal, buf);
		buf->valid = 0;

		op = iou_op_new(iou);
		if (!op)
			return -ENOMEM;

		io_uring_prep_read_fixed(op->sqe, journal->idx, buf->data, JOURNAL_BUF_SIZE, offset, buf->idx);
		op->sqe->flags = IOSQE_FIXED_FILE;
		op_queue(iou, op, THUNK(buf_got_read(iou, op, dest, offset, length, buf, closure)));
	} else {
		/* too big for the buffer cache; plain unbuffered read straight into the provided dest (assumed to be non-fixed) */

		op = iou_op_new(iou);
		if (!op)
			return -ENOMEM;

		io_uring_prep_read(op->sqe, journal->idx, dest, length, offset);
		op->sqe->flags = IOSQE_FIXED_FILE;
		op_queue(iou, op, THUNK(buf_got_read(iou, op, NULL, offset, length, NULL, closure)));
	}

	return 0;
}
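
/* Example usage (a sketch; "header_loaded" stands in for a caller-supplied
 * THUNK_DEFINE()'d continuation, it's not part of this file):
 *
 *	static Header	hdr;
 *
 *	r = journal_read(iou, journal, 0, sizeof(hdr), &hdr, THUNK(
 *		header_loaded(&hdr)));
 *	if (r < 0)
 *		return r;
 *
 * This only queues the IO; hdr must remain valid until header_loaded gets
 * dispatched off the completion.
 */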


/* An open on journal->name was attempted, result in op->result.
 * Bump journals->n_opened; when it matches journals->n_journals, dispatch closure.
 */
THUNK_DEFINE_STATIC(opened_journal, iou_t *, iou, iou_op_t *, op, journals_t *, journals, journal_t *, journal, thunk_t *, closure)
{
	assert(iou);
	assert(op);
	assert(journals);
	assert(journal);
	assert(closure);

	/* note n_opened counts open attempts, not successes; the closure only gets
	 * dispatched once all the opens have been performed, hence the need to count them.
	 */
	journals->n_opened++;

	if (op->result < 0) {
		if (op->result != -EPERM && op->result != -EACCES)
			return op->result;

		fprintf(stderr, "Permission denied opening \"%s\", ignoring\n", journal->name);
		journal->fd = -1;
	} else {

		/* setup journal->lru_{head,tail} bufs list */
		journal->lru_head = journal->lru_tail = &journal->bufs[0];
		for (int i = 1; i < JOURNAL_BUF_CNT; i++) {
			journal_buf_t	*buf = &journal->bufs[i];

			journal->lru_tail->lru_next = buf;
			buf->lru_prev = journal->lru_tail;
			journal->lru_tail = buf;
		}

		journal->fd = op->result;
	}

	if (journals->n_opened == journals->n_journals) {
		int	*fds, r;

		/* "register" all the opened journals, storing their respective
		 * index in journal->idx for convenience.
		 */
		fds = malloc(sizeof(*fds) * journals->n_journals);
		if (!fds)
			return -ENOMEM;

		for (int i = 0; i < journals->n_journals; i++) {
			fds[i] = journals->journals[i].fd;
			journals->journals[i].idx = i;
		}

		r = io_uring_register_files(iou_ring(iou), fds, journals->n_journals);
		if (r < 0) {
			free(fds);
			return r;
		}

		{
			struct iovec	*bufs;
			int		n_bufs = 0;

			bufs = malloc(sizeof(*bufs) * JOURNAL_BUF_CNT * journals->n_journals);
			if (!bufs) {
				free(fds);
				return -ENOMEM;
			}

			for (int i = 0; i < journals->n_journals; i++) {
				/* "register" buffers with io_uring for a perf boost */


				for (int j = 0; j < JOURNAL_BUF_CNT; j++) {
					journal_buf_t	*buf = &journals->journals[i].bufs[j];

					bufs[n_bufs].iov_base = buf->data;
					bufs[n_bufs].iov_len = JOURNAL_BUF_SIZE;
					buf->idx = n_bufs++;
				}
			}

			r = io_uring_register_buffers(iou_ring(iou), bufs, n_bufs);
			free(bufs);	/* registered or not, the iovec array itself is no longer needed */
			if (r < 0) {
				free(fds);
				return r;
			}
		}

		free(fds);

		return thunk_end(thunk_dispatch(closure));
	}

	return 0;
}


THUNK_DEFINE_STATIC(get_journals, iou_t *, iou, iou_op_t *, op, journals_t **, journals, int, flags, thunk_t *, closure)
{
	struct dirent	*dent;
	DIR		*jdir;
	size_t		n = 0;
	journals_t	*j;
	int		r;

	assert(iou);
	assert(op);
	assert(journals);
	assert(closure);

	if (op->result < 0)
		return op->result;

	/* I don't see any readdir/getdents ops for io_uring, so just do the opendir/readdir
	 * synchronously here before queueing the opening of all those paths.
	 */
	jdir = fdopendir(op->result);
	if (jdir == NULL) {
		int	r = -errno;

		close(op->result); /* only on successful fdopendir is the fd owned and closed by jdir */

		return r;
	}

	while ((dent = readdir(jdir))) {
		if (dent->d_name[0] == '.')	/* just skip dot files and "." ".." */
			continue;

		n++;
	}
	rewinddir(jdir);

	if (!n) {	/* no journals! */
		closedir(jdir);
		return 0;
	}

	j = calloc(1, sizeof(journals_t) + sizeof(j->journals[0]) * n);
	if (!j) {
		closedir(jdir);
		return -ENOMEM;
	}

	j->n_allocated = n;

	while ((dent = readdir(jdir))) {
		if (dent->d_name[0] == '.')	/* just skip dot files and "." ".." */
			continue;

		j->journals[j->n_journals++].name = strdup(dent->d_name);

		assert(j->n_journals <= j->n_allocated);// FIXME this defends against the race, but the race
							// is a normal thing and should be handled gracefully,
							// or just stop doing this two pass dance.
	}

	/* duplicate the dirfd for openat use later on, since closedir() will close op->result */
	j->dirfd = dup(op->result);
	closedir(jdir);

	/* to keep things relatively simple, let's ensure there's enough queue space for at least one
	 * operation per journal.
	 */
	r = iou_resize(iou, j->n_journals);
	if (r < 0)
		return r;	/* TODO: cleanup j */

	/* we have a list of journal names, now queue requests to open them all */
	for (size_t i = 0; i < j->n_journals; i++) {
		iou_op_t	*oop;

		oop = iou_op_new(iou);
		if (!oop)
			return -ENOMEM;	// FIXME: cleanup

		io_uring_prep_openat(oop->sqe, j->dirfd, j->journals[i].name, flags, 0);
		op_queue(iou, oop, THUNK(opened_journal(iou, oop, j, &j->journals[i], closure)));
	}

	iou_flush(iou);

	/* stow the journals where they can be found, but note they aren't opened yet. */
	*journals = j;

	return 0;
}


THUNK_DEFINE_STATIC(var_opened, iou_t *, iou, iou_op_t *, op, char **, machid, journals_t **, journals, int, flags, thunk_t *, closure)
{
	iou_op_t	*mop;

	assert(iou);
	assert(op);
	assert(machid && *machid);
	assert(journals);
	assert(closure);

	if (op->result < 0)
		return op->result;

	/* PERSISTENT_PATH is opened, req open ./$machid */
	mop = iou_op_new(iou);
	if (!mop)
		return -ENOMEM;

	/* req iou to open dir, passing the req to read and open its contents */
	io_uring_prep_openat(mop->sqe, op->result, *machid, O_DIRECTORY|O_RDONLY, 0);
	op_queue(iou, mop, THUNK(get_journals(iou, mop, journals, flags, closure)));

	return 0;
}


/* Request opening the journals via iou, allocating the resulting journals_t @ *journals.
 * Returns < 0 on error, 0 on successful queueing of the operation.
 */
THUNK_DEFINE(journals_open, iou_t *, iou, char **, machid, int, flags, journals_t **, journals, thunk_t *, closure)
{
	iou_op_t	*op;

	assert(iou);
	assert(machid);
	assert(journals);
	assert(closure);

	op = iou_op_new(iou);
	if (!op)
		return -ENOMEM;

	/* req iou to open PERSISTENT_PATH, passing on to var_opened */
	io_uring_prep_openat(op->sqe, AT_FDCWD, PERSISTENT_PATH, O_DIRECTORY|O_RDONLY, 0);
	op_queue(iou, op, THUNK(var_opened(iou, op, machid, journals, flags, closure)));

	return 0;
}
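
/* Example usage (a sketch; iou_new()/iou_run() are assumed to come from iou.h
 * rather than shown in this file, and "journals_opened" is a hypothetical
 * continuation thunk):
 *
 *	iou_t		*iou = iou_new(8);
 *	journals_t	*journals;
 *
 *	r = journals_open(iou, &machid, O_RDONLY, &journals, THUNK(
 *		journals_opened(&journals)));
 *	if (r < 0)
 *		return r;
 *
 *	r = iou_run(iou);	(drives the queued opens through completion)
 */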



THUNK_DEFINE_STATIC(got_iter_object_header, iou_t *, iou, journal_t *, journal, uint64_t *, iter_offset, ObjectHeader *, iter_object_header, thunk_t *, closure)
{
	assert(iou);
	assert(journal);
	assert(iter_offset);
	assert(iter_object_header);
	assert(closure);

	iter_object_header->size = le64toh(iter_object_header->size);

	return thunk_end(thunk_dispatch(closure));
}


/* Queues IO on iou for getting the next object from (*journal)->fd
 * addressed by state in {iter_offset,iter_object_header}, storing it @
 * *iter_object_header which must already have space allocated to accommodate
 * sizeof(ObjectHeader), registering closure for dispatch when completed.  This
 * only performs a single step, advancing the offset in *iter_offset by
 * *iter_object_header->size, ready for continuation in a subsequent call.
 *
 * When (*iter_offset == 0) it's considered the initial iteration; as this is
 * an invalid object offset (0=the journal header), the first object of the
 * journal will be loaded into *iter_object_header without advancing anything,
 * ignoring the initial contents of *iter_object_header.
 *
 * When closure gets dispatched with (*iter_offset == 0), there are no more
 * objects in journal, and *iter_object_header will have been left unmodified.
 * Hence closure should always check if (*iter_offset == 0) before accessing
 * *iter_object_header, and do whatever is appropriate upon reaching the end of
 * the journal.  If journal_iter_next_object() is called again after reaching
 * this point, it will restart iterating from the first object of the journal.
 */
THUNK_DEFINE(journal_iter_next_object, iou_t *, iou, journal_t **, journal, Header *, header, uint64_t *, iter_offset, ObjectHeader *, iter_object_header, thunk_t *, closure)
{
	assert(iou);
	assert(journal);
	assert(header);
	assert(iter_offset);
	assert(iter_object_header);
	assert(closure);

	/* restart iterating when entered with (*iter_offset == 0) */
	if (!(*iter_offset))
		*iter_offset = header->header_size;
	else {
		if (iter_object_header->size)
			*iter_offset += ALIGN64(iter_object_header->size);
		else {
			fprintf(stderr, "Encountered zero-sized object, journal \"%s\" appears corrupt, ignoring remainder\n", (*journal)->name);
			*iter_offset = header->tail_object_offset + 1;
		}
	}

	/* final dispatch if past tail object */
	if (*iter_offset > header->tail_object_offset) {
		*iter_offset = 0;
		return thunk_dispatch(closure);
	}

	return	journal_read(iou, *journal, *iter_offset, sizeof(ObjectHeader), iter_object_header, THUNK(
			got_iter_object_header(iou, *journal, iter_offset, iter_object_header, closure)));
}


/* Helper for the journal_iter_objects() simple objects iteration. */
THUNK_DEFINE_STATIC(journal_iter_objects_dispatch, iou_t *, iou, journal_t **, journal, Header *, header, uint64_t *, iter_offset, ObjectHeader *, iter_object_header, thunk_t *, closure)
{
	int	r;

	r = thunk_dispatch(closure);
	if (r < 0)
		return r;

	if (!(*iter_offset))
		return 0;

	return	thunk_end(journal_iter_next_object(iou, journal, header, iter_offset, iter_object_header, THUNK(
			journal_iter_objects_dispatch(iou, journal, header, iter_offset, iter_object_header, closure))));
}


/* Convenience journal object iterator, dispatches thunk for every object in the journal.
 *
 * Note in this wrapper there's no way to control/defer the iterator: your
 * closure is simply dispatched in a loop, and no continuation is passed to it
 * for resuming iteration, which tends to limit its use to simple situations.
 */
THUNK_DEFINE(journal_iter_objects, iou_t *, iou, journal_t **, journal, Header *, header, uint64_t *, iter_offset, ObjectHeader *, iter_object_header, thunk_t *, closure)
{
	assert(iter_offset);

	*iter_offset = 0;

	return	journal_iter_next_object(iou, journal, header, iter_offset, iter_object_header, THUNK(
			journal_iter_objects_dispatch(iou, journal, header, iter_offset, iter_object_header, closure)));
}
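
/* Example usage (a sketch; "per_object" is a hypothetical closure).  Because
 * per_object is dispatched once per object and once more at the end, it must
 * check for (*iter_offset == 0) before touching *iter_object_header:
 *
 *	static uint64_t		iter_offset;
 *	static ObjectHeader	iter_object_header;
 *
 *	r = journal_iter_objects(iou, &journal, &header, &iter_offset, &iter_object_header, THUNK(
 *		per_object(&iter_offset, &iter_object_header)));
 */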


THUNK_DEFINE_STATIC(got_hash_table_iter_object_header, iou_t *, iou, journal_t *, journal, HashItem *, hash_table, uint64_t, nbuckets, uint64_t *, iter_bucket, uint64_t *, iter_offset, HashedObjectHeader *, iter_object_header, size_t, iter_object_size, thunk_t *, closure)
{
	assert(iou);
	assert(journal);
	assert(hash_table);
	assert(iter_bucket && *iter_bucket < nbuckets);
	assert(iter_offset);
	assert(iter_object_header);
	assert(closure);

	iter_object_header->object.size = le64toh(iter_object_header->object.size);
	iter_object_header->hash = le64toh(iter_object_header->hash);
	iter_object_header->next_hash_offset = le64toh(iter_object_header->next_hash_offset);

	if (iter_object_size > sizeof(HashedObjectHeader)) {
		/* The caller can iterate either the field or data hash tables,
		 * so just introspect and handle those two... using a size greater
		 * than the minimum HashedObjectHeader as a heuristic.
		 * Scenarios that only need what's available in HashedObjectHeader
		 * can bypass all this and only read in the HashedObjectHeader
		 * while iterating.
		 */
		switch (iter_object_header->object.type) {
		case OBJECT_DATA: {
			DataObject	*data_object = (DataObject *)iter_object_header;

			assert(iter_object_size >= sizeof(DataObject));

			data_object->next_field_offset = le64toh(data_object->next_field_offset);
			data_object->entry_offset = le64toh(data_object->entry_offset);
			data_object->entry_array_offset = le64toh(data_object->entry_array_offset);
			data_object->n_entries = le64toh(data_object->n_entries);
			break;
		}
		case OBJECT_FIELD: {
			FieldObject	*field_object = (FieldObject *)iter_object_header;

			assert(iter_object_size >= sizeof(FieldObject));

			field_object->head_data_offset = le64toh(field_object->head_data_offset);
			break;
		}
		default:
			assert(0);
		}
	}

	return thunk_end(thunk_dispatch(closure));
}


/* Queues IO on iou for getting the next object from hash_table{_size},
 * positioned by state {iter_bucket,iter_offset,iter_object_header}, storing it
 * @ *iter_object_header which must already have space allocated to accommodate
 * iter_object_size, registering closure for dispatch when completed.  This
 * only performs a single step, advancing the state in
 * {iter_bucket,iter_offset} appropriate for continuing via another call.
 *
 * When (*iter_offset == 0) it's considered the initial iteration; as this is
 * an invalid object offset (0=the journal header), the first object of the
 * table will be loaded into *iter_object_header without advancing anything.
 *
 * When closure gets dispatched with (*iter_offset == 0), there are no more
 * objects in hash_table, and *iter_object_header will have been left
 * unmodified.  Hence closure should always check if (*iter_offset == 0)
 * before using *iter_object_header, and do whatever is appropriate upon
 * reaching the end of the hash table.  If
 * journal_hash_table_iter_next_object() is called again after reaching this
 * point, it will restart iterating from the first object of the table.
 */
THUNK_DEFINE(journal_hash_table_iter_next_object, iou_t *, iou, journal_t **, journal, HashItem **, hash_table, uint64_t *, hash_table_size, uint64_t *, iter_bucket, uint64_t *, iter_offset, HashedObjectHeader *, iter_object_header, size_t, iter_object_size, thunk_t *, closure)
{
	size_t		nbuckets;

	assert(iou);
	assert(journal);
	assert(hash_table);
	assert(hash_table_size && *hash_table_size >= sizeof(HashItem));
	assert(iter_bucket);
	assert(iter_offset);
	assert(iter_object_header);
	assert(iter_object_size >= sizeof(HashedObjectHeader));
	assert(closure);

	nbuckets = *hash_table_size / sizeof(HashItem);

	/* restart iterating when entered with (*iter_offset == 0) */
	if (!*iter_offset) {
		*iter_bucket = 0;
		*iter_offset = (*hash_table)[0].head_hash_offset;
	} else {
		assert(*iter_bucket < nbuckets);

		if (*iter_offset != (*hash_table)[*iter_bucket].tail_hash_offset)
			*iter_offset = iter_object_header->next_hash_offset;
		else
			*iter_offset = 0;	/* current bucket's chain is exhausted */
	}

	/* advance to the next non-empty bucket when there's no chain to follow */
	while (!*iter_offset) {
		(*iter_bucket)++;
		if (*iter_bucket >= nbuckets)	/* past the last bucket; (*iter_offset == 0) signals end of table */
			return thunk_end(thunk_dispatch(closure));

		*iter_offset = (*hash_table)[*iter_bucket].head_hash_offset;
	}

	return	journal_read(iou, *journal, *iter_offset, iter_object_size, iter_object_header, THUNK(
			got_hash_table_iter_object_header(iou, *journal, *hash_table, *hash_table_size, iter_bucket, iter_offset, iter_object_header, iter_object_size, closure)));
}


/* Helper for the journal_hash_table_for_each() simple hash table iteration. */
THUNK_DEFINE_STATIC(journal_hash_table_for_each_dispatch, iou_t *, iou, journal_t **, journal, HashItem **, hash_table, uint64_t *, hash_table_size, uint64_t *, iter_bucket, uint64_t *, iter_offset, HashedObjectHeader *, iter_object_header, size_t, iter_object_size, thunk_t *, closure)
{
	int	r;

	r = thunk_dispatch(closure);
	if (r < 0)
		return r;

	if (!*iter_offset)
		return 0;

	return	thunk_end(journal_hash_table_iter_next_object(iou, journal, hash_table, hash_table_size, iter_bucket, iter_offset, iter_object_header, iter_object_size, THUNK(
			journal_hash_table_for_each_dispatch(iou, journal, hash_table, hash_table_size, iter_bucket, iter_offset, iter_object_header, iter_object_size, closure))));
}


/* Convenience hash table iterator, dispatches thunk for every object in the hash table.
 * iter_object_size would typically be sizeof(DataObject) or sizeof(FieldObject), as
 * these are the two hash table types currently handled.  The size is supplied rather
 * than a type, because one may iterate by only loading the HashedObjectHeader portion.
 * The size is introspected to determine if more than just the HashedObjectHeader is being
 * loaded, and its type asserted to fit in the supplied space.
 *
 * Note in this wrapper there's no way to control/defer the iterator: your
 * closure is simply dispatched in a loop, and no continuation is passed to it
 * for resuming iteration, which tends to limit its use to simple situations.
 */
THUNK_DEFINE(journal_hash_table_for_each, iou_t *, iou, journal_t **, journal, HashItem **, hash_table, uint64_t *, hash_table_size, uint64_t *, iter_bucket, uint64_t *, iter_offset, HashedObjectHeader *, iter_object_header, size_t, iter_object_size, thunk_t *, closure)
{
	assert(iter_offset);

	*iter_offset = 0;

	return	journal_hash_table_iter_next_object(iou, journal, hash_table, hash_table_size, iter_bucket, iter_offset, iter_object_header, iter_object_size, THUNK(
			journal_hash_table_for_each_dispatch(iou, journal, hash_table, hash_table_size, iter_bucket, iter_offset, iter_object_header, iter_object_size, closure)));
}
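
/* Example usage (a sketch): iterating the data hash table with full DataObject
 * loads.  "per_data_object" is a hypothetical closure, and data_hash_table
 * would come from journal_get_hash_table() below:
 *
 *	static uint64_t		iter_bucket, iter_offset;
 *	static DataObject	data_object;
 *
 *	r = journal_hash_table_for_each(iou, &journal, &data_hash_table,
 *		&header.data_hash_table_size, &iter_bucket, &iter_offset,
 *		&data_object.hashed, sizeof(data_object), THUNK(
 *			per_data_object(&iter_offset, &data_object)));
 */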


THUNK_DEFINE_STATIC(got_hashtable, iou_t *, iou, HashItem *, table, uint64_t, size, HashItem **, res_hash_table, thunk_t *, closure)
{
	assert(iou);
	assert(table);
	assert(res_hash_table);
	assert(closure);

	for (uint64_t i = 0; i < size / sizeof(HashItem); i++) {
		table[i].head_hash_offset = le64toh(table[i].head_hash_offset);
		table[i].tail_hash_offset = le64toh(table[i].tail_hash_offset);
	}
	/* TODO: validation/sanity checks? */

	*res_hash_table = table;

	return thunk_end(thunk_dispatch(closure));
}


/* Queue IO on iou for loading a journal's hash table from *journal into memory allocated and stored @ *res_hash_table.
 * Registers closure for dispatch when completed.
 */
THUNK_DEFINE(journal_get_hash_table, iou_t *, iou, journal_t **, journal, uint64_t *, hash_table_offset, uint64_t *, hash_table_size, HashItem **, res_hash_table, thunk_t *, closure)
{
	HashItem	*table;
	int		r;

	assert(iou);
	assert(journal);
	assert(hash_table_offset);
	assert(hash_table_size);
	assert(res_hash_table);
	assert(closure);

	table = malloc(*hash_table_size);
	if (!table)
		return -ENOMEM;

	r = journal_read(iou, *journal, *hash_table_offset, *hash_table_size, table, THUNK(
			got_hashtable(iou, table, *hash_table_size, res_hash_table, closure)));
	if (r < 0)
		free(table);	/* the read was never queued; don't leak the table */

	return r;
}
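
/* Example usage (a sketch; "got_table" is a hypothetical continuation):
 * loading the data hash table, typically chained after journal_get_header()
 * below, since header.data_hash_table_{offset,size} must be populated first:
 *
 *	static HashItem	*data_hash_table;
 *
 *	r = journal_get_hash_table(iou, &journal, &header.data_hash_table_offset,
 *		&header.data_hash_table_size, &data_hash_table, THUNK(
 *			got_table(&data_hash_table)));
 */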


/* Validate and prepare journal header loaded via journal_get_header @ header, dispatch closure. */
THUNK_DEFINE_STATIC(got_header, iou_t *, iou, journal_t *, journal, Header *, header, thunk_t *, closure)
{
	assert(iou);
	assert(journal);
	assert(header);
	assert(closure);

	header->compatible_flags = le32toh(header->compatible_flags);
	header->incompatible_flags = le32toh(header->incompatible_flags);
	header->header_size = le64toh(header->header_size);
	header->arena_size = le64toh(header->arena_size);
	header->data_hash_table_offset = le64toh(header->data_hash_table_offset);
	header->data_hash_table_size = le64toh(header->data_hash_table_size);
	header->field_hash_table_offset = le64toh(header->field_hash_table_offset);
	header->field_hash_table_size = le64toh(header->field_hash_table_size);
	header->tail_object_offset = le64toh(header->tail_object_offset);
	header->n_objects = le64toh(header->n_objects);
	header->n_entries = le64toh(header->n_entries);
	header->tail_entry_seqnum = le64toh(header->tail_entry_seqnum);
	header->head_entry_seqnum = le64toh(header->head_entry_seqnum);
	header->entry_array_offset = le64toh(header->entry_array_offset);
	header->head_entry_realtime = le64toh(header->head_entry_realtime);
	header->tail_entry_realtime = le64toh(header->tail_entry_realtime);
	header->tail_entry_monotonic = le64toh(header->tail_entry_monotonic);
	header->n_data = le64toh(header->n_data);
	header->n_fields = le64toh(header->n_fields);
	header->n_tags = le64toh(header->n_tags);
	header->n_entry_arrays = le64toh(header->n_entry_arrays);
	header->data_hash_chain_depth = le64toh(header->data_hash_chain_depth);
	header->field_hash_chain_depth = le64toh(header->field_hash_chain_depth);
	/* TODO: validation/sanity checks? */

	return thunk_end(thunk_dispatch(closure));
}


/* Queue IO on iou for loading a journal's header from *journal into *header.
 * Registers closure for dispatch when completed.
 */
THUNK_DEFINE(journal_get_header, iou_t *, iou, journal_t **, journal, Header *, header, thunk_t *, closure)
{
	assert(iou);
	assert(journal);
	assert(closure);

	return	journal_read(iou, *journal, 0, sizeof(*header), header, THUNK(
			got_header(iou, *journal, header, closure)));
}


/* Validate and prepare object header loaded via journal_get_object_header @ object_header, dispatch closure. */
THUNK_DEFINE_STATIC(got_object_header, iou_t *, iou, ObjectHeader *, object_header, thunk_t *, closure)
{
	assert(iou);
	assert(object_header);
	assert(closure);

	object_header->size = le64toh(object_header->size);
	/* TODO: validation/sanity checks? */

	return thunk_end(thunk_dispatch(closure));
}


/* Queue IO on iou for loading an object header from *journal @ offset *offset, into *object_header.
 * Registers closure for dispatch on the io when completed.
 */
THUNK_DEFINE(journal_get_object_header, iou_t *, iou, journal_t **, journal, uint64_t *, offset, ObjectHeader *, object_header, thunk_t *, closure)
{
	assert(iou);
	assert(journal);
	assert(offset);
	assert(object_header);
	assert(closure);

	return	journal_read(iou, *journal, *offset, sizeof(*object_header), object_header, THUNK(
			got_object_header(iou, object_header, closure)));
}

#define OBJECT_N_ITEMS(_o)	\
	((_o.object.size - offsetof(typeof(_o), items)) / sizeof(*_o.items))
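/* e.g. with _o being object->entry, typeof(_o) is EntryObject and this
 * evaluates to (object->entry.object.size - offsetof(EntryObject, items)) /
 * sizeof(EntryItem): however many items fit in the object beyond its fixed
 * header.
 */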

/* Validate and prepare object loaded via journal_get_object @ object, dispatch closure. */
THUNK_DEFINE_STATIC(got_object, iou_t *, iou, uint64_t, size, Object *, object, thunk_t *, closure)
{
	assert(iou);
	assert(object);
	assert(closure);

	object->object.size = le64toh(object->object.size);

	/* TODO: validation/sanity checks? */
	switch (object->object.type) {
	case OBJECT_DATA:
		object->data.hashed.hash = le64toh(object->data.hashed.hash);
		object->data.hashed.next_hash_offset = le64toh(object->data.hashed.next_hash_offset);
		object->data.next_field_offset = le64toh(object->data.next_field_offset);
		object->data.entry_offset = le64toh(object->data.entry_offset);
		object->data.entry_array_offset = le64toh(object->data.entry_array_offset);
		object->data.n_entries = le64toh(object->data.n_entries);
		break;

	case OBJECT_FIELD:
		object->field.hashed.hash = le64toh(object->field.hashed.hash);
		object->field.hashed.next_hash_offset = le64toh(object->field.hashed.next_hash_offset);
		object->field.head_data_offset = le64toh(object->field.head_data_offset);
		break;

	case OBJECT_ENTRY:
		object->entry.seqnum = le64toh(object->entry.seqnum);
		object->entry.realtime = le64toh(object->entry.realtime);
		object->entry.monotonic = le64toh(object->entry.monotonic);
		/* object->entry.boot_id is a raw 128-bit id; no byteswap needed */
		object->entry.xor_hash = le64toh(object->entry.xor_hash);
		for (uint64_t i = 0, n_items = OBJECT_N_ITEMS(object->entry); i < n_items; i++) {
			object->entry.items[i].object_offset = le64toh(object->entry.items[i].object_offset);
			object->entry.items[i].hash = le64toh(object->entry.items[i].hash);
		}
		break;

	case OBJECT_DATA_HASH_TABLE:
	case OBJECT_FIELD_HASH_TABLE:
		for (uint64_t i = 0, n_items = OBJECT_N_ITEMS(object->hash_table); i < n_items; i++) {
			object->hash_table.items[i].head_hash_offset = le64toh(object->hash_table.items[i].head_hash_offset);
			object->hash_table.items[i].tail_hash_offset = le64toh(object->hash_table.items[i].tail_hash_offset);
		}
		break;

	case OBJECT_ENTRY_ARRAY:
		object->entry_array.next_entry_array_offset = le64toh(object->entry_array.next_entry_array_offset);
		for (uint64_t i = 0, n_items = OBJECT_N_ITEMS(object->entry_array); i < n_items; i++)
			object->entry_array.items[i] = le64toh(object->entry_array.items[i]);
		break;

	case OBJECT_TAG:
		object->tag.seqnum = le64toh(object->tag.seqnum);
		object->tag.epoch = le64toh(object->tag.epoch);
		break;

	default:
		/* XXX: should probably just ignore unknown types instead,
		 * but the idea here is to let callers safely assume loaded objects
		 * have been fully validated and byteswapped as needed.
		 */
		assert(0);
	}

	return thunk_end(thunk_dispatch(closure));
}


/* Queue IO on iou for loading an entire object of size *size from *journal @ offset *offset, into *object
 * which must already be allocated.
 * Registers closure for dispatch on the io when completed.
 *
 * Note this doesn't allocate space for the object and requires the size to
 * already be known; it's the bare-minimum load of an object into
 * pre-allocated space, performing the necessary le64toh() swapping of
 * object-specific members before dispatching the supplied closure.
 *
 * It's expected that the caller already retrieved the object's header in a
 * separate step before calling this to load the object in its entirety, since
 * the header is needed first to know the size for allocating the full object
 * and then loading its contents.  journal_get_object_full() below combines
 * the header load and the entire object load in one convenient step.
 */
 */
THUNK_DEFINE(journal_get_object, iou_t *, iou, journal_t **, journal, uint64_t *, offset, uint64_t *, size, Object **, object, thunk_t *, closure)
{
	assert(iou);
	assert(journal);
	assert(offset);
	assert(size);
	assert(object && *object);
	assert(closure);

	return	journal_read(iou, *journal, *offset, *size, *object, THUNK(
			got_object(iou, *size, *object, closure)));
}


THUNK_DEFINE_STATIC(get_object_full_got_header, iou_t *, iou, journal_t **, journal, uint64_t *, offset, ObjectHeader *, object_header, Object **, object, thunk_t *, closure)
{
	Object	*o;

	o = malloc(object_header->size);
	if (!o)
		return -ENOMEM;

	*object = o;

	return journal_get_object(iou, journal, offset, &object_header->size, object, closure);
}


/* Queue IO on iou for loading an object header into *object_header, which must already be allocated,
 * registering a closure to then allocate space for the full object @ *object and queueing IO for loading
 * the full object into that space, with closure registered for dispatch once the full object is loaded.
 *
 * This will leave a newly allocated and populated object @ *object, ready for use.
 */
THUNK_DEFINE(journal_get_object_full, iou_t *, iou, journal_t **, journal, uint64_t *, offset, ObjectHeader *, object_header, Object **, object, thunk_t *, closure)
{
	return	journal_get_object_header(iou, journal, offset, object_header, THUNK(
			get_object_full_got_header(iou, journal, offset, object_header, object, closure)));
}
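
/* Example usage (a sketch; "got_full_object" is a hypothetical continuation;
 * object_header must stay valid across both queued reads):
 *
 *	static ObjectHeader	object_header;
 *	static Object		*object;
 *
 *	r = journal_get_object_full(iou, &journal, &offset, &object_header, &object, THUNK(
 *		got_full_object(&object)));
 *
 * Once got_full_object runs, *object is a malloc()d, byteswapped object owned
 * by the caller.
 */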


/* for every open journal in *journals, store the journal in *journal_iter and dispatch closure */
/* closure must expect to be dispatched multiple times; once per journal, and will be freed once at end */
THUNK_DEFINE(journals_for_each, journals_t **, journals, journal_t **, journal_iter, thunk_t *, closure)
{
	journals_t	*j;

	assert(journals && *journals);
	assert(journal_iter);
	assert(closure);

	j = *journals;

	for (size_t i = 0; i < j->n_journals; i++) {
		int	r;

		if (j->journals[i].fd < 0)
			continue;

		(*journal_iter) = &j->journals[i];

		r = thunk_dispatch(closure);
		if (r < 0)
			return r;
	}

	thunk_free(closure);

	return 0;
}
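
/* Example usage (a sketch; "per_journal" is a hypothetical closure dispatched
 * once per successfully opened journal):
 *
 *	static journal_t	*journal;
 *
 *	r = journals_for_each(&journals, &journal, THUNK(
 *		per_journal(&journal)));
 *
 * Note journals_for_each() frees the supplied closure when iteration completes.
 */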


const char * journal_object_type_str(ObjectType type)
{
	static const char *names[] = {
		"UNUSED",
		"Data",
		"Field",
		"Entry",
		"DataHashTable",
		"FieldHashTable",
		"EntryArray",
		"Tag",
	};

	if (type < 0 || type >= sizeof(names) / sizeof(*names))
		return "UNKNOWN";

	return names[type];
}


const char * journal_state_str(JournalState state)
{
	static const char *names[] = {
		"Offline",
		"Online",
		"Archived",
	};

	if (state < 0 || state >= sizeof(names) / sizeof(*names))
		return "UNKNOWN";

	return names[state];
}