Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reload: bin: filter: handle invalid values of configuration correctly on hot reloading #8110

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct flb_filter_plugin {
struct flb_config_map *config_map;

/* Callbacks */
int (*cb_pre_run) (struct flb_filter_instance *, struct flb_config *, void *);
int (*cb_init) (struct flb_filter_instance *, struct flb_config *, void *);
int (*cb_filter) (const void *, size_t,
const char *, int,
Expand Down
8 changes: 6 additions & 2 deletions include/fluent-bit/flb_reload.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_config_format.h>

#define FLB_RELOAD_IDLE 0
#define FLB_RELOAD_IN_PROGRESS 1
#define FLB_RELOAD_IDLE 0
#define FLB_RELOAD_IN_PROGRESS 1
#define FLB_RELOAD_ABORTED -1
#define FLB_RELOAD_HALTED -2
#define FLB_RELOAD_NOT_ENABLED -3
#define FLB_RELOAD_INVALID_CONTEXT -4

int flb_reload_property_check_all(struct flb_config *config);
int flb_reload_reconstruct_cf(struct flb_cf *src_cf, struct flb_cf *dest_cf);
Expand Down
42 changes: 42 additions & 0 deletions plugins/filter_lua/lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,46 @@
#include "lua_config.h"
#include "mpack/mpack.h"

static int cb_lua_pre_run(struct flb_filter_instance *f_ins,
struct flb_config *config, void *data)
{
int ret;
(void) data;
struct lua_filter *ctx;
struct flb_luajit *lj;

/* Create context */
ctx = lua_config_create(f_ins, config);
if (!ctx) {
flb_error("[filter_lua] filter cannot be loaded");
return -1;
}

/* Create LuaJIT state/vm */
lj = flb_luajit_create(config);
if (!lj) {
lua_config_destroy(ctx);
return -1;
}
ctx->lua = lj;

/* Lua script source code */
if (ctx->code) {
ret = flb_luajit_load_buffer(ctx->lua,
ctx->code, flb_sds_len(ctx->code),
"fluentbit.lua");
}
else {
/* Load Script / file path*/
ret = flb_luajit_load_script(ctx->lua, ctx->script);
}

flb_luajit_destroy(ctx->lua);
lua_config_destroy(ctx);

return ret;
}

static int cb_lua_init(struct flb_filter_instance *f_ins,
struct flb_config *config,
void *data)
Expand Down Expand Up @@ -77,6 +117,7 @@ static int cb_lua_init(struct flb_filter_instance *f_ins,
}

if (ret == -1) {
flb_luajit_destroy(ctx->lua);
lua_config_destroy(ctx);
return -1;
}
Expand Down Expand Up @@ -701,6 +742,7 @@ static struct flb_config_map config_map[] = {
struct flb_filter_plugin filter_lua_plugin = {
.name = "lua",
.description = "Lua Scripting Filter",
.cb_pre_run = cb_lua_pre_run,
.cb_init = cb_lua_init,
#ifdef FLB_FILTER_LUA_USE_MPACK
.cb_filter = cb_lua_filter_mpack,
Expand Down
36 changes: 36 additions & 0 deletions plugins/filter_wasm/filter_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,41 @@ static void delete_wasm_config(struct flb_filter_wasm *ctx)
flb_free(ctx);
}

/* Check existence of wasm program binary */
static int cb_wasm_pre_run(struct flb_filter_instance *f_ins,
struct flb_config *config, void *data)
{
struct flb_filter_wasm *ctx = NULL;
int ret = -1;

/* Allocate space for the configuration */
ctx = flb_calloc(1, sizeof(struct flb_filter_wasm));
if (!ctx) {
return -1;
}

/* Initialize exec config */
ret = filter_wasm_config_read(ctx, f_ins, config);
if (ret < 0) {
goto pre_run_error;
}

/* Check accessibility for the wasm path */
ret = access(ctx->wasm_path, R_OK);
if (ret != 0) {
goto pre_run_error;
}

delete_wasm_config(ctx);

return 0;

pre_run_error:
delete_wasm_config(ctx);

return -1;
}

/* Initialize plugin */
static int cb_wasm_init(struct flb_filter_instance *f_ins,
struct flb_config *config, void *data)
Expand Down Expand Up @@ -311,6 +346,7 @@ static struct flb_config_map config_map[] = {
struct flb_filter_plugin filter_wasm_plugin = {
.name = "wasm",
.description = "WASM program filter",
.cb_pre_run = cb_wasm_pre_run,
.cb_init = cb_wasm_init,
.cb_filter = cb_wasm_filter,
.cb_exit = cb_wasm_exit,
Expand Down
9 changes: 9 additions & 0 deletions src/flb_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,15 @@ int flb_filter_init(struct flb_config *config, struct flb_filter_instance *ins)
return 0;
}

/* Run pre_run callback for the filter */
if (p->cb_pre_run) {
ret = p->cb_pre_run(ins, config, ins->data);
if (ret != 0) {
flb_error("Failed pre_run callback on filter %s", ins->name);
return -1;
}
}

/* Initialize the input */
if (p->cb_init) {
ret = p->cb_init(ins, config, ins->data);
Expand Down
50 changes: 35 additions & 15 deletions src/flb_reload.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <fluent-bit/flb_kv.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_plugin.h>
#include <fluent-bit/flb_reload.h>

#include <cfl/cfl.h>
#include <cfl/cfl_sds.h>
Expand Down Expand Up @@ -128,6 +129,7 @@ static int flb_filter_propery_check_all(struct flb_config *config)
struct mk_list *tmp;
struct mk_list *head;
struct flb_filter_instance *ins;
struct flb_filter_plugin *p;

/* Iterate all active input instance plugins */
mk_list_foreach_safe(head, tmp, &config->filters) {
Expand All @@ -145,6 +147,17 @@ static int flb_filter_propery_check_all(struct flb_config *config)
return -1;
}

/* Check actual values with additional validator */
p = ins->p;
/* Run pre_run callback for the filter */
if (p->cb_pre_run) {
ret = p->cb_pre_run(ins, config, ins->data);
if (ret != 0) {
flb_error("Failed pre_run callback on filter %s", ins->name);
return -1;
}
}

/* destroy config map (will be recreated at flb_start) */
if (ins->config_map) {
flb_config_map_destroy(ins->config_map);
Expand Down Expand Up @@ -205,7 +218,7 @@ int flb_reload_property_check_all(struct flb_config *config)
/* Check properties of filter plugins */
ret = flb_filter_propery_check_all(config);
if (ret == -1) {
flb_error("[reload] check properties for filter plugins is failed");
flb_error("[reload] check properties and additonal vaildations for filter plugins is failed");

return -1;
}
Expand Down Expand Up @@ -371,13 +384,13 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts)

if (ctx == NULL) {
flb_error("[reload] given flb context is NULL");
return -2;
return FLB_RELOAD_INVALID_CONTEXT;
}

old_config = ctx->config;
if (old_config->enable_hot_reload != FLB_TRUE) {
flb_warn("[reload] hot reloading is not enabled");
return -3;
return FLB_RELOAD_NOT_ENABLED;
}

if (old_config->ensure_thread_safety_on_hot_reloading) {
Expand All @@ -391,7 +404,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts)
*/
new_cf = flb_cf_create();
if (!new_cf) {
return -1;
return FLB_RELOAD_HALTED;
}

flb_info("reloading instance pid=%lu tid=%p", (long unsigned) getpid(), pthread_self());
Expand All @@ -406,7 +419,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts)
}
flb_cf_destroy(new_cf);
flb_error("[reload] reconstruct cf failed");
return -1;
return FLB_RELOAD_HALTED;
}
}

Expand All @@ -419,7 +432,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts)
flb_cf_destroy(new_cf);
flb_error("[reload] creating flb context is failed. Reloading is halted");

return -1;
return FLB_RELOAD_HALTED;
}

new_config = new_ctx->config;
Expand All @@ -446,7 +459,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts)
if (!new_cf) {
flb_sds_destroy(file);

return -1;
return FLB_RELOAD_HALTED;
}
}

Expand All @@ -462,7 +475,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts)
flb_destroy(new_ctx);
flb_error("[reload] reloaded config is invalid. Reloading is halted");

return -1;
return FLB_RELOAD_HALTED;
}
}

Expand All @@ -476,7 +489,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts)

flb_error("[reload] reloaded config format is invalid. Reloading is halted");

return -1;
return FLB_RELOAD_HALTED;
}

/* Validate plugin properites before fluent-bit stops the old context. */
Expand All @@ -489,7 +502,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts)

flb_error("[reload] reloaded config is invalid. Reloading is halted");

return -1;
return FLB_RELOAD_HALTED;
}

/* Delete the original context of config format before replacing
Expand All @@ -512,12 +525,19 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts)

ret = flb_start(new_ctx);

/* Store the new value of hot reloading times into the new context */
if (ret == 0) {
new_config->hot_reloaded_count = reloaded_count;
flb_debug("[reload] hot reloaded %d time(s)", reloaded_count);
new_config->hot_reloading = FLB_FALSE;
if (ret != 0) {
flb_stop(new_ctx);
flb_destroy(new_ctx);

flb_error("[reload] loaded configuration contains error(s). Reloading is aborted");

return FLB_RELOAD_ABORTED;
}

/* Store the new value of hot reloading times into the new context */
new_config->hot_reloaded_count = reloaded_count;
flb_debug("[reload] hot reloaded %d time(s)", reloaded_count);
new_config->hot_reloading = FLB_FALSE;

return 0;
}
27 changes: 22 additions & 5 deletions src/fluent-bit.c
Original file line number Diff line number Diff line change
Expand Up @@ -1394,16 +1394,28 @@ int flb_main(int argc, char **argv)
#endif
if (flb_bin_restarting == FLB_RELOAD_IN_PROGRESS) {
/* reload by using same config files/path */
flb_reload(ctx, cf_opts);
ctx = flb_context_get();
ret = flb_reload(ctx, cf_opts);
if (ret == 0) {
ctx = flb_context_get();
flb_bin_restarting = FLB_RELOAD_IDLE;
}
else {
flb_bin_restarting = ret;
}
}

if (flb_bin_restarting == FLB_RELOAD_HALTED) {
sleep(1);
flb_bin_restarting = FLB_RELOAD_IDLE;
}
}

if (exit_signal) {
flb_signal_exit(exit_signal);
}
ret = ctx->config->exit_status_code;
if (flb_bin_restarting != FLB_RELOAD_ABORTED) {
ret = ctx->config->exit_status_code;
}

cf_opts = flb_cf_context_get();

Expand All @@ -1425,8 +1437,13 @@ int flb_main(int argc, char **argv)
}
#endif

flb_stop(ctx);
flb_destroy(ctx);
if (flb_bin_restarting == FLB_RELOAD_ABORTED) {
fprintf(stderr, "reloading is aborted and exit\n");
}
else {
flb_stop(ctx);
flb_destroy(ctx);
}

return ret;
}
Expand Down
Loading