Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin_proxy: core: expose more callbacks for golang plugins #7997

Merged
merged 10 commits into from
Oct 16, 2023
Merged
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ struct flb_config {
int enable_hot_reload;
int ensure_thread_safety_on_hot_reloading;
unsigned int hot_reloaded_count;
int shutdown_by_hot_reloading;
int hot_reloading;

/* Co-routines */
unsigned int coro_stack_size;
Expand Down
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,8 @@ void *flb_output_get_cmt_instance(struct flb_output_instance *ins);
#endif
void flb_output_net_default(const char *host, int port,
struct flb_output_instance *ins);
int flb_output_enable_multi_threading(struct flb_output_instance *ins,
struct flb_config *config);
const char *flb_output_name(struct flb_output_instance *ins);
void flb_output_pre_run(struct flb_config *config);
void flb_output_exit(struct flb_config *config);
Expand Down
4 changes: 0 additions & 4 deletions include/fluent-bit/flb_plugin_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ struct flb_plugin_input_proxy_context {
void *flb_plugin_proxy_symbol(struct flb_plugin_proxy *proxy,
const char *symbol);

int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy,
struct flb_output_instance *o_ins,
struct flb_config *config);

int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
struct flb_config *config);

Expand Down
2 changes: 2 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ struct flb_config *flb_config_init()
/* reload */
config->ensure_thread_safety_on_hot_reloading = FLB_TRUE;
config->hot_reloaded_count = 0;
config->shutdown_by_hot_reloading = FLB_FALSE;
config->hot_reloading = FLB_FALSE;

#ifdef FLB_HAVE_SQLDB
mk_list_init(&config->sqldb_list);
Expand Down
21 changes: 0 additions & 21 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -1220,27 +1220,6 @@ int flb_output_init_all(struct flb_config *config)
}
#endif

#ifdef FLB_HAVE_PROXY_GO
/* Proxy plugins have their own initialization */
if (p->type == FLB_OUTPUT_PLUGIN_PROXY) {
ret = flb_plugin_proxy_output_init(p->proxy, ins, config);
if (ret == -1) {
flb_output_instance_destroy(ins);
return -1;
}

/* Multi-threading enabled if configured */
ret = flb_output_enable_multi_threading(ins, config);
if (ret == -1) {
flb_error("[output] could not start thread pool for '%s' plugin",
p->name);
return -1;
}

continue;
}
#endif

#ifdef FLB_HAVE_TLS
if (ins->use_tls == FLB_TRUE) {
ins->tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
Expand Down
126 changes: 118 additions & 8 deletions src/flb_plugin_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ static void proxy_cb_flush(struct flb_event_chunk *event_chunk,
(void) i_ins;
(void) config;

/* To prevent flush callback executions, we need to check the
* status of hot-reloading. The actual problem is: we don't have
* pause procedure/mechanism for output plugin. For now, we just halt the
* flush callback here during hot-reloading is in progress. */
if (config->shutdown_by_hot_reloading == FLB_TRUE) {
flb_trace("[GO] hot-reloading is in progress. Retry flushing");
FLB_OUTPUT_RETURN(FLB_RETRY);
}

#ifdef FLB_HAVE_PROXY_GO
if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) {
Expand Down Expand Up @@ -173,13 +181,31 @@ static int flb_proxy_input_cb_init(struct flb_input_instance *ins,
static void flb_proxy_input_cb_pause(void *data, struct flb_config *config)
{
struct flb_plugin_input_proxy_context *ctx = data;
struct flb_plugin_proxy *proxy = (ctx->proxy);

/* pause */
void (*cb_pause)(void);

cb_pause = flb_plugin_proxy_symbol(proxy, "FLBPluginInputPause");
if (cb_pause != NULL) {
cb_pause();
}

flb_input_collector_pause(ctx->coll_fd, ctx->proxy->instance);
}

static void flb_proxy_input_cb_resume(void *data, struct flb_config *config)
{
struct flb_plugin_input_proxy_context *ctx = data;
struct flb_plugin_proxy *proxy = (ctx->proxy);

/* resume */
void (*cb_resume)(void);

cb_resume = flb_plugin_proxy_symbol(proxy, "FLBPluginInputResume");
if (cb_resume != NULL) {
cb_resume();
}

flb_input_collector_resume(ctx->coll_fd, ctx->proxy->instance);
}
Expand All @@ -190,11 +216,18 @@ static int flb_proxy_output_cb_exit(void *out_context, struct flb_config *config
{
struct flb_plugin_proxy_context *ctx = out_context;
struct flb_plugin_proxy *proxy = (ctx->proxy);
/* pre_exit (Golang plugin only) */
void (*cb_pre_exit)(int);

if (!out_context) {
return 0;
}

cb_pre_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginOutputPreExit");
if (cb_pre_exit != NULL) {
cb_pre_exit(config->shutdown_by_hot_reloading);
}

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
proxy_go_output_destroy(ctx);
Expand Down Expand Up @@ -229,11 +262,18 @@ static int flb_proxy_input_cb_exit(void *in_context, struct flb_config *config)
{
struct flb_plugin_input_proxy_context *ctx = in_context;
struct flb_plugin_proxy *proxy = (ctx->proxy);
/* pre_exit (Golang plugin only) */
void (*cb_pre_exit)(int);

if (!in_context) {
return 0;
}

cb_pre_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginInputPreExit");
if (cb_pre_exit != NULL) {
cb_pre_exit(config->shutdown_by_hot_reloading);
}

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
proxy_go_input_destroy(ctx);
Expand Down Expand Up @@ -264,6 +304,51 @@ static void flb_proxy_input_cb_destroy(struct flb_input_plugin *plugin)
flb_plugin_proxy_destroy(proxy);
}

static int flb_proxy_input_cb_pre_run(struct flb_input_instance *ins,
struct flb_config *config, void *data)
{
struct flb_plugin_proxy_context *pc;
struct flb_plugin_proxy *proxy;
int ret;

pc = (struct flb_plugin_proxy_context *)(ins->context);
proxy = pc->proxy;

/* pre_run */
int (*cb_pre_run)(int);

cb_pre_run = flb_plugin_proxy_symbol(proxy, "FLBPluginInputPreRun");
if (cb_pre_run != NULL) {
ret = cb_pre_run(config->enable_hot_reload);
}

return ret;
}

static int flb_proxy_output_cb_pre_run(void *out_context, struct flb_config *config)
{
struct flb_plugin_proxy_context *ctx = out_context;
struct flb_plugin_proxy *proxy = (ctx->proxy);
int ret;

if (!out_context) {
return 0;
}

/* pre_run */
int (*cb_pre_run)(int);

cb_pre_run = flb_plugin_proxy_symbol(proxy, "FLBPluginOutputPreRun");
if (cb_pre_run != NULL) {
ret = cb_pre_run(config->enable_hot_reload);
}

return ret;
}

int flb_proxy_output_cb_init(struct flb_output_instance *o_ins,
struct flb_config *config, void *data);

static int flb_proxy_register_output(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def,
struct flb_config *config)
Expand All @@ -280,7 +365,7 @@ static int flb_proxy_register_output(struct flb_plugin_proxy *proxy,
out->type = FLB_OUTPUT_PLUGIN_PROXY;
out->proxy = proxy;
out->flags = def->flags;
out->name = def->name;
out->name = flb_strdup(def->name);
out->description = def->description;
mk_list_add(&out->_head, &config->out_plugins);

Expand All @@ -289,7 +374,9 @@ static int flb_proxy_register_output(struct flb_plugin_proxy *proxy,
* the core plugins specs, have a different callback approach, so
* we put our proxy-middle callbacks to do the translation properly.
*/
out->cb_init = flb_proxy_output_cb_init;
out->cb_flush = proxy_cb_flush;
out->cb_pre_run = flb_proxy_output_cb_pre_run;
out->cb_exit = flb_proxy_output_cb_exit;
out->cb_destroy = flb_proxy_output_cb_destroy;
return 0;
Expand Down Expand Up @@ -320,6 +407,7 @@ static int flb_proxy_register_input(struct flb_plugin_proxy *proxy,
* the core plugins specs, have a different callback approach, so
* we put our proxy-middle callbacks to do the translation properly.
*/
in->cb_pre_run = flb_proxy_input_cb_pre_run;
in->cb_init = flb_proxy_input_cb_init;
in->cb_collect = flb_proxy_input_cb_collect;
in->cb_flush_buf = NULL;
Expand Down Expand Up @@ -348,8 +436,19 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
{
int ret;
int (*cb_register)(struct flb_plugin_proxy_def *);
int (*cb_pre_register)(int);
struct flb_plugin_proxy_def *def = proxy->def;

/* Lookup the pre registration callback */
cb_pre_register = flb_plugin_proxy_symbol(proxy, "FLBPluginPreRegister");
if (cb_pre_register != NULL) {
/* Prepare the registration if available */
ret = cb_pre_register(config->hot_reloading);
if (ret == -1) {
return -1;
}
}

/* Lookup the registration callback */
cb_register = flb_plugin_proxy_symbol(proxy, "FLBPluginRegister");
if (!cb_register) {
Expand Down Expand Up @@ -403,24 +502,35 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
return 0;
}

int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy,
struct flb_output_instance *o_ins,
struct flb_config *config)
int flb_proxy_output_cb_init(struct flb_output_instance *o_ins,
struct flb_config *config, void *data)
{
int ret = -1;
struct flb_plugin_proxy_context *pc;

/* Before to initialize for proxy, set the proxy instance reference */
pc = (struct flb_plugin_proxy_context *)(o_ins->context);

/* Before to initialize, set the instance reference */
proxy->instance = o_ins;
pc->proxy->instance = o_ins;

/* Based on 'proxy', use the proper handler */
if (proxy->def->proxy == FLB_PROXY_GOLANG) {
if (pc->proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
ret = proxy_go_output_init(proxy);
ret = proxy_go_output_init(pc->proxy);
#endif
}
else {
flb_error("[proxy] unrecognized proxy handler %i",
proxy->def->proxy);
pc->proxy->def->proxy);
}

/* Multi-threading enabled if configured */
ret = flb_output_enable_multi_threading(o_ins, config);
if (ret == -1) {
flb_error("[output] could not start thread pool for '%s' plugin",
o_ins->p->name);
return -1;
}

return ret;
Expand Down
5 changes: 5 additions & 0 deletions src/flb_reload.c
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,10 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts)
new_config->verbose = verbose;
/* Increment and store the number of hot reloaded times */
reloaded_count = ctx->config->hot_reloaded_count + 1;
/* Mark shutdown reason as hot_reloading */
ctx->config->shutdown_by_hot_reloading = FLB_TRUE;
/* Mark hot reloading */
new_config->hot_reloading = FLB_TRUE;

#ifdef FLB_HAVE_STREAM_PROCESSOR
/* Inherit stream processor definitions from command line */
Expand Down Expand Up @@ -511,6 +515,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts)
if (ret == 0) {
new_config->hot_reloaded_count = reloaded_count;
flb_debug("[reload] hot reloaded %d time(s)", reloaded_count);
new_config->hot_reloading = FLB_FALSE;
}

return 0;
Expand Down
Loading