diff --git a/include/fluent-bit/flb_lua.h b/include/fluent-bit/flb_lua.h index c05070cca42..ddf339d665d 100644 --- a/include/fluent-bit/flb_lua.h +++ b/include/fluent-bit/flb_lua.h @@ -51,6 +51,10 @@ struct flb_lua_l2c_config { struct mk_list l2c_types; /* data types (lua -> C) */ }; +struct flb_lua_func_info { + int params; + int is_variadic; +}; /* * Metatable for Lua table. @@ -87,6 +91,7 @@ static inline int flb_lua_absindex(lua_State *l , int index) int flb_lua_arraylength(lua_State *l, int index); void flb_lua_pushtimetable(lua_State *l, struct flb_time *tm); +int flb_lua_get_func_info(lua_State *lua, char *func, struct flb_lua_func_info *info); int flb_lua_is_valid_func(lua_State *l, flb_sds_t func); int flb_lua_pushmpack(lua_State *l, mpack_reader_t *reader); void flb_lua_pushmsgpack(lua_State *l, msgpack_object *o); diff --git a/plugins/filter_lua/lua.c b/plugins/filter_lua/lua.c index de7e6983188..6f885f3f756 100644 --- a/plugins/filter_lua/lua.c +++ b/plugins/filter_lua/lua.c @@ -112,6 +112,7 @@ static int cb_lua_init(struct flb_filter_instance *f_ins, (void) data; struct lua_filter *ctx; struct flb_luajit *lj; + struct flb_lua_func_info info = {0}; /* Create context */ ctx = lua_config_create(f_ins, config); @@ -161,13 +162,56 @@ static int cb_lua_init(struct flb_filter_instance *f_ins, return -1; } - + /* check if the function is valid */ if (flb_lua_is_valid_func(ctx->lua->state, ctx->call) != FLB_TRUE) { flb_plg_error(ctx->ins, "function %s is not found", ctx->call); lua_config_destroy(ctx); return -1; } + /* retrieve function information */ + ret = flb_lua_get_func_info(ctx->lua->state, ctx->call, &info); + if (ret != 0) { + lua_config_destroy(ctx); + return -1; + } + + /* check API version requirements */ + if (ctx->api_version == LUA_API_V1) { + /* + * API v1 function prototype + * ------------------------- + * function test_v1(tag, timestamp, body) + */ + if (info.params != 3) { + flb_plg_error(ctx->ins, + "Fluent Bit Lua script API v1 expects 3 arguments in the " + "callback function, prototype: '" + "function test(tag, timestamp, body)', " + "but '%s' expects %i parameters.", + ctx->call, info.params); + lua_config_destroy(ctx); + return -1; + } + } + else if (ctx->api_version == LUA_API_V2) { + /* + * API v2 function prototype + * ------------------------- + * function test_v2(tag, timestamp, metadata, body) + */ + if (info.params != 4) { + flb_plg_error(ctx->ins, + "Fluent Bit Lua script API v2 expects 4 arguments in the " + "callback function, prototype: '" + "function test(tag, timestamp, metadata, body)', " + "but '%s' expects %i parameters.", + ctx->call, info.params); + lua_config_destroy(ctx); + return -1; + } + } + /* Initialize packing buffer */ ctx->packbuf = flb_sds_create_size(1024); if (!ctx->packbuf) { @@ -428,13 +472,11 @@ static int pack_record(struct lua_filter *ctx, } if (ret == FLB_EVENT_ENCODER_SUCCESS && metadata != NULL) { - ret = flb_log_event_encoder_set_metadata_from_msgpack_object( - log_encoder, metadata); + ret = flb_log_event_encoder_set_metadata_from_msgpack_object(log_encoder, metadata); } if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - log_encoder, body); + ret = flb_log_event_encoder_set_body_from_msgpack_object(log_encoder, body); } if (ret == FLB_EVENT_ENCODER_SUCCESS) { @@ -444,32 +486,49 @@ static int pack_record(struct lua_filter *ctx, return ret; } -static int pack_result (struct lua_filter *ctx, struct flb_time *ts, - msgpack_object *metadata, - struct flb_log_event_encoder *log_encoder, - char *data, size_t bytes) +static int pack_result(struct lua_filter *ctx, struct flb_time *ts, + //msgpack_object *metadata, + struct flb_log_event *log_event, + struct flb_log_event_encoder *log_encoder, + char *meta_buf, size_t meta_size, + char *body_buf, size_t body_size) { int ret; size_t index = 0; size_t off = 0; msgpack_object *entry; - msgpack_unpacked result; - - msgpack_unpacked_init(&result); - - ret = msgpack_unpack_next(&result, data, bytes, &off); + msgpack_unpacked result_meta; + msgpack_unpacked result_body; + + /* unpack metadata if set */ + if (meta_buf) { + msgpack_unpacked_init(&result_meta); + ret = msgpack_unpack_next(&result_meta, meta_buf, meta_size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + msgpack_unpacked_destroy(&result_meta); + return FLB_FALSE; + } + } + /* Pack record */ + msgpack_unpacked_init(&result_body); + off = 0; + ret = msgpack_unpack_next(&result_body, body_buf, body_size, &off); if (ret != MSGPACK_UNPACK_SUCCESS) { - msgpack_unpacked_destroy(&result); - + msgpack_unpacked_destroy(&result_body); + if (meta_buf) { + msgpack_unpacked_destroy(&result_meta); + } return FLB_FALSE; } - if (result.data.type == MSGPACK_OBJECT_MAP) { - ret = pack_record(ctx, log_encoder, - ts, metadata, &result.data); + if (result_body.data.type == MSGPACK_OBJECT_MAP) { + ret = pack_record(ctx, log_encoder, ts, &result_meta.data, &result_body.data); + msgpack_unpacked_destroy(&result_body); - msgpack_unpacked_destroy(&result); + if (meta_buf) { + msgpack_unpacked_destroy(&result_meta); + } if (ret != FLB_EVENT_ENCODER_SUCCESS) { return FLB_FALSE; @@ -477,34 +536,30 @@ static int pack_result (struct lua_filter *ctx, struct flb_time *ts, return FLB_TRUE; } - else if (result.data.type == MSGPACK_OBJECT_ARRAY) { - for (index = 0 ; index < result.data.via.array.size ; index++) { - entry = &result.data.via.array.ptr[index]; + else if (result_body.data.type == MSGPACK_OBJECT_ARRAY) { + for (index = 0 ; index < result_body.data.via.array.size ; index++) { + entry = &result_body.data.via.array.ptr[index]; if (entry->type == MSGPACK_OBJECT_MAP) { - ret = pack_record(ctx, log_encoder, - ts, metadata, entry); + //ret = pack_record(ctx, log_encoder, ts, metadata, entry); if (ret != FLB_EVENT_ENCODER_SUCCESS) { - msgpack_unpacked_destroy(&result); - + msgpack_unpacked_destroy(&result_body); return FLB_FALSE; } } else { - msgpack_unpacked_destroy(&result); + msgpack_unpacked_destroy(&result_body); return FLB_FALSE; } } - msgpack_unpacked_destroy(&result); - + msgpack_unpacked_destroy(&result_body); return FLB_TRUE; } - msgpack_unpacked_destroy(&result); - + msgpack_unpacked_destroy(&result_body); return FLB_FALSE; } @@ -524,8 +579,10 @@ static int cb_lua_filter(const void *data, size_t bytes, /* Lua return values */ int l_code; double l_timestamp; - msgpack_packer data_pck; - msgpack_sbuffer data_sbuf; + msgpack_packer body_pck = {0}; + msgpack_sbuffer body_sbuf = {0}; + msgpack_packer meta_pck = {0}; + msgpack_sbuffer meta_sbuf = {0}; struct flb_log_event_encoder log_encoder; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; @@ -555,11 +612,9 @@ static int cb_lua_filter(const void *data, size_t bytes, return FLB_FILTER_NOTOUCH; } - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - msgpack_sbuffer_init(&data_sbuf); - msgpack_packer_init(&data_pck, &data_sbuf, msgpack_sbuffer_write); + while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + msgpack_sbuffer_init(&body_sbuf); + msgpack_packer_init(&body_pck, &body_sbuf, msgpack_sbuffer_write); /* Get timestamp */ flb_time_copy(&t, &log_event.timestamp); @@ -578,15 +633,28 @@ static int cb_lua_filter(const void *data, size_t bytes, lua_pushnumber(ctx->lua->state, ts); } + /* + * Metadata: on v1, logs metadata is not set, however in v2 it is, + * push the metadata content into the stack + */ + if (ctx->api_version == LUA_API_V2) { + flb_lua_pushmsgpack(ctx->lua->state, log_event.metadata); + } + flb_lua_pushmsgpack(ctx->lua->state, log_event.body); if (ctx->protected_mode) { - ret = lua_pcall(ctx->lua->state, 3, 3, 0); + if (ctx->api_version == LUA_API_V1) { + ret = lua_pcall(ctx->lua->state, 3, 3, 0); + } + else { + ret = lua_pcall(ctx->lua->state, 4, 4, 0); + } if (ret != 0) { flb_plg_error(ctx->ins, "error code %d: %s", ret, lua_tostring(ctx->lua->state, -1)); lua_pop(ctx->lua->state, 1); - msgpack_sbuffer_destroy(&data_sbuf); + msgpack_sbuffer_destroy(&body_sbuf); flb_log_event_decoder_destroy(&log_decoder); flb_log_event_encoder_destroy(&log_encoder); @@ -594,16 +662,40 @@ static int cb_lua_filter(const void *data, size_t bytes, } } else { - lua_call(ctx->lua->state, 3, 3); + if (ctx->api_version == LUA_API_V1) { + lua_call(ctx->lua->state, 3, 3); + } + else { + lua_call(ctx->lua->state, 4, 4); + } } /* Initialize Return values */ l_code = 0; l_timestamp = ts; - flb_lua_tomsgpack(ctx->lua->state, &data_pck, 0, &ctx->l2cc); + /* log body */ + flb_lua_tomsgpack(ctx->lua->state, &body_pck, 0, &ctx->l2cc); lua_pop(ctx->lua->state, 1); + if (ctx->api_version == LUA_API_V2) { + /* initialize msgpack buffer for metadata */ + msgpack_sbuffer_init(&meta_sbuf); + msgpack_packer_init(&meta_pck, &meta_sbuf, msgpack_sbuffer_write); + + /* Check for metadata (third return value) */ + if (lua_istable(ctx->lua->state, -1)) { + /* Metadata is present */ + flb_lua_tomsgpack(ctx->lua->state, &meta_pck, 0, &ctx->l2cc); + lua_pop(ctx->lua->state, 1); + } + else { + /* Metadata not modified */ + lua_pop(ctx->lua->state, 1); + msgpack_sbuffer_destroy(&meta_sbuf); + } + } + /* Lua table */ if (ctx->time_as_table == FLB_TRUE) { if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { @@ -627,11 +719,15 @@ static int cb_lua_filter(const void *data, size_t bytes, lua_pop(ctx->lua->state, 1); } + /* return value from Lua */ l_code = (int) lua_tointeger(ctx->lua->state, -1); lua_pop(ctx->lua->state, 1); if (l_code == -1) { /* Skip record */ - msgpack_sbuffer_destroy(&data_sbuf); + if (ctx->api_version == LUA_API_V2) { + msgpack_sbuffer_destroy(&meta_sbuf); + } + msgpack_sbuffer_destroy(&body_sbuf); continue; } else if (l_code == 1 || l_code == 2) { /* Modified, pack new data */ @@ -645,13 +741,14 @@ static int cb_lua_filter(const void *data, size_t bytes, t = t_orig; } - ret = pack_result(ctx, &t, log_event.metadata, &log_encoder, - data_sbuf.data, data_sbuf.size); + ret = pack_result(ctx, &t, &log_event, &log_encoder, + meta_sbuf.data, meta_sbuf.size, + body_sbuf.data, body_sbuf.size); if (ret == FLB_FALSE) { flb_plg_error(ctx->ins, "invalid table returned at %s(), %s", ctx->call, ctx->script); - msgpack_sbuffer_destroy(&data_sbuf); + msgpack_sbuffer_destroy(&body_sbuf); flb_log_event_decoder_destroy(&log_decoder); flb_log_event_encoder_destroy(&log_encoder); @@ -678,7 +775,7 @@ static int cb_lua_filter(const void *data, size_t bytes, } } - msgpack_sbuffer_destroy(&data_sbuf); + msgpack_sbuffer_destroy(&body_sbuf); } if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA) { @@ -724,6 +821,10 @@ static struct flb_config_map config_map[] = { 0, FLB_FALSE, 0, "The path of lua script." }, + { + FLB_CONFIG_MAP_STR, "api", "v1", + 0, FLB_TRUE, offsetof(struct lua_filter, api_version_str), + }, { FLB_CONFIG_MAP_STR, "code", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/filter_lua/lua_config.c b/plugins/filter_lua/lua_config.c index 3fc80f048d3..40970c0db7c 100644 --- a/plugins/filter_lua/lua_config.c +++ b/plugins/filter_lua/lua_config.c @@ -67,6 +67,19 @@ struct lua_filter *lua_config_create(struct flb_filter_instance *ins, lf->ins = ins; lf->script = NULL; + /* api version */ + if (strcmp(lf->api_version_str, "v1") == 0) { + lf->api_version = LUA_API_V1; + } + else if (strcmp(lf->api_version_str, "v2") == 0) { + lf->api_version = LUA_API_V2; + } + else { + flb_plg_error(lf->ins, "invalid API version '%s'", lf->api_version_str); + flb_free(lf); + return NULL; + } + /* config: code */ tmp = flb_filter_get_property("code", ins); if (tmp) { diff --git a/plugins/filter_lua/lua_config.h b/plugins/filter_lua/lua_config.h index e67beb38cb9..365eef62ef9 100644 --- a/plugins/filter_lua/lua_config.h +++ b/plugins/filter_lua/lua_config.h @@ -27,8 +27,11 @@ #include #define LUA_BUFFER_CHUNK 1024 * 8 /* 8K should be enough to get started */ +#define LUA_API_V1 1 +#define LUA_API_V2 2 struct lua_filter { + flb_sds_t api_version_str; /* API version string */ flb_sds_t code; /* lua script source code */ flb_sds_t script; /* lua script path */ flb_sds_t call; /* function name */ @@ -36,6 +39,7 @@ struct lua_filter { int protected_mode; /* exec lua function in protected mode */ int time_as_table; /* timestamp as a Lua table */ int enable_flb_null; /* Use flb_null in Lua */ + int api_version; struct flb_lua_l2c_config l2cc; /* lua -> C config */ struct flb_luajit *lua; /* state context */ struct flb_filter_instance *ins; /* filter instance */ diff --git a/src/flb_lua.c b/src/flb_lua.c index 3dd84d1c19f..723060daea2 100644 --- a/src/flb_lua.c +++ b/src/flb_lua.c @@ -51,6 +51,64 @@ void flb_lua_pushtimetable(lua_State *l, struct flb_time *tm) lua_settable(l, -3); } +/* Retrieve information from a Lua function */ +int flb_lua_get_func_info(lua_State *lua, char *func, struct flb_lua_func_info *info) +{ + if (!info) { + return -1; + } + + lua_getglobal(lua, "require"); + lua_pushstring(lua, "jit.util"); + + if (lua_pcall(lua, 1, 1, 0) != LUA_OK) { + flb_error("cannot load jit.util: %s", lua_tostring(lua, -1)); + lua_pop(lua, 1); + return -1; + } + + /* Push the jit.util.funcinfo function */ + lua_getfield(lua, -1, "funcinfo"); + if (!lua_isfunction(lua, -1)) { + flb_error("'funcinfo' is not a function in jit.util"); + lua_pop(lua, 2); /* pop 'funcinfo' 'jit.util' */ + return -1; + } + + /* Push the target function */ + lua_getglobal(lua, func); + if (!lua_isfunction(lua, -1)) { + flb_error("the function '%s' is not valid", func); + lua_pop(lua, 1); + return -1; + } + + /* call jit.util.funcinfo(func) */ + if (lua_pcall(lua, 1, 1, 0) != LUA_OK) { + flb_error("cannot call funcinfo: %s", lua_tostring(lua, -1)); + lua_pop(lua, 1); + return -1; + } + + /* get the 'params' and 'isvararg' fields from the result */ + lua_getfield(lua, -1, "params"); + lua_getfield(lua, -2, "isvararg"); + + if (!lua_isnumber(lua, -2) || !lua_isboolean(lua, -1)) { + flb_error("failed to retrieve valid function info."); + /* clean up: 'params', 'isvararg', 'funcinfo' table, and 'jit.util' */ + lua_pop(lua, 4); + return -1; + } + + info->params = lua_tointeger(lua, -2); + info->is_variadic = lua_toboolean(lua, -1); + + /* clean up: funcinfo table + fields + jit.util */ + lua_pop(lua, 4); + return 0; +} + int flb_lua_is_valid_func(lua_State *lua, flb_sds_t func) { int ret = FLB_FALSE; diff --git a/tests/runtime/filter_lua.c b/tests/runtime/filter_lua.c index 1e3111266d0..60bdbeb7423 100644 --- a/tests/runtime/filter_lua.c +++ b/tests/runtime/filter_lua.c @@ -23,6 +23,9 @@ #include #include #include +#include +#include + #include "flb_tests_runtime.h" #define TMP_LUA_PATH "a.lua" @@ -939,16 +942,184 @@ void flb_test_invalid_metatable(void) flb_input_set(ctx, in_ffd, "tag", "test", NULL); TEST_CHECK(in_ffd >= 0); - /* Lib output */ - out_ffd = flb_output(ctx, (char *) "lib", (void *)&cb_data); + /* Lib output (configured to receive a chunk )*/ + out_ffd = flb_output(ctx, (char *) "lib", (void *) &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret==0); + + ret = flb_lib_push(ctx, in_ffd, input, strlen(input)); + if (!TEST_CHECK(ret != -1)) { + TEST_MSG("flb_lib_push error"); + } + flb_time_msleep(1500); /* waiting flush */ + + ret = get_output_num(); + if (!TEST_CHECK(ret > 0)) { + TEST_MSG("error. no output"); + } + + /* clean up */ + flb_lib_free(output); + delete_script(); + + flb_stop(ctx); + flb_destroy(ctx); +} + +static char *get_log_metadata(void *chunk, size_t size) +{ + int ret; + char *json; + struct flb_log_event log_event; + struct flb_log_event_decoder log_decoder; + + ret = flb_log_event_decoder_init(&log_decoder, chunk, size); + TEST_CHECK(ret == FLB_EVENT_DECODER_SUCCESS); + + /* record */ + flb_log_event_decoder_next(&log_decoder, &log_event); + + /* convert log body to json */ + json = flb_msgpack_to_json_str(1024, log_event.metadata); + + flb_log_event_decoder_destroy(&log_decoder); + return json; +} + +static char *get_log_body(void *chunk, size_t size) +{ + int ret; + char *json; + struct flb_log_event log_event; + struct flb_log_event_decoder log_decoder; + + ret = flb_log_event_decoder_init(&log_decoder, chunk, size); + TEST_CHECK(ret == FLB_EVENT_DECODER_SUCCESS); + + /* record */ + flb_log_event_decoder_next(&log_decoder, &log_event); + + /* convert log body to json */ + json = flb_msgpack_to_json_str(1024, log_event.body); + + flb_log_event_decoder_destroy(&log_decoder); + return json; +} + +static int cb_check_log_meta_and_body(void *chunk, size_t size, void *data) +{ + int ret; + int num = get_output_num(); + char *expected_body = "{\"body_test\":\"ok\"}"; + char *expected_metadata = "{\"meta_test\":\"ok\"}"; + char *json; + + /* Log metadata */ + json = get_log_metadata(chunk, size); + TEST_CHECK(json != NULL); + + ret = strcmp(json, expected_metadata); + TEST_CHECK(ret == 0); + if (ret != 0) { + flb_error("metadata mismatch:"); + flb_error(" - Expected: '%s'", expected_metadata); + flb_error(" - Received: '%s'", json); + } + else { + printf("log metadata match: '%s'\n", json); + } + + flb_free(json); + + /* Log body */ + json = get_log_body(chunk, size); + TEST_CHECK(json != NULL); + + ret = strcmp(json, expected_body); + TEST_CHECK(ret == 0); + if (ret != 0) { + flb_error("logs mismatch:"); + flb_error(" - Expected: '%s'", expected_body); + flb_error(" - Received: '%s'", json); + } + else { + printf("log body match: '%s'\n", json); + } + free(json); + + pthread_mutex_lock(&result_mutex); + num_output++; + pthread_mutex_unlock(&result_mutex); + + return 0; +} + +void flb_test_log_metadata_api_v2() +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int unused = 0; + int filter_ffd; + char *output = NULL; + char *input; + struct flb_lib_out_cb cb_data; + + char *script_body = "" + "function test_v2(tag, timestamp, metadata, body)\n" + " new_metadata = {}\n" + " new_metadata['meta_test'] = 'ok'\n" + " new_body = {}\n" + " new_body['body_test'] = 'ok'\n" + " return 2, timestamp, new_metadata, new_body\n" + "end\n"; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", FLUSH_INTERVAL, "grace", "1", NULL); + + /* Prepare output callback context*/ + cb_data.cb = cb_check_log_meta_and_body; + cb_data.data = NULL; /* expected output is inside the callback */ + + /* Lua filter */ + filter_ffd = flb_filter(ctx, (char *) "lua", NULL); + TEST_CHECK(filter_ffd >= 0); + ret = flb_filter_set(ctx, filter_ffd, + "match", "*", + "api", "v2", + "call", "test_v2", + "code", script_body, + NULL); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Lib output (configured to receive a chunk )*/ + out_ffd = flb_output(ctx, (char *) "lib", (void *) &cb_data); TEST_CHECK(out_ffd >= 0); flb_output_set(ctx, out_ffd, "match", "test", + "format", "chunk", NULL); ret = flb_start(ctx); TEST_CHECK(ret==0); + + /* set the input */ + input = "[[1734839279, {\"test\": \"lua\"}], {\"key\":\"val\"}]"; + ret = flb_lib_push(ctx, in_ffd, input, strlen(input)); if (!TEST_CHECK(ret != -1)) { TEST_MSG("flb_lib_push error"); @@ -980,5 +1151,6 @@ TEST_LIST = { {"split_record", flb_test_split_record}, {"empty_array", flb_test_empty_array}, {"invalid_metatable", flb_test_invalid_metatable}, + {"log_metadata_api_v2", flb_test_log_metadata_api_v2}, {NULL, NULL} };