diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index 1b4dff82254..27ae9bf9860 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -456,14 +456,14 @@ struct flb_input_coro *flb_input_coro_create(struct flb_input_instance *ins, return input_coro; } -struct flb_libco_in_params { +struct flb_in_collect_params { struct flb_config *config; struct flb_input_collector *coll; struct flb_coro *coro; }; -extern pthread_key_t libco_in_param_key; -extern struct flb_libco_in_params libco_in_param; +extern FLB_TLS_DEFINE(struct flb_in_collect_params, in_collect_params); + void flb_input_coro_prepare_destroy(struct flb_input_coro *input_coro); static FLB_INLINE void input_params_set(struct flb_coro *coro, @@ -471,16 +471,16 @@ static FLB_INLINE void input_params_set(struct flb_coro *coro, struct flb_config *config, void *context) { - struct flb_libco_in_params *params; + struct flb_in_collect_params *params; - params = pthread_getspecific(libco_in_param_key); + params = (struct flb_in_collect_params *) FLB_TLS_GET(in_collect_params); if (params == NULL) { - params = flb_calloc(1, sizeof(struct flb_libco_in_params)); + params = flb_calloc(1, sizeof(struct flb_in_collect_params)); if (params == NULL) { flb_errno(); return; } - pthread_setspecific(libco_in_param_key, params); + FLB_TLS_SET(in_collect_params, params); } /* Set callback parameters */ @@ -495,16 +495,12 @@ static FLB_INLINE void input_pre_cb_collect(void) struct flb_input_collector *coll; struct flb_config *config; struct flb_coro *coro; - struct flb_libco_in_params *params; + struct flb_in_collect_params *params; - params = pthread_getspecific(libco_in_param_key); + params = (struct flb_in_collect_params *)FLB_TLS_GET(in_collect_params); if (params == NULL) { - params = flb_calloc(1, sizeof(struct flb_libco_in_params)); - if (params == NULL) { - flb_errno(); - return; - } - pthread_setspecific(libco_in_param_key, params); + flb_errno(); + return; } coll = params->coll; config = params->config; @@ -519,13 +515,6 @@ static FLB_INLINE void flb_input_coro_resume(struct flb_input_coro *input_coro) flb_coro_resume(input_coro->coro); } -static void libco_in_param_key_destroy(void *data) -{ - struct flb_libco_inparams *params = (struct flb_libco_inparams*)data; - - flb_free(params); -} - static FLB_INLINE struct flb_input_coro *flb_input_coro_collect(struct flb_input_collector *coll, struct flb_config *config) @@ -539,8 +528,6 @@ struct flb_input_coro *flb_input_coro_collect(struct flb_input_collector *coll, return NULL; } - pthread_key_create(&libco_in_param_key, libco_in_param_key_destroy); - coro = input_coro->coro; if (!coro) { return NULL; @@ -648,6 +635,9 @@ static inline int flb_input_config_map_set(struct flb_input_instance *ins, return ret; } +void flb_input_init(); +void flb_input_exit(); + int flb_input_register_all(struct flb_config *config); struct flb_input_instance *flb_input_new(struct flb_config *config, const char *input, void *data, diff --git a/src/flb_input.c b/src/flb_input.c index 1c4faad4d3f..6c77cac6c6c 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -49,8 +49,7 @@ #include #endif /* FLB_HAVE_CHUNK_TRACE */ -struct flb_libco_in_params libco_in_param; -pthread_key_t libco_in_param_key; +FLB_TLS_DEFINE(struct flb_in_collect_params, in_collect_params); #define protcmp(a, b) strncasecmp(a, b, strlen(a)) @@ -141,6 +140,24 @@ int flb_input_log_check(struct flb_input_instance *ins, int l) return FLB_TRUE; } +/* Prepare input co-routines for the thread. */ +void flb_input_init() +{ + FLB_TLS_INIT(in_collect_params); + FLB_TLS_SET(in_collect_params, NULL); +} + +void flb_input_exit() +{ + struct flb_in_collect_params *params; + + params = (struct flb_in_collect_params *)FLB_TLS_GET(in_collect_params); + if (params) { + flb_free(params); + FLB_TLS_SET(in_collect_params, NULL); + } +} + /* Create an input plugin instance */ struct flb_input_instance *flb_input_new(struct flb_config *config, const char *input, void *data, @@ -1313,6 +1330,8 @@ void flb_input_exit_all(struct flb_config *config) /* destroy the instance */ flb_input_instance_destroy(ins); } + + flb_input_exit(); } /* Check that at least one Input is enabled */ diff --git a/src/flb_input_thread.c b/src/flb_input_thread.c index bf073296de2..2d656d41c20 100644 --- a/src/flb_input_thread.c +++ b/src/flb_input_thread.c @@ -459,6 +459,7 @@ static void input_thread(void *data) flb_bucket_queue_destroy(evl_bktq); flb_sched_destroy(sched); input_thread_instance_destroy(thi); + flb_input_exit(); } diff --git a/src/flb_lib.c b/src/flb_lib.c index 882faa54757..bf69a39eb3f 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -127,6 +127,7 @@ void flb_init_env() flb_upstream_init(); flb_downstream_init(); flb_output_prepare(); + flb_input_init(); FLB_TLS_INIT(flb_lib_active_context); FLB_TLS_INIT(flb_lib_active_cf_context); diff --git a/src/flb_output.c b/src/flb_output.c index b1548f60dfa..cbc39f74512 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -45,6 +45,7 @@ FLB_TLS_DEFINE(struct flb_out_flush_params, out_flush_params); void flb_output_prepare() { FLB_TLS_INIT(out_flush_params); + FLB_TLS_SET(out_flush_params, NULL); } /* Validate the the output address protocol */ diff --git a/tests/runtime/custom_calyptia_test.c b/tests/runtime/custom_calyptia_test.c index 5ad30e7a0bf..101b1386c76 100644 --- a/tests/runtime/custom_calyptia_test.c +++ b/tests/runtime/custom_calyptia_test.c @@ -19,6 +19,9 @@ void flb_custom_calyptia_pipeline_config_get_test() ctx = flb_create(); flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + flb_input_init(); + flb_output_prepare(); + in_ffd_dummy = flb_input(ctx, (char *) "dummy", NULL); TEST_CHECK(in_ffd_dummy >= 0); @@ -46,8 +49,6 @@ void flb_custom_calyptia_pipeline_config_get_test() cfg = custom_calyptia_pipeline_config_get(ctx->config); TEST_CHECK(strcmp(cfg, cfg_str) == 0); - // fix a thread local storage bug on macos - flb_output_prepare(); flb_sds_destroy(cfg); flb_destroy(ctx); }