From 6132ad8fb8050bbbea18716f420412efebe32eb8 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 26 Sep 2023 15:43:27 -0700 Subject: [PATCH] output_thread: add support for scheduled timer jobs with coroutines Signed-off-by: Wesley Pettit --- include/fluent-bit/flb_output_thread.h | 10 +++- src/flb_output_thread.c | 66 +++++++++++++++++++++++--- 2 files changed, 68 insertions(+), 8 deletions(-) diff --git a/include/fluent-bit/flb_output_thread.h b/include/fluent-bit/flb_output_thread.h index 4100c9c1a7e..38a59276025 100644 --- a/include/fluent-bit/flb_output_thread.h +++ b/include/fluent-bit/flb_output_thread.h @@ -86,7 +86,12 @@ struct flb_out_thread_instance { * 'flushes' running by a threaded instance, then the access to the 'flush_list' * must be protected: we use 'flush_mutex for that purpose. */ - pthread_mutex_t flush_mutex; /* mutex for 'flush_list' */ + pthread_mutex_t flush_mutex; /* mutex for 'flush_list' */ + + /* Same as flush_mutex but for timer coros */ + struct mk_list timer_coro_list; /* flush context list */ + struct mk_list timer_coro_list_destroy; /* flust context destroy list */ + pthread_mutex_t timer_mutex; /* mutex for 'flush_list' */ /* List of mapped 'upstream' contexts */ struct mk_list upstreams; @@ -100,7 +105,8 @@ int flb_output_thread_pool_start(struct flb_output_instance *ins); int flb_output_thread_pool_flush(struct flb_task *task, struct flb_output_instance *out_ins, struct flb_config *config); - +int flb_output_thread_pool_timer_coros_size(struct flb_output_instance *ins); +void flb_output_thread_pool_timer_coros_print(struct flb_output_instance *ins); void flb_output_thread_instance_init(); struct flb_out_thread_instance *flb_output_thread_instance_get(); diff --git a/src/flb_output_thread.c b/src/flb_output_thread.c index 52d1f57959c..4ef3ef50253 100644 --- a/src/flb_output_thread.c +++ b/src/flb_output_thread.c @@ -180,7 +180,8 @@ static void output_thread(void *data) struct flb_output_instance *ins; struct flb_output_flush *out_flush; struct flb_out_thread_instance *th_ins = data; - struct flb_out_flush_params *params; + struct flb_out_flush_params *flush_params = NULL; + struct flb_out_timer_coro_params *timer_params = NULL; struct flb_net_dns dns_ctx; /* Register thread instance */ @@ -333,7 +334,7 @@ static void output_thread(void *data) flb_sched_timer_cleanup(sched); /* Check if we should stop the event loop */ - if (stopping == FLB_TRUE && mk_list_size(&th_ins->flush_list) == 0) { + if (stopping == FLB_TRUE && mk_list_size(&th_ins->flush_list) == 0 && mk_list_size(&th_ins->timer_coro_list) == 0) { /* * If there are no busy network connections (and no coroutines) its * safe to stop it. @@ -359,9 +360,13 @@ static void output_thread(void *data) flb_upstream_conn_pending_destroy_list(&th_ins->upstreams); flb_sched_destroy(sched); - params = FLB_TLS_GET(out_flush_params); - if (params) { - flb_free(params); + flush_params = FLB_TLS_GET(out_flush_params); + if (flush_params) { + flb_free(flush_params); + } + timer_params = FLB_TLS_GET(timer_coro_params); + if (timer_params) { + flb_free(timer_params); } mk_event_loop_destroy(th_ins->evl); flb_bucket_queue_destroy(th_ins->evl_bktq); @@ -437,7 +442,10 @@ int flb_output_thread_pool_create(struct flb_config *config, th_ins->flush_id = 0; mk_list_init(&th_ins->flush_list); mk_list_init(&th_ins->flush_list_destroy); + mk_list_init(&th_ins->timer_coro_list); + mk_list_init(&th_ins->timer_coro_list_destroy); pthread_mutex_init(&th_ins->flush_mutex, NULL); + pthread_mutex_init(&th_ins->timer_mutex, NULL); mk_list_init(&th_ins->upstreams); upstream_thread_create(th_ins, ins); @@ -495,6 +503,53 @@ int flb_output_thread_pool_create(struct flb_config *config, return 0; } +int flb_output_thread_pool_timer_coros_size(struct flb_output_instance *ins) +{ + int n; + int size = 0; + struct mk_list *head; + struct flb_tp *tp = ins->tp; + struct flb_tp_thread *th; + struct flb_out_thread_instance *th_ins; + + mk_list_foreach(head, &tp->list_threads) { + th = mk_list_entry(head, struct flb_tp_thread, _head); + if (th->status != FLB_THREAD_POOL_RUNNING) { + continue; + } + + th_ins = th->params.data; + + pthread_mutex_lock(&th_ins->flush_mutex); + n = mk_list_size(&th_ins->timer_coro_list); + pthread_mutex_unlock(&th_ins->flush_mutex); + size += n; + } + + return size; +} + +void flb_output_thread_pool_timer_coros_print(struct flb_output_instance *ins) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_tp *tp = ins->tp; + struct flb_tp_thread *th; + struct flb_out_thread_instance *th_ins; + + mk_list_foreach_safe(head, tmp, &tp->list_threads) { + th = mk_list_entry(head, struct flb_tp_thread, _head); + if (th->status != FLB_THREAD_POOL_RUNNING) { + continue; + } + + th_ins = th->params.data; + pthread_mutex_lock(&th_ins->timer_mutex); + flb_timer_coros_print(&th_ins->timer_coro_list); + pthread_mutex_unlock(&th_ins->timer_mutex); + } +} + int flb_output_thread_pool_coros_size(struct flb_output_instance *ins) { int n; @@ -504,7 +559,6 @@ int flb_output_thread_pool_coros_size(struct flb_output_instance *ins) struct flb_tp_thread *th; struct flb_out_thread_instance *th_ins; - /* Signal each worker thread that needs to stop doing work */ mk_list_foreach(head, &tp->list_threads) { th = mk_list_entry(head, struct flb_tp_thread, _head); if (th->status != FLB_THREAD_POOL_RUNNING) {