summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/til_stream.c232
-rw-r--r--src/til_stream.h11
2 files changed, 237 insertions, 6 deletions
diff --git a/src/til_stream.c b/src/til_stream.c
index afc635b..e04f807 100644
--- a/src/til_stream.c
+++ b/src/til_stream.c
@@ -22,6 +22,8 @@
#include <stdio.h>
#include <stdlib.h>
+#include "til.h"
+#include "til_jenkins.h"
#include "til_stream.h"
#include "til_tap.h"
@@ -66,7 +68,8 @@ foo_render(foo_t *foo, til_fb_fragment_t *fragment)
#endif
-#define TIL_STREAM_BUCKETS_COUNT 256
+#define TIL_STREAM_PIPE_BUCKETS_COUNT 256
+#define TIL_STREAM_CTXT_BUCKETS_COUNT 64
struct til_stream_pipe_t {
@@ -78,11 +81,20 @@ struct til_stream_pipe_t {
uint32_t hash; /* hash of (driving_tap->name_hash ^ .parent_hash) */
};
+struct til_stream_module_context_t {
+ til_stream_module_context_t *next;
+ uint32_t path_hash;
+
+ size_t n_module_contexts;
+ til_module_context_t *module_contexts[];
+};
+
typedef struct til_stream_t {
pthread_mutex_t mutex;
const til_stream_hooks_t *hooks;
void *hooks_context;
- til_stream_pipe_t *pipe_buckets[TIL_STREAM_BUCKETS_COUNT];
+ til_stream_pipe_t *pipe_buckets[TIL_STREAM_PIPE_BUCKETS_COUNT];
+ til_stream_module_context_t *module_context_buckets[TIL_STREAM_CTXT_BUCKETS_COUNT];
} til_stream_t;
@@ -105,7 +117,7 @@ til_stream_t * til_stream_free(til_stream_t *stream)
if (!stream)
return NULL;
- for (int i = 0; i < TIL_STREAM_BUCKETS_COUNT; i++) {
+ for (int i = 0; i < TIL_STREAM_PIPE_BUCKETS_COUNT; i++) {
for (til_stream_pipe_t *p = stream->pipe_buckets[i], *p_next; p != NULL; p = p_next) {
p_next = p->next;
free(p->parent_path);
@@ -190,7 +202,7 @@ int til_stream_tap(til_stream_t *stream, const void *owner, const void *owner_fo
pthread_mutex_lock(&stream->mutex);
hash = (tap->name_hash ^ parent_hash);
- bucket = hash % TIL_STREAM_BUCKETS_COUNT;
+ bucket = hash % TIL_STREAM_PIPE_BUCKETS_COUNT;
for (pipe = stream->pipe_buckets[bucket]; pipe != NULL; pipe = pipe->next) {
if (pipe->hash == hash) {
if (pipe->driving_tap == tap) {
@@ -260,7 +272,7 @@ int til_stream_tap(til_stream_t *stream, const void *owner, const void *owner_fo
/* remove all pipes belonging to owner in stream, including pipes whose driving_tap is owned by owner */
void til_stream_untap_owner(til_stream_t *stream, const void *owner)
{
- for (int i = 0; i < TIL_STREAM_BUCKETS_COUNT; i++) {
+ for (int i = 0; i < TIL_STREAM_PIPE_BUCKETS_COUNT; i++) {
for (til_stream_pipe_t *p = stream->pipe_buckets[i], *p_next, *p_prev; p != NULL; p = p_next) {
p_next = p->next;
@@ -436,7 +448,7 @@ int til_stream_for_each_pipe(til_stream_t *stream, til_stream_pipe_iter_func_t p
pthread_mutex_lock(&stream->mutex);
- for (int i = 0; i < TIL_STREAM_BUCKETS_COUNT; i++) {
+ for (int i = 0; i < TIL_STREAM_PIPE_BUCKETS_COUNT; i++) {
for (til_stream_pipe_t *p = stream->pipe_buckets[i]; p != NULL; p = p->next) {
int r;
@@ -472,3 +484,211 @@ void til_stream_pipe_set_driving_tap(til_stream_pipe_t *pipe, const til_tap_t *d
pipe->driving_tap = driving_tap;
}
+
+
+int til_stream_for_each_module_context(til_stream_t *stream, til_stream_module_context_iter_func_t module_context_cb, void *cb_arg)
+{
+ assert(stream);
+ assert(module_context_cb);
+
+ for (int i = 0; i < TIL_STREAM_CTXT_BUCKETS_COUNT; i++) {
+ for (til_stream_module_context_t *c = stream->module_context_buckets[i]; c != NULL; c = c->next) {
+ int r;
+
+ r = module_context_cb(cb_arg, c, c->n_module_contexts, (const til_module_context_t **)c->module_contexts);
+ if (r < 0)
+ return r;
+ }
+ }
+
+ return 0;
+}
+
+
+int til_stream_register_module_contexts(til_stream_t *stream, size_t n_contexts, til_module_context_t **contexts)
+{
+ uint32_t path_hash, bucket;
+ til_stream_module_context_t *c, *c_next, *c_prev;
+
+ assert(stream);
+ assert(n_contexts > 0);
+ assert(contexts);
+
+ path_hash = contexts[0]->setup->path_hash;
+ bucket = path_hash % TIL_STREAM_CTXT_BUCKETS_COUNT;
+ for (c = stream->module_context_buckets[bucket], c_prev = NULL; c != NULL; c = c_next) {
+ c_next = c->next;
+
+ if (c->path_hash != path_hash ||
+ strcmp(contexts[0]->setup->path, c->module_contexts[0]->setup->path)) {
+ c_prev = c;
+ continue;
+ }
+
+ /* due to the current approach of async gc for on-stream contexts, collisions at
+ * the same path are to be expected - and we just replace what's there. In the
+ * future there will likely be a context invalidation api for marking what's replaced
+ * as stale, so the next time that reference gets used it's refreshed via the stream... TODO
+ */
+
+ if (c_prev)
+ c_prev->next = c_next;
+ else
+ stream->module_context_buckets[bucket] = c_next;
+
+ /* we're only potentially reusing the stream_module_context_t container, the actual contexts are unreferenced */
+ for (size_t i = 0; i < c->n_module_contexts; i++)
+ c->module_contexts[i] = til_module_context_free((til_module_context_t *)c->module_contexts[i]);
+
+ /* not big enough, FIXME TODO I think the context sets @path are being deprecated */
+ if (c->n_module_contexts < n_contexts) {
+ free(c);
+ c = NULL;
+ }
+
+ break;
+ }
+
+ if (!c) {
+ c = calloc(1, sizeof(til_stream_module_context_t) + n_contexts * sizeof(contexts[0]));
+ if (!c)
+ return -ENOMEM;
+
+ c->path_hash = path_hash;
+ c->n_module_contexts = n_contexts;
+ }
+
+ for (size_t i = 0; i < n_contexts; i++)
+ c->module_contexts[i] = til_module_context_ref(contexts[i]);
+
+ c->next = stream->module_context_buckets[bucket];
+ stream->module_context_buckets[bucket] = c;
+
+ return 0;
+}
+
+
+int til_stream_find_module_contexts(til_stream_t *stream, const char *path, size_t n_contexts, til_module_context_t **res_contexts)
+{
+ uint32_t path_hash, bucket;
+ til_stream_module_context_t *c;
+
+ assert(stream);
+ assert(path);
+ assert(n_contexts > 0);
+ assert(res_contexts);
+
+ /* TODO: add a til_path_t which pairs the hash and char* so we do't keep recomputing it */
+ path_hash = til_jenkins((uint8_t *)path, strlen(path) + 1);
+ bucket = path_hash % TIL_STREAM_CTXT_BUCKETS_COUNT;
+ for (c = stream->module_context_buckets[bucket]; c != NULL; c = c->next) {
+ if (c->path_hash != path_hash)
+ continue;
+
+ if (!strcmp(c->module_contexts[0]->setup->path, path)) {
+ /* FIXME TODO: n_contexts mismatches need to be dealt with still:
+ *
+ * When n_contexts is smaller than the registered set, it seems trivial to
+ * just use the subset - but then the subset marches ahead of the others and
+ * should arguably invalidate the rest, truncating the set. The complexity there
+ * is all the outstanding references need to realize their invalidated and refresh
+ * their contexts, presumably via this lookup in the stream.
+ *
+ * When n_contexts exceeds the registered set, there's a need to create more but
+ * they need to be identical, which suggests there needs to be
+ * til_module_t.clone_context()
+ *
+ * It should be noted that there's just one user of cloned contexts as a means of
+ * achieving parallel rendering: checkers::fill_module, if that usage is deprecated
+ * this all goes away. But the way checkers is achieving parallelism there, regardless
+ * of what the underlying fill_module is capable of, is arguably a powerful thing, for
+ * use cases that need instancing of the same module's output. Though maybe it makes
+ * more sense for checkers to be rendering _one_ tile, then copying it to the filled
+ * cells in parallel - if they're all expected to be identical, why not render it once?
+ */
+
+ /* XXX: until the above is addressed, this assert will remain */
+ assert(n_contexts <= c->n_module_contexts);
+ for (size_t i = 0; i < n_contexts; i++)
+ res_contexts[i] = til_module_context_ref(c->module_contexts[i]);
+
+ return 0;
+ }
+ }
+
+ return -ENOENT;
+}
+
+
+void til_stream_gc_module_contexts(til_stream_t *stream)
+{
+ assert(stream);
+
+ /* This may not remain long-term, but there's no current way to unregister contexts - and by
+ * occasionally calling this at appropriate times, long-running modules potentially reusing
+ * contexts by path on streams like rkt can cleanup contexts rather than keep leaking them
+ * since they're automatically registered on-stream.
+ *
+ * But the way the "ref" builtin currently looks up contexts late @ render time presents some
+ * difficulty as to "when is the appropriate time to gc contexts?". rtv for example is calling
+ * it immediately after creating the context for the next channel, it's just that rtv doesn't
+ * actually make use of context passing or anything like that via "ref" so it doesn't really
+ * matter - rtv just wants to release the resources. For rkt, it gets more complicated, and
+ * it's unclear yet what the right solution will be there. So consider this "gc" implementation
+ * very tentative and mostly just to keep "rtv" from meltdown while the dust settles on the rkt
+ * front.
+ */
+ for (size_t b = 0; b < TIL_STREAM_CTXT_BUCKETS_COUNT; b++) {
+ til_stream_module_context_t *c, *c_prev, *c_next;
+
+ for (c = stream->module_context_buckets[b], c_prev = NULL; c != NULL; c = c_next) {
+ size_t i;
+
+ c_next = c->next;
+
+ for (i = 0; i < c->n_module_contexts; i++) {
+ if (c->module_contexts[i]->refcount != 1)
+ break;
+ }
+
+ if (i < c->n_module_contexts) {
+ c_prev = c;
+ continue;
+ }
+
+ /* if all the contexts in the set are rc=1, that implies they're only on-stream
+ * and not actively referenced by any user, so we "gc" them.
+ */
+ { /* free(stream_module_context_t) */
+ for (i = 0; i < c->n_module_contexts; i++)
+ c->module_contexts[i] = til_module_context_free(c->module_contexts[i]);
+ free(c);
+ }
+
+ if (c_prev)
+ c_prev->next = c_next;
+ else
+ stream->module_context_buckets[b] = c_next;
+ }
+ }
+}
+
+
+static int til_stream_fprint_module_context_cb(void *arg, til_stream_module_context_t *module_context, size_t n_module_contexts, const til_module_context_t **contexts)
+{
+ FILE *out = arg;
+
+ fprintf(out, " %s: %s[%zu]", contexts[0]->setup->path, contexts[0]->module->name, n_module_contexts);
+ for (size_t i = 0; i < n_module_contexts; i++)
+ fprintf(out, "%s{rc=%u, n_cpus=%u}", i ? ", " : " ", contexts[i]->refcount, contexts[i]->n_cpus);
+ fprintf(out, "\n");
+}
+
+
+void til_stream_fprint_module_contexts(til_stream_t *stream, FILE *out)
+{
+ /* TODO: errors */
+ fprintf(out, "Module contexts on stream %p:\n", stream);
+ (void) til_stream_for_each_module_context(stream, til_stream_fprint_module_context_cb, out);
+ fprintf(out, "\n");
+}
diff --git a/src/til_stream.h b/src/til_stream.h
index 1fb709f..cb3c98e 100644
--- a/src/til_stream.h
+++ b/src/til_stream.h
@@ -24,6 +24,7 @@
#include "til_setup.h"
typedef struct til_stream_t til_stream_t;
+typedef struct til_stream_module_context_t til_stream_module_context_t;
typedef struct til_stream_pipe_t til_stream_pipe_t;
typedef struct til_tap_t til_tap_t;
@@ -63,4 +64,14 @@ int til_stream_for_each_pipe(til_stream_t *stream, til_stream_pipe_iter_func_t p
void til_stream_pipe_set_owner(til_stream_pipe_t *pipe, const void *owner, const void *owner_foo);
void til_stream_pipe_set_driving_tap(til_stream_pipe_t *pipe, const til_tap_t *driving_tap);
+
+typedef int (til_stream_module_context_iter_func_t)(void *context, til_stream_module_context_t *module_context, size_t n_module_contexts, const til_module_context_t **contexts);
+
+int til_stream_for_each_module_context(til_stream_t *stream, til_stream_module_context_iter_func_t module_context_cb, void *cb_arg);
+
+int til_stream_register_module_contexts(til_stream_t *stream, size_t n_contexts, til_module_context_t **contexts);
+int til_stream_find_module_contexts(til_stream_t *stream, const char *path, size_t n_contexts, til_module_context_t **res_contexts);
+void til_stream_gc_module_contexts(til_stream_t *stream);
+void til_stream_fprint_module_contexts(til_stream_t *stream, FILE *out);
+
#endif
© All Rights Reserved