diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/til_stream.c | 232 | ||||
-rw-r--r-- | src/til_stream.h | 11 |
2 files changed, 237 insertions, 6 deletions
diff --git a/src/til_stream.c b/src/til_stream.c index afc635b..e04f807 100644 --- a/src/til_stream.c +++ b/src/til_stream.c @@ -22,6 +22,8 @@ #include <stdio.h> #include <stdlib.h> +#include "til.h" +#include "til_jenkins.h" #include "til_stream.h" #include "til_tap.h" @@ -66,7 +68,8 @@ foo_render(foo_t *foo, til_fb_fragment_t *fragment) #endif -#define TIL_STREAM_BUCKETS_COUNT 256 +#define TIL_STREAM_PIPE_BUCKETS_COUNT 256 +#define TIL_STREAM_CTXT_BUCKETS_COUNT 64 struct til_stream_pipe_t { @@ -78,11 +81,20 @@ struct til_stream_pipe_t { uint32_t hash; /* hash of (driving_tap->name_hash ^ .parent_hash) */ }; +struct til_stream_module_context_t { + til_stream_module_context_t *next; + uint32_t path_hash; + + size_t n_module_contexts; + til_module_context_t *module_contexts[]; +}; + typedef struct til_stream_t { pthread_mutex_t mutex; const til_stream_hooks_t *hooks; void *hooks_context; - til_stream_pipe_t *pipe_buckets[TIL_STREAM_BUCKETS_COUNT]; + til_stream_pipe_t *pipe_buckets[TIL_STREAM_PIPE_BUCKETS_COUNT]; + til_stream_module_context_t *module_context_buckets[TIL_STREAM_CTXT_BUCKETS_COUNT]; } til_stream_t; @@ -105,7 +117,7 @@ til_stream_t * til_stream_free(til_stream_t *stream) if (!stream) return NULL; - for (int i = 0; i < TIL_STREAM_BUCKETS_COUNT; i++) { + for (int i = 0; i < TIL_STREAM_PIPE_BUCKETS_COUNT; i++) { for (til_stream_pipe_t *p = stream->pipe_buckets[i], *p_next; p != NULL; p = p_next) { p_next = p->next; free(p->parent_path); @@ -190,7 +202,7 @@ int til_stream_tap(til_stream_t *stream, const void *owner, const void *owner_fo pthread_mutex_lock(&stream->mutex); hash = (tap->name_hash ^ parent_hash); - bucket = hash % TIL_STREAM_BUCKETS_COUNT; + bucket = hash % TIL_STREAM_PIPE_BUCKETS_COUNT; for (pipe = stream->pipe_buckets[bucket]; pipe != NULL; pipe = pipe->next) { if (pipe->hash == hash) { if (pipe->driving_tap == tap) { @@ -260,7 +272,7 @@ int til_stream_tap(til_stream_t *stream, const void *owner, const void *owner_fo /* remove all pipes belonging to owner in stream, including pipes whose driving_tap is owned by owner */ void til_stream_untap_owner(til_stream_t *stream, const void *owner) { - for (int i = 0; i < TIL_STREAM_BUCKETS_COUNT; i++) { + for (int i = 0; i < TIL_STREAM_PIPE_BUCKETS_COUNT; i++) { for (til_stream_pipe_t *p = stream->pipe_buckets[i], *p_next, *p_prev; p != NULL; p = p_next) { p_next = p->next; @@ -436,7 +448,7 @@ int til_stream_for_each_pipe(til_stream_t *stream, til_stream_pipe_iter_func_t p pthread_mutex_lock(&stream->mutex); - for (int i = 0; i < TIL_STREAM_BUCKETS_COUNT; i++) { + for (int i = 0; i < TIL_STREAM_PIPE_BUCKETS_COUNT; i++) { for (til_stream_pipe_t *p = stream->pipe_buckets[i]; p != NULL; p = p->next) { int r; @@ -472,3 +484,211 @@ void til_stream_pipe_set_driving_tap(til_stream_pipe_t *pipe, const til_tap_t *d pipe->driving_tap = driving_tap; } + + +int til_stream_for_each_module_context(til_stream_t *stream, til_stream_module_context_iter_func_t module_context_cb, void *cb_arg) +{ + assert(stream); + assert(module_context_cb); + + for (int i = 0; i < TIL_STREAM_CTXT_BUCKETS_COUNT; i++) { + for (til_stream_module_context_t *c = stream->module_context_buckets[i]; c != NULL; c = c->next) { + int r; + + r = module_context_cb(cb_arg, c, c->n_module_contexts, (const til_module_context_t **)c->module_contexts); + if (r < 0) + return r; + } + } + + return 0; +} + + +int til_stream_register_module_contexts(til_stream_t *stream, size_t n_contexts, til_module_context_t **contexts) +{ + uint32_t path_hash, bucket; + til_stream_module_context_t *c, *c_next, *c_prev; + + assert(stream); + assert(n_contexts > 0); + assert(contexts); + + path_hash = contexts[0]->setup->path_hash; + bucket = path_hash % TIL_STREAM_CTXT_BUCKETS_COUNT; + for (c = stream->module_context_buckets[bucket], c_prev = NULL; c != NULL; c = c_next) { + c_next = c->next; + + if (c->path_hash != path_hash || + strcmp(contexts[0]->setup->path, c->module_contexts[0]->setup->path)) { + c_prev = c; + continue; + } + + /* due to the current approach of async gc for on-stream contexts, collisions at + * the same path are to be expected - and we just replace what's there. In the + * future there will likely be a context invalidation api for marking what's replaced + * as stale, so the next time that reference gets used it's refreshed via the stream... TODO + */ + + if (c_prev) + c_prev->next = c_next; + else + stream->module_context_buckets[bucket] = c_next; + + /* we're only potentially reusing the stream_module_context_t container, the actual contexts are unreferenced */ + for (size_t i = 0; i < c->n_module_contexts; i++) + c->module_contexts[i] = til_module_context_free((til_module_context_t *)c->module_contexts[i]); + + /* not big enough, FIXME TODO I think the context sets @path are being deprecated */ + if (c->n_module_contexts < n_contexts) { + free(c); + c = NULL; + } + + break; + } + + if (!c) { + c = calloc(1, sizeof(til_stream_module_context_t) + n_contexts * sizeof(contexts[0])); + if (!c) + return -ENOMEM; + + c->path_hash = path_hash; + c->n_module_contexts = n_contexts; + } + + for (size_t i = 0; i < n_contexts; i++) + c->module_contexts[i] = til_module_context_ref(contexts[i]); + + c->next = stream->module_context_buckets[bucket]; + stream->module_context_buckets[bucket] = c; + + return 0; +} + + +int til_stream_find_module_contexts(til_stream_t *stream, const char *path, size_t n_contexts, til_module_context_t **res_contexts) +{ + uint32_t path_hash, bucket; + til_stream_module_context_t *c; + + assert(stream); + assert(path); + assert(n_contexts > 0); + assert(res_contexts); + + /* TODO: add a til_path_t which pairs the hash and char* so we do't keep recomputing it */ + path_hash = til_jenkins((uint8_t *)path, strlen(path) + 1); + bucket = path_hash % TIL_STREAM_CTXT_BUCKETS_COUNT; + for (c = stream->module_context_buckets[bucket]; c != NULL; c = c->next) { + if (c->path_hash != path_hash) + continue; + + if (!strcmp(c->module_contexts[0]->setup->path, path)) { + /* FIXME TODO: n_contexts mismatches need to be dealt with still: + * + * When n_contexts is smaller than the registered set, it seems trivial to + * just use the subset - but then the subset marches ahead of the others and + * should arguably invalidate the rest, truncating the set. The complexity there + * is all the outstanding references need to realize their invalidated and refresh + * their contexts, presumably via this lookup in the stream. + * + * When n_contexts exceeds the registered set, there's a need to create more but + * they need to be identical, which suggests there needs to be + * til_module_t.clone_context() + * + * It should be noted that there's just one user of cloned contexts as a means of + * achieving parallel rendering: checkers::fill_module, if that usage is deprecated + * this all goes away. But the way checkers is achieving parallelism there, regardless + * of what the underlying fill_module is capable of, is arguably a powerful thing, for + * use cases that need instancing of the same module's output. Though maybe it makes + * more sense for checkers to be rendering _one_ tile, then copying it to the filled + * cells in parallel - if they're all expected to be identical, why not render it once? + */ + + /* XXX: until the above is addressed, this assert will remain */ + assert(n_contexts <= c->n_module_contexts); + for (size_t i = 0; i < n_contexts; i++) + res_contexts[i] = til_module_context_ref(c->module_contexts[i]); + + return 0; + } + } + + return -ENOENT; +} + + +void til_stream_gc_module_contexts(til_stream_t *stream) +{ + assert(stream); + + /* This may not remain long-term, but there's no current way to unregister contexts - and by + * occasionally calling this at appropriate times, long-running modules potentially reusing + * contexts by path on streams like rkt can cleanup contexts rather than keep leaking them + * since they're automatically registered on-stream. + * + * But the way the "ref" builtin currently looks up contexts late @ render time presents some + * difficulty as to "when is the appropriate time to gc contexts?". rtv for example is calling + * it immediately after creating the context for the next channel, it's just that rtv doesn't + * actually make use of context passing or anything like that via "ref" so it doesn't really + * matter - rtv just wants to release the resources. For rkt, it gets more complicated, and + * it's unclear yet what the right solution will be there. So consider this "gc" implementation + * very tentative and mostly just to keep "rtv" from meltdown while the dust settles on the rkt + * front. + */ + for (size_t b = 0; b < TIL_STREAM_CTXT_BUCKETS_COUNT; b++) { + til_stream_module_context_t *c, *c_prev, *c_next; + + for (c = stream->module_context_buckets[b], c_prev = NULL; c != NULL; c = c_next) { + size_t i; + + c_next = c->next; + + for (i = 0; i < c->n_module_contexts; i++) { + if (c->module_contexts[i]->refcount != 1) + break; + } + + if (i < c->n_module_contexts) { + c_prev = c; + continue; + } + + /* if all the contexts in the set are rc=1, that implies they're only on-stream + * and not actively referenced by any user, so we "gc" them. + */ + { /* free(stream_module_context_t) */ + for (i = 0; i < c->n_module_contexts; i++) + c->module_contexts[i] = til_module_context_free(c->module_contexts[i]); + free(c); + } + + if (c_prev) + c_prev->next = c_next; + else + stream->module_context_buckets[b] = c_next; + } + } +} + + +static int til_stream_fprint_module_context_cb(void *arg, til_stream_module_context_t *module_context, size_t n_module_contexts, const til_module_context_t **contexts) +{ + FILE *out = arg; + + fprintf(out, " %s: %s[%zu]", contexts[0]->setup->path, contexts[0]->module->name, n_module_contexts); + for (size_t i = 0; i < n_module_contexts; i++) + fprintf(out, "%s{rc=%u, n_cpus=%u}", i ? ", " : " ", contexts[i]->refcount, contexts[i]->n_cpus); + fprintf(out, "\n"); +} + + +void til_stream_fprint_module_contexts(til_stream_t *stream, FILE *out) +{ + /* TODO: errors */ + fprintf(out, "Module contexts on stream %p:\n", stream); + (void) til_stream_for_each_module_context(stream, til_stream_fprint_module_context_cb, out); + fprintf(out, "\n"); +} diff --git a/src/til_stream.h b/src/til_stream.h index 1fb709f..cb3c98e 100644 --- a/src/til_stream.h +++ b/src/til_stream.h @@ -24,6 +24,7 @@ #include "til_setup.h" typedef struct til_stream_t til_stream_t; +typedef struct til_stream_module_context_t til_stream_module_context_t; typedef struct til_stream_pipe_t til_stream_pipe_t; typedef struct til_tap_t til_tap_t; @@ -63,4 +64,14 @@ int til_stream_for_each_pipe(til_stream_t *stream, til_stream_pipe_iter_func_t p void til_stream_pipe_set_owner(til_stream_pipe_t *pipe, const void *owner, const void *owner_foo); void til_stream_pipe_set_driving_tap(til_stream_pipe_t *pipe, const til_tap_t *driving_tap); + +typedef int (til_stream_module_context_iter_func_t)(void *context, til_stream_module_context_t *module_context, size_t n_module_contexts, const til_module_context_t **contexts); + +int til_stream_for_each_module_context(til_stream_t *stream, til_stream_module_context_iter_func_t module_context_cb, void *cb_arg); + +int til_stream_register_module_contexts(til_stream_t *stream, size_t n_contexts, til_module_context_t **contexts); +int til_stream_find_module_contexts(til_stream_t *stream, const char *path, size_t n_contexts, til_module_context_t **res_contexts); +void til_stream_gc_module_contexts(til_stream_t *stream); +void til_stream_fprint_module_contexts(til_stream_t *stream, FILE *out); + #endif |