Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

input: fix crash caused by incorrectly initializing input coroutine params. #7982

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 14 additions & 24 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -456,31 +456,31 @@ struct flb_input_coro *flb_input_coro_create(struct flb_input_instance *ins,
return input_coro;
}

struct flb_libco_in_params {
struct flb_in_collect_params {
struct flb_config *config;
struct flb_input_collector *coll;
struct flb_coro *coro;
};

extern pthread_key_t libco_in_param_key;
extern struct flb_libco_in_params libco_in_param;
extern FLB_TLS_DEFINE(struct flb_in_collect_params, in_collect_params);

void flb_input_coro_prepare_destroy(struct flb_input_coro *input_coro);

static FLB_INLINE void input_params_set(struct flb_coro *coro,
struct flb_input_collector *coll,
struct flb_config *config,
void *context)
{
struct flb_libco_in_params *params;
struct flb_in_collect_params *params;

params = pthread_getspecific(libco_in_param_key);
params = (struct flb_in_collect_params *) FLB_TLS_GET(in_collect_params);
if (params == NULL) {
params = flb_calloc(1, sizeof(struct flb_libco_in_params));
params = flb_calloc(1, sizeof(struct flb_in_collect_params));
if (params == NULL) {
flb_errno();
return;
}
pthread_setspecific(libco_in_param_key, params);
FLB_TLS_SET(in_collect_params, params);
}

/* Set callback parameters */
Expand All @@ -495,16 +495,12 @@ static FLB_INLINE void input_pre_cb_collect(void)
struct flb_input_collector *coll;
struct flb_config *config;
struct flb_coro *coro;
struct flb_libco_in_params *params;
struct flb_in_collect_params *params;

params = pthread_getspecific(libco_in_param_key);
params = (struct flb_in_collect_params *)FLB_TLS_GET(in_collect_params);
if (params == NULL) {
params = flb_calloc(1, sizeof(struct flb_libco_in_params));
if (params == NULL) {
flb_errno();
return;
}
pthread_setspecific(libco_in_param_key, params);
flb_errno();
return;
}
coll = params->coll;
config = params->config;
Expand All @@ -519,13 +515,6 @@ static FLB_INLINE void flb_input_coro_resume(struct flb_input_coro *input_coro)
flb_coro_resume(input_coro->coro);
}

static void libco_in_param_key_destroy(void *data)
{
struct flb_libco_inparams *params = (struct flb_libco_inparams*)data;

flb_free(params);
}

static FLB_INLINE
struct flb_input_coro *flb_input_coro_collect(struct flb_input_collector *coll,
struct flb_config *config)
Expand All @@ -539,8 +528,6 @@ struct flb_input_coro *flb_input_coro_collect(struct flb_input_collector *coll,
return NULL;
}

pthread_key_create(&libco_in_param_key, libco_in_param_key_destroy);

coro = input_coro->coro;
if (!coro) {
return NULL;
Expand Down Expand Up @@ -648,6 +635,9 @@ static inline int flb_input_config_map_set(struct flb_input_instance *ins,
return ret;
}

void flb_input_init();
void flb_input_exit();

int flb_input_register_all(struct flb_config *config);
struct flb_input_instance *flb_input_new(struct flb_config *config,
const char *input, void *data,
Expand Down
23 changes: 21 additions & 2 deletions src/flb_input.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@
#include <fluent-bit/flb_chunk_trace.h>
#endif /* FLB_HAVE_CHUNK_TRACE */

struct flb_libco_in_params libco_in_param;
pthread_key_t libco_in_param_key;
FLB_TLS_DEFINE(struct flb_in_collect_params, in_collect_params);

#define protcmp(a, b) strncasecmp(a, b, strlen(a))

Expand Down Expand Up @@ -141,6 +140,24 @@ int flb_input_log_check(struct flb_input_instance *ins, int l)
return FLB_TRUE;
}

/* Prepare input co-routines for the thread. */
void flb_input_init()
{
FLB_TLS_INIT(in_collect_params);
FLB_TLS_SET(in_collect_params, NULL);
}

void flb_input_exit()
{
struct flb_in_collect_params *params;

params = (struct flb_in_collect_params *)FLB_TLS_GET(in_collect_params);
if (params) {
flb_free(params);
FLB_TLS_SET(in_collect_params, NULL);
}
}

/* Create an input plugin instance */
struct flb_input_instance *flb_input_new(struct flb_config *config,
const char *input, void *data,
Expand Down Expand Up @@ -1313,6 +1330,8 @@ void flb_input_exit_all(struct flb_config *config)
/* destroy the instance */
flb_input_instance_destroy(ins);
}

flb_input_exit();
}

/* Check that at least one Input is enabled */
Expand Down
1 change: 1 addition & 0 deletions src/flb_input_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ static void input_thread(void *data)
flb_bucket_queue_destroy(evl_bktq);
flb_sched_destroy(sched);
input_thread_instance_destroy(thi);
flb_input_exit();
}


Expand Down
1 change: 1 addition & 0 deletions src/flb_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ void flb_init_env()
flb_upstream_init();
flb_downstream_init();
flb_output_prepare();
flb_input_init();

FLB_TLS_INIT(flb_lib_active_context);
FLB_TLS_INIT(flb_lib_active_cf_context);
Expand Down
1 change: 1 addition & 0 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ FLB_TLS_DEFINE(struct flb_out_flush_params, out_flush_params);
void flb_output_prepare()
{
FLB_TLS_INIT(out_flush_params);
FLB_TLS_SET(out_flush_params, NULL);
}

/* Validate the the output address protocol */
Expand Down
5 changes: 3 additions & 2 deletions tests/runtime/custom_calyptia_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ void flb_custom_calyptia_pipeline_config_get_test()
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);

flb_input_init();
flb_output_prepare();

in_ffd_dummy = flb_input(ctx, (char *) "dummy", NULL);
TEST_CHECK(in_ffd_dummy >= 0);

Expand Down Expand Up @@ -46,8 +49,6 @@ void flb_custom_calyptia_pipeline_config_get_test()
cfg = custom_calyptia_pipeline_config_get(ctx->config);
TEST_CHECK(strcmp(cfg, cfg_str) == 0);

// fix a thread local storage bug on macos
flb_output_prepare();
flb_sds_destroy(cfg);
flb_destroy(ctx);
}
Expand Down