diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index a5955908b88..f2392901db8 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -261,6 +261,8 @@ struct flb_config { int enable_hot_reload; int ensure_thread_safety_on_hot_reloading; unsigned int hot_reloaded_count; + int shutdown_by_hot_reloading; + int hot_reloading; /* Co-routines */ unsigned int coro_stack_size; diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 661054b7db4..d69de4f9192 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -1075,6 +1075,8 @@ void *flb_output_get_cmt_instance(struct flb_output_instance *ins); #endif void flb_output_net_default(const char *host, int port, struct flb_output_instance *ins); +int flb_output_enable_multi_threading(struct flb_output_instance *ins, + struct flb_config *config); const char *flb_output_name(struct flb_output_instance *ins); void flb_output_pre_run(struct flb_config *config); void flb_output_exit(struct flb_config *config); diff --git a/include/fluent-bit/flb_plugin_proxy.h b/include/fluent-bit/flb_plugin_proxy.h index 081821417f9..ad2f379a3d5 100644 --- a/include/fluent-bit/flb_plugin_proxy.h +++ b/include/fluent-bit/flb_plugin_proxy.h @@ -71,10 +71,6 @@ struct flb_plugin_input_proxy_context { void *flb_plugin_proxy_symbol(struct flb_plugin_proxy *proxy, const char *symbol); -int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy, - struct flb_output_instance *o_ins, - struct flb_config *config); - int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy, struct flb_config *config); diff --git a/src/flb_config.c b/src/flb_config.c index 882a93c7c1c..ae19fcbf7ec 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -281,6 +281,8 @@ struct flb_config *flb_config_init() /* reload */ config->ensure_thread_safety_on_hot_reloading = FLB_TRUE; config->hot_reloaded_count = 0; + config->shutdown_by_hot_reloading = FLB_FALSE; + config->hot_reloading = FLB_FALSE; #ifdef FLB_HAVE_SQLDB mk_list_init(&config->sqldb_list); diff --git a/src/flb_output.c b/src/flb_output.c index b1548f60dfa..64756c090e4 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -1220,27 +1220,6 @@ int flb_output_init_all(struct flb_config *config) } #endif -#ifdef FLB_HAVE_PROXY_GO - /* Proxy plugins have their own initialization */ - if (p->type == FLB_OUTPUT_PLUGIN_PROXY) { - ret = flb_plugin_proxy_output_init(p->proxy, ins, config); - if (ret == -1) { - flb_output_instance_destroy(ins); - return -1; - } - - /* Multi-threading enabled if configured */ - ret = flb_output_enable_multi_threading(ins, config); - if (ret == -1) { - flb_error("[output] could not start thread pool for '%s' plugin", - p->name); - return -1; - } - - continue; - } -#endif - #ifdef FLB_HAVE_TLS if (ins->use_tls == FLB_TRUE) { ins->tls = flb_tls_create(FLB_TLS_CLIENT_MODE, diff --git a/src/flb_plugin_proxy.c b/src/flb_plugin_proxy.c index 440c545256c..5acba5e7078 100644 --- a/src/flb_plugin_proxy.c +++ b/src/flb_plugin_proxy.c @@ -50,6 +50,14 @@ static void proxy_cb_flush(struct flb_event_chunk *event_chunk, (void) i_ins; (void) config; + /* To prevent flush callback executions, we need to check the + * status of hot-reloading. The actual problem is: we don't have + * pause procedure/mechanism for output plugin. For now, we just halt the + * flush callback here during hot-reloading is in progress. */ + if (config->shutdown_by_hot_reloading == FLB_TRUE) { + flb_trace("[GO] hot-reloading is in progress. Retry flushing"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } #ifdef FLB_HAVE_PROXY_GO if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) { @@ -173,6 +181,15 @@ static int flb_proxy_input_cb_init(struct flb_input_instance *ins, static void flb_proxy_input_cb_pause(void *data, struct flb_config *config) { struct flb_plugin_input_proxy_context *ctx = data; + struct flb_plugin_proxy *proxy = (ctx->proxy); + + /* pause */ + void (*cb_pause)(void); + + cb_pause = flb_plugin_proxy_symbol(proxy, "FLBPluginInputPause"); + if (cb_pause != NULL) { + cb_pause(); + } flb_input_collector_pause(ctx->coll_fd, ctx->proxy->instance); } @@ -180,6 +197,15 @@ static void flb_proxy_input_cb_pause(void *data, struct flb_config *config) static void flb_proxy_input_cb_resume(void *data, struct flb_config *config) { struct flb_plugin_input_proxy_context *ctx = data; + struct flb_plugin_proxy *proxy = (ctx->proxy); + + /* resume */ + void (*cb_resume)(void); + + cb_resume = flb_plugin_proxy_symbol(proxy, "FLBPluginInputResume"); + if (cb_resume != NULL) { + cb_resume(); + } flb_input_collector_resume(ctx->coll_fd, ctx->proxy->instance); } @@ -190,11 +216,18 @@ static int flb_proxy_output_cb_exit(void *out_context, struct flb_config *config { struct flb_plugin_proxy_context *ctx = out_context; struct flb_plugin_proxy *proxy = (ctx->proxy); + /* pre_exit (Golang plugin only) */ + void (*cb_pre_exit)(int); if (!out_context) { return 0; } + cb_pre_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginOutputPreExit"); + if (cb_pre_exit != NULL) { + cb_pre_exit(config->shutdown_by_hot_reloading); + } + if (proxy->def->proxy == FLB_PROXY_GOLANG) { #ifdef FLB_HAVE_PROXY_GO proxy_go_output_destroy(ctx); @@ -229,11 +262,18 @@ static int flb_proxy_input_cb_exit(void *in_context, struct flb_config *config) { struct flb_plugin_input_proxy_context *ctx = in_context; struct flb_plugin_proxy *proxy = (ctx->proxy); + /* pre_exit (Golang plugin only) */ + void (*cb_pre_exit)(int); if (!in_context) { return 0; } + cb_pre_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginInputPreExit"); + if (cb_pre_exit != NULL) { + cb_pre_exit(config->shutdown_by_hot_reloading); + } + if (proxy->def->proxy == FLB_PROXY_GOLANG) { #ifdef FLB_HAVE_PROXY_GO proxy_go_input_destroy(ctx); @@ -264,6 +304,51 @@ static void flb_proxy_input_cb_destroy(struct flb_input_plugin *plugin) flb_plugin_proxy_destroy(proxy); } +static int flb_proxy_input_cb_pre_run(struct flb_input_instance *ins, + struct flb_config *config, void *data) +{ + struct flb_plugin_proxy_context *pc; + struct flb_plugin_proxy *proxy; + int ret; + + pc = (struct flb_plugin_proxy_context *)(ins->context); + proxy = pc->proxy; + + /* pre_run */ + int (*cb_pre_run)(int); + + cb_pre_run = flb_plugin_proxy_symbol(proxy, "FLBPluginInputPreRun"); + if (cb_pre_run != NULL) { + ret = cb_pre_run(config->enable_hot_reload); + } + + return ret; +} + +static int flb_proxy_output_cb_pre_run(void *out_context, struct flb_config *config) +{ + struct flb_plugin_proxy_context *ctx = out_context; + struct flb_plugin_proxy *proxy = (ctx->proxy); + int ret; + + if (!out_context) { + return 0; + } + + /* pre_run */ + int (*cb_pre_run)(int); + + cb_pre_run = flb_plugin_proxy_symbol(proxy, "FLBPluginOutputPreRun"); + if (cb_pre_run != NULL) { + ret = cb_pre_run(config->enable_hot_reload); + } + + return ret; +} + +int flb_proxy_output_cb_init(struct flb_output_instance *o_ins, + struct flb_config *config, void *data); + static int flb_proxy_register_output(struct flb_plugin_proxy *proxy, struct flb_plugin_proxy_def *def, struct flb_config *config) @@ -280,7 +365,7 @@ static int flb_proxy_register_output(struct flb_plugin_proxy *proxy, out->type = FLB_OUTPUT_PLUGIN_PROXY; out->proxy = proxy; out->flags = def->flags; - out->name = def->name; + out->name = flb_strdup(def->name); out->description = def->description; mk_list_add(&out->_head, &config->out_plugins); @@ -289,7 +374,9 @@ static int flb_proxy_register_output(struct flb_plugin_proxy *proxy, * the core plugins specs, have a different callback approach, so * we put our proxy-middle callbacks to do the translation properly. */ + out->cb_init = flb_proxy_output_cb_init; out->cb_flush = proxy_cb_flush; + out->cb_pre_run = flb_proxy_output_cb_pre_run; out->cb_exit = flb_proxy_output_cb_exit; out->cb_destroy = flb_proxy_output_cb_destroy; return 0; @@ -320,6 +407,7 @@ static int flb_proxy_register_input(struct flb_plugin_proxy *proxy, * the core plugins specs, have a different callback approach, so * we put our proxy-middle callbacks to do the translation properly. */ + in->cb_pre_run = flb_proxy_input_cb_pre_run; in->cb_init = flb_proxy_input_cb_init; in->cb_collect = flb_proxy_input_cb_collect; in->cb_flush_buf = NULL; @@ -348,8 +436,19 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy, { int ret; int (*cb_register)(struct flb_plugin_proxy_def *); + int (*cb_pre_register)(int); struct flb_plugin_proxy_def *def = proxy->def; + /* Lookup the pre registration callback */ + cb_pre_register = flb_plugin_proxy_symbol(proxy, "FLBPluginPreRegister"); + if (cb_pre_register != NULL) { + /* Prepare the registration if available */ + ret = cb_pre_register(config->hot_reloading); + if (ret == -1) { + return -1; + } + } + /* Lookup the registration callback */ cb_register = flb_plugin_proxy_symbol(proxy, "FLBPluginRegister"); if (!cb_register) { @@ -403,24 +502,35 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy, return 0; } -int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy, - struct flb_output_instance *o_ins, - struct flb_config *config) +int flb_proxy_output_cb_init(struct flb_output_instance *o_ins, + struct flb_config *config, void *data) { int ret = -1; + struct flb_plugin_proxy_context *pc; + + /* Before to initialize for proxy, set the proxy instance reference */ + pc = (struct flb_plugin_proxy_context *)(o_ins->context); /* Before to initialize, set the instance reference */ - proxy->instance = o_ins; + pc->proxy->instance = o_ins; /* Based on 'proxy', use the proper handler */ - if (proxy->def->proxy == FLB_PROXY_GOLANG) { + if (pc->proxy->def->proxy == FLB_PROXY_GOLANG) { #ifdef FLB_HAVE_PROXY_GO - ret = proxy_go_output_init(proxy); + ret = proxy_go_output_init(pc->proxy); #endif } else { flb_error("[proxy] unrecognized proxy handler %i", - proxy->def->proxy); + pc->proxy->def->proxy); + } + + /* Multi-threading enabled if configured */ + ret = flb_output_enable_multi_threading(o_ins, config); + if (ret == -1) { + flb_error("[output] could not start thread pool for '%s' plugin", + o_ins->p->name); + return -1; } return ret; diff --git a/src/flb_reload.c b/src/flb_reload.c index 641fa4a66d0..6c0e7c6ce90 100644 --- a/src/flb_reload.c +++ b/src/flb_reload.c @@ -428,6 +428,10 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) new_config->verbose = verbose; /* Increment and store the number of hot reloaded times */ reloaded_count = ctx->config->hot_reloaded_count + 1; + /* Mark shutdown reason as hot_reloading */ + ctx->config->shutdown_by_hot_reloading = FLB_TRUE; + /* Mark hot reloading */ + new_config->hot_reloading = FLB_TRUE; #ifdef FLB_HAVE_STREAM_PROCESSOR /* Inherit stream processor definitions from command line */ @@ -511,6 +515,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) if (ret == 0) { new_config->hot_reloaded_count = reloaded_count; flb_debug("[reload] hot reloaded %d time(s)", reloaded_count); + new_config->hot_reloading = FLB_FALSE; } return 0;