diff --git a/include/fluent-bit/flb_filter.h b/include/fluent-bit/flb_filter.h index 12649073bd5..ef27540a924 100644 --- a/include/fluent-bit/flb_filter.h +++ b/include/fluent-bit/flb_filter.h @@ -64,6 +64,7 @@ struct flb_filter_plugin { struct flb_config_map *config_map; /* Callbacks */ + int (*cb_pre_run) (struct flb_filter_instance *, struct flb_config *, void *); int (*cb_init) (struct flb_filter_instance *, struct flb_config *, void *); int (*cb_filter) (const void *, size_t, const char *, int, diff --git a/include/fluent-bit/flb_reload.h b/include/fluent-bit/flb_reload.h index 7066505a21c..6277add116d 100644 --- a/include/fluent-bit/flb_reload.h +++ b/include/fluent-bit/flb_reload.h @@ -25,8 +25,12 @@ #include #include -#define FLB_RELOAD_IDLE 0 -#define FLB_RELOAD_IN_PROGRESS 1 +#define FLB_RELOAD_IDLE 0 +#define FLB_RELOAD_IN_PROGRESS 1 +#define FLB_RELOAD_ABORTED -1 +#define FLB_RELOAD_HALTED -2 +#define FLB_RELOAD_NOT_ENABLED -3 +#define FLB_RELOAD_INVALID_CONTEXT -4 int flb_reload_property_check_all(struct flb_config *config); int flb_reload_reconstruct_cf(struct flb_cf *src_cf, struct flb_cf *dest_cf); diff --git a/plugins/filter_lua/lua.c b/plugins/filter_lua/lua.c index bb7bb566ab4..a42cca7ca4b 100644 --- a/plugins/filter_lua/lua.c +++ b/plugins/filter_lua/lua.c @@ -36,6 +36,46 @@ #include "lua_config.h" #include "mpack/mpack.h" +static int cb_lua_pre_run(struct flb_filter_instance *f_ins, + struct flb_config *config, void *data) +{ + int ret; + (void) data; + struct lua_filter *ctx; + struct flb_luajit *lj; + + /* Create context */ + ctx = lua_config_create(f_ins, config); + if (!ctx) { + flb_error("[filter_lua] filter cannot be loaded"); + return -1; + } + + /* Create LuaJIT state/vm */ + lj = flb_luajit_create(config); + if (!lj) { + lua_config_destroy(ctx); + return -1; + } + ctx->lua = lj; + + /* Lua script source code */ + if (ctx->code) { + ret = flb_luajit_load_buffer(ctx->lua, + ctx->code, flb_sds_len(ctx->code), + "fluentbit.lua"); + } + else { + /* Load Script / file path*/ + ret = flb_luajit_load_script(ctx->lua, ctx->script); + } + + flb_luajit_destroy(ctx->lua); + lua_config_destroy(ctx); + + return ret; +} + static int cb_lua_init(struct flb_filter_instance *f_ins, struct flb_config *config, void *data) @@ -77,6 +117,7 @@ static int cb_lua_init(struct flb_filter_instance *f_ins, } if (ret == -1) { + flb_luajit_destroy(ctx->lua); lua_config_destroy(ctx); return -1; } @@ -701,6 +742,7 @@ static struct flb_config_map config_map[] = { struct flb_filter_plugin filter_lua_plugin = { .name = "lua", .description = "Lua Scripting Filter", + .cb_pre_run = cb_lua_pre_run, .cb_init = cb_lua_init, #ifdef FLB_FILTER_LUA_USE_MPACK .cb_filter = cb_lua_filter_mpack, diff --git a/plugins/filter_wasm/filter_wasm.c b/plugins/filter_wasm/filter_wasm.c index be3adccd36e..a982a34bc9a 100644 --- a/plugins/filter_wasm/filter_wasm.c +++ b/plugins/filter_wasm/filter_wasm.c @@ -247,6 +247,41 @@ static void delete_wasm_config(struct flb_filter_wasm *ctx) flb_free(ctx); } +/* Check existence of wasm program binary */ +static int cb_wasm_pre_run(struct flb_filter_instance *f_ins, + struct flb_config *config, void *data) +{ + struct flb_filter_wasm *ctx = NULL; + int ret = -1; + + /* Allocate space for the configuration */ + ctx = flb_calloc(1, sizeof(struct flb_filter_wasm)); + if (!ctx) { + return -1; + } + + /* Initialize exec config */ + ret = filter_wasm_config_read(ctx, f_ins, config); + if (ret < 0) { + goto pre_run_error; + } + + /* Check accessibility for the wasm path */ + ret = access(ctx->wasm_path, R_OK); + if (ret != 0) { + goto pre_run_error; + } + + delete_wasm_config(ctx); + + return 0; + +pre_run_error: + delete_wasm_config(ctx); + + return -1; +} + /* Initialize plugin */ static int cb_wasm_init(struct flb_filter_instance *f_ins, struct flb_config *config, void *data) @@ -311,6 +346,7 @@ static struct flb_config_map config_map[] = { struct flb_filter_plugin filter_wasm_plugin = { .name = "wasm", .description = "WASM program filter", + .cb_pre_run = cb_wasm_pre_run, .cb_init = cb_wasm_init, .cb_filter = cb_wasm_filter, .cb_exit = cb_wasm_exit, diff --git a/src/flb_filter.c b/src/flb_filter.c index 389709a9a4d..522cadb6aad 100644 --- a/src/flb_filter.c +++ b/src/flb_filter.c @@ -588,6 +588,15 @@ int flb_filter_init(struct flb_config *config, struct flb_filter_instance *ins) return 0; } + /* Run pre_run callback for the filter */ + if (p->cb_pre_run) { + ret = p->cb_pre_run(ins, config, ins->data); + if (ret != 0) { + flb_error("Failed pre_run callback on filter %s", ins->name); + return -1; + } + } + /* Initialize the input */ if (p->cb_init) { ret = p->cb_init(ins, config, ins->data); diff --git a/src/flb_reload.c b/src/flb_reload.c index ea43a3554d4..12055a1dfe6 100644 --- a/src/flb_reload.c +++ b/src/flb_reload.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -128,6 +129,7 @@ static int flb_filter_propery_check_all(struct flb_config *config) struct mk_list *tmp; struct mk_list *head; struct flb_filter_instance *ins; + struct flb_filter_plugin *p; /* Iterate all active input instance plugins */ mk_list_foreach_safe(head, tmp, &config->filters) { @@ -145,6 +147,17 @@ static int flb_filter_propery_check_all(struct flb_config *config) return -1; } + /* Check actual values with additional validator */ + p = ins->p; + /* Run pre_run callback for the filter */ + if (p->cb_pre_run) { + ret = p->cb_pre_run(ins, config, ins->data); + if (ret != 0) { + flb_error("Failed pre_run callback on filter %s", ins->name); + return -1; + } + } + /* destroy config map (will be recreated at flb_start) */ if (ins->config_map) { flb_config_map_destroy(ins->config_map); @@ -205,7 +218,7 @@ int flb_reload_property_check_all(struct flb_config *config) /* Check properties of filter plugins */ ret = flb_filter_propery_check_all(config); if (ret == -1) { - flb_error("[reload] check properties for filter plugins is failed"); + flb_error("[reload] check properties and additonal vaildations for filter plugins is failed"); return -1; } @@ -371,13 +384,13 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) if (ctx == NULL) { flb_error("[reload] given flb context is NULL"); - return -2; + return FLB_RELOAD_INVALID_CONTEXT; } old_config = ctx->config; if (old_config->enable_hot_reload != FLB_TRUE) { flb_warn("[reload] hot reloading is not enabled"); - return -3; + return FLB_RELOAD_NOT_ENABLED; } if (old_config->ensure_thread_safety_on_hot_reloading) { @@ -391,7 +404,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) */ new_cf = flb_cf_create(); if (!new_cf) { - return -1; + return FLB_RELOAD_HALTED; } flb_info("reloading instance pid=%lu tid=%p", (long unsigned) getpid(), pthread_self()); @@ -406,7 +419,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) } flb_cf_destroy(new_cf); flb_error("[reload] reconstruct cf failed"); - return -1; + return FLB_RELOAD_HALTED; } } @@ -419,7 +432,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) flb_cf_destroy(new_cf); flb_error("[reload] creating flb context is failed. Reloading is halted"); - return -1; + return FLB_RELOAD_HALTED; } new_config = new_ctx->config; @@ -446,7 +459,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) if (!new_cf) { flb_sds_destroy(file); - return -1; + return FLB_RELOAD_HALTED; } } @@ -462,7 +475,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) flb_destroy(new_ctx); flb_error("[reload] reloaded config is invalid. Reloading is halted"); - return -1; + return FLB_RELOAD_HALTED; } } @@ -476,7 +489,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) flb_error("[reload] reloaded config format is invalid. Reloading is halted"); - return -1; + return FLB_RELOAD_HALTED; } /* Validate plugin properites before fluent-bit stops the old context. */ @@ -489,7 +502,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) flb_error("[reload] reloaded config is invalid. Reloading is halted"); - return -1; + return FLB_RELOAD_HALTED; } /* Delete the original context of config format before replacing @@ -512,12 +525,19 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) ret = flb_start(new_ctx); - /* Store the new value of hot reloading times into the new context */ - if (ret == 0) { - new_config->hot_reloaded_count = reloaded_count; - flb_debug("[reload] hot reloaded %d time(s)", reloaded_count); - new_config->hot_reloading = FLB_FALSE; + if (ret != 0) { + flb_stop(new_ctx); + flb_destroy(new_ctx); + + flb_error("[reload] loaded configuration contains error(s). Reloading is aborted"); + + return FLB_RELOAD_ABORTED; } + /* Store the new value of hot reloading times into the new context */ + new_config->hot_reloaded_count = reloaded_count; + flb_debug("[reload] hot reloaded %d time(s)", reloaded_count); + new_config->hot_reloading = FLB_FALSE; + return 0; } diff --git a/src/fluent-bit.c b/src/fluent-bit.c index 8ecd517f859..94e5d193281 100644 --- a/src/fluent-bit.c +++ b/src/fluent-bit.c @@ -1394,8 +1394,18 @@ int flb_main(int argc, char **argv) #endif if (flb_bin_restarting == FLB_RELOAD_IN_PROGRESS) { /* reload by using same config files/path */ - flb_reload(ctx, cf_opts); - ctx = flb_context_get(); + ret = flb_reload(ctx, cf_opts); + if (ret == 0) { + ctx = flb_context_get(); + flb_bin_restarting = FLB_RELOAD_IDLE; + } + else { + flb_bin_restarting = ret; + } + } + + if (flb_bin_restarting == FLB_RELOAD_HALTED) { + sleep(1); flb_bin_restarting = FLB_RELOAD_IDLE; } } @@ -1403,7 +1413,9 @@ int flb_main(int argc, char **argv) if (exit_signal) { flb_signal_exit(exit_signal); } - ret = ctx->config->exit_status_code; + if (flb_bin_restarting != FLB_RELOAD_ABORTED) { + ret = ctx->config->exit_status_code; + } cf_opts = flb_cf_context_get(); @@ -1425,8 +1437,13 @@ int flb_main(int argc, char **argv) } #endif - flb_stop(ctx); - flb_destroy(ctx); + if (flb_bin_restarting == FLB_RELOAD_ABORTED) { + fprintf(stderr, "reloading is aborted and exit\n"); + } + else { + flb_stop(ctx); + flb_destroy(ctx); + } return ret; }