diff --git a/include/fluent-bit/flb_lua.h b/include/fluent-bit/flb_lua.h index 390a3bcba10..62f437287fe 100644 --- a/include/fluent-bit/flb_lua.h +++ b/include/fluent-bit/flb_lua.h @@ -44,14 +44,15 @@ struct flb_lua_l2c_type { struct flb_lua_l2c_config { int l2c_types_num; /* number of l2c_types */ + flb_sds_t l2c_nil_str; /* string to represent nil value */ struct mk_list l2c_types; /* data types (lua -> C) */ }; int flb_lua_arraylength(lua_State *l); void flb_lua_pushtimetable(lua_State *l, struct flb_time *tm); int flb_lua_is_valid_func(lua_State *l, flb_sds_t func); -int flb_lua_pushmpack(lua_State *l, mpack_reader_t *reader); -void flb_lua_pushmsgpack(lua_State *l, msgpack_object *o); +int flb_lua_pushmpack(lua_State *l, mpack_reader_t *reader, flb_sds_t nil_str); +void flb_lua_pushmsgpack(lua_State *l, msgpack_object *o, flb_sds_t nil_str); void flb_lua_tomsgpack(lua_State *l, msgpack_packer *pck, int index, diff --git a/plugins/filter_lua/lua.c b/plugins/filter_lua/lua.c index a95c87da1fe..6d9a366a15c 100644 --- a/plugins/filter_lua/lua.c +++ b/plugins/filter_lua/lua.c @@ -503,7 +503,7 @@ static int cb_lua_filter(const void *data, size_t bytes, lua_pushnumber(ctx->lua->state, ts); } - flb_lua_pushmsgpack(ctx->lua->state, log_event.body); + flb_lua_pushmsgpack(ctx->lua->state, log_event.body, ctx->l2cc.l2c_nil_str); if (ctx->protected_mode) { ret = lua_pcall(ctx->lua->state, 3, 3, 0); if (ret != 0) { @@ -671,6 +671,13 @@ static struct flb_config_map config_map[] = { "If these keys are matched, the fields are converted to array. " "If more than one key, delimit by space." }, + { + FLB_CONFIG_MAP_STR, "nil_str", NULL, + 0, FLB_TRUE, offsetof(struct lua_filter, nil_str), + "If set, nil value will be replaced by this string in Lua. " + "It is to prevent remove nil value from record " + "since nil value is to delete key/value from associative array in Lua." + }, { FLB_CONFIG_MAP_BOOL, "protected_mode", "true", 0, FLB_TRUE, offsetof(struct lua_filter, protected_mode), diff --git a/plugins/filter_lua/lua_config.c b/plugins/filter_lua/lua_config.c index f0c15419610..74467a61962 100644 --- a/plugins/filter_lua/lua_config.c +++ b/plugins/filter_lua/lua_config.c @@ -127,6 +127,12 @@ struct lua_filter *lua_config_create(struct flb_filter_instance *ins, } lf->l2cc.l2c_types_num = 0; + if (lf->nil_str) { + lf->l2cc.l2c_nil_str = flb_sds_create(lf->nil_str); + } + else { + lf->l2cc.l2c_nil_str = NULL; + } tmp = flb_filter_get_property("type_int_key", ins); if (tmp) { split = flb_utils_split(tmp, ' ', FLB_LUA_L2C_TYPES_NUM_MAX); @@ -189,6 +195,9 @@ void lua_config_destroy(struct lua_filter *lf) if (lf->buffer) { flb_sds_destroy(lf->buffer); } + if (lf->l2cc.l2c_nil_str) { + flb_sds_destroy(lf->l2cc.l2c_nil_str); + } mk_list_foreach_safe(head, tmp_list, &lf->l2cc.l2c_types) { l2c = mk_list_entry(head, struct flb_lua_l2c_type, _head); diff --git a/plugins/filter_lua/lua_config.h b/plugins/filter_lua/lua_config.h index e5cc1a9b4e5..1fb83fde63a 100644 --- a/plugins/filter_lua/lua_config.h +++ b/plugins/filter_lua/lua_config.h @@ -33,6 +33,7 @@ struct lua_filter { flb_sds_t script; /* lua script path */ flb_sds_t call; /* function name */ flb_sds_t buffer; /* json dec buffer */ + flb_sds_t nil_str; /* The string to represent nil value in Lua */ int protected_mode; /* exec lua function in protected mode */ int time_as_table; /* timestamp as a Lua table */ struct flb_lua_l2c_config l2cc; /* lua -> C config */ diff --git a/src/flb_lua.c b/src/flb_lua.c index c6badec16bc..6141e0c314b 100644 --- a/src/flb_lua.c +++ b/src/flb_lua.c @@ -53,7 +53,7 @@ int flb_lua_is_valid_func(lua_State *lua, flb_sds_t func) return ret; } -int flb_lua_pushmpack(lua_State *l, mpack_reader_t *reader) +int flb_lua_pushmpack(lua_State *l, mpack_reader_t *reader, flb_sds_t nil_str) { int ret = 0; mpack_tag_t tag; @@ -63,7 +63,17 @@ int flb_lua_pushmpack(lua_State *l, mpack_reader_t *reader) tag = mpack_read_tag(reader); switch (mpack_tag_type(&tag)) { case mpack_type_nil: - lua_pushnil(l); + if (nil_str) { + /* + The nil value is a special value to delete key/value from associative array. + It means that pairs that contain nil value will disappear. + For that reason, we push "nil" string instead of the nil value. + */ + lua_pushlstring(l, nil_str, flb_sds_len(nil_str)); + } + else { + lua_pushnil(l); + } break; case mpack_type_bool: lua_pushboolean(l, mpack_tag_bool_value(&tag)); @@ -91,7 +101,7 @@ int flb_lua_pushmpack(lua_State *l, mpack_reader_t *reader) length = mpack_tag_array_count(&tag); lua_createtable(l, length, 0); for (i = 0; i < length; i++) { - ret = flb_lua_pushmpack(l, reader); + ret = flb_lua_pushmpack(l, reader, nil_str); if (ret) { return ret; } @@ -102,11 +112,11 @@ int flb_lua_pushmpack(lua_State *l, mpack_reader_t *reader) length = mpack_tag_map_count(&tag); lua_createtable(l, length, 0); for (i = 0; i < length; i++) { - ret = flb_lua_pushmpack(l, reader); + ret = flb_lua_pushmpack(l, reader, nil_str); if (ret) { return ret; } - ret = flb_lua_pushmpack(l, reader); + ret = flb_lua_pushmpack(l, reader, nil_str); if (ret) { return ret; } @@ -119,7 +129,7 @@ int flb_lua_pushmpack(lua_State *l, mpack_reader_t *reader) return 0; } -void flb_lua_pushmsgpack(lua_State *l, msgpack_object *o) +void flb_lua_pushmsgpack(lua_State *l, msgpack_object *o, flb_sds_t nil_str) { int i; int size; @@ -128,7 +138,17 @@ void flb_lua_pushmsgpack(lua_State *l, msgpack_object *o) switch(o->type) { case MSGPACK_OBJECT_NIL: - lua_pushnil(l); + if (nil_str) { + /* + The nil value is a special value to delete key/value from associative array. + It means that pairs that contain nil value will disappear. + For that reason, we push "nil" string instead of the nil value. + */ + lua_pushlstring(l, nil_str, flb_sds_len(nil_str)); + } + else { + lua_pushnil(l); + } break; case MSGPACK_OBJECT_BOOLEAN: @@ -166,7 +186,7 @@ void flb_lua_pushmsgpack(lua_State *l, msgpack_object *o) if (size != 0) { msgpack_object *p = o->via.array.ptr; for (i = 0; i < size; i++) { - flb_lua_pushmsgpack(l, p+i); + flb_lua_pushmsgpack(l, p+i, nil_str); lua_rawseti (l, -2, i+1); } } @@ -178,8 +198,8 @@ void flb_lua_pushmsgpack(lua_State *l, msgpack_object *o) if (size != 0) { msgpack_object_kv *p = o->via.map.ptr; for (i = 0; i < size; i++) { - flb_lua_pushmsgpack(l, &(p+i)->key); - flb_lua_pushmsgpack(l, &(p+i)->val); + flb_lua_pushmsgpack(l, &(p+i)->key, nil_str); + flb_lua_pushmsgpack(l, &(p+i)->val, nil_str); lua_settable(l, -3); } } @@ -419,8 +439,13 @@ void flb_lua_tompack(lua_State *l, size_t len; str = lua_tolstring(l, -1 + index, &len); - - mpack_write_str(writer, str, len); + if (l2cc->l2c_nil_str && strlen(str) == flb_sds_len(l2cc->l2c_nil_str) && + !strncmp(str, l2cc->l2c_nil_str, flb_sds_len(l2cc->l2c_nil_str))) { + mpack_write_nil(writer); + } + else { + mpack_write_str(writer, str, len); + } } break; case LUA_TNUMBER: @@ -509,9 +534,14 @@ void flb_lua_tomsgpack(lua_State *l, size_t len; str = lua_tolstring(l, -1 + index, &len); - - msgpack_pack_str(pck, len); - msgpack_pack_str_body(pck, str, len); + if (l2cc->l2c_nil_str && strlen(str) == flb_sds_len(l2cc->l2c_nil_str) && + !strncmp(str, l2cc->l2c_nil_str, flb_sds_len(l2cc->l2c_nil_str))) { + msgpack_pack_nil(pck); + } + else { + msgpack_pack_str(pck, len); + msgpack_pack_str_body(pck, str, len); + } } break; case LUA_TNUMBER: diff --git a/tests/internal/lua.c b/tests/internal/lua.c index d71b462fad2..44e46e44316 100644 --- a/tests/internal/lua.c +++ b/tests/internal/lua.c @@ -112,7 +112,7 @@ static void test_pushmsgpack() msgpack_unpacked_init(&msg); msgpack_unpack_next(&msg, sbuf.data, sbuf.size, NULL); - flb_lua_pushmsgpack(l, &msg.data); + flb_lua_pushmsgpack(l, &msg.data, NULL); check_equals(l, "{ [1] = { [key] = value } [2] = msgpack-str [3] = 4 }"); msgpack_unpacked_destroy(&msg); @@ -137,7 +137,7 @@ static void test_pushmpack() msgpack_pack_int(&pck, 4); mpack_reader_init_data(&reader, sbuf.data, sbuf.size); - flb_lua_pushmpack(l, &reader); + flb_lua_pushmpack(l, &reader, NULL); check_equals(l, "{ [1] = { [key] = value } [2] = msgpack-str [3] = 4 }"); msgpack_sbuffer_destroy(&sbuf); @@ -158,6 +158,7 @@ static void test_tomsgpack() msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); mk_list_init(&l2cc.l2c_types); l2cc.l2c_types_num = 0; + l2cc.l2c_nil_str = NULL; lua_getglobal(l, "obj"); flb_lua_tomsgpack(l, &pck, 0, &l2cc); @@ -188,6 +189,7 @@ static void test_tompack() mpack_writer_init(&writer, buf, sizeof(buf)); mk_list_init(&l2cc.l2c_types); l2cc.l2c_types_num = 0; + l2cc.l2c_nil_str = NULL; lua_getglobal(l, "obj"); flb_lua_tompack(l, &writer, 0, &l2cc); diff --git a/tests/runtime/filter_lua.c b/tests/runtime/filter_lua.c index 9f694377eae..1548f6c56cb 100644 --- a/tests/runtime/filter_lua.c +++ b/tests/runtime/filter_lua.c @@ -753,12 +753,85 @@ void flb_test_split_record(void) flb_sds_destroy(outbuf); } +/* https://github.com/fluent/fluent-bit/issues/7708 */ +void flb_test_nil_str(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int filter_ffd; + char *output = NULL; + char *input = "[0, {\"key\":\"val\", \"nil_key\":null}]"; + char *result; + struct flb_lib_out_cb cb_data; + + char *script_body = "" + "function lua_main(tag, timestamp, record)\n" + " new_record = record\n" + " return 1, timestamp, new_record\n" + "end\n"; + + clear_output(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", FLUSH_INTERVAL, "grace", "1", NULL); + + /* Prepare output callback context*/ + cb_data.cb = callback_test; + cb_data.data = NULL; + + ret = create_script(script_body, strlen(script_body)); + TEST_CHECK(ret == 0); + /* Filter */ + filter_ffd = flb_filter(ctx, (char *) "lua", NULL); + TEST_CHECK(filter_ffd >= 0); + ret = flb_filter_set(ctx, filter_ffd, + "Match", "*", + "call", "lua_main", + "nil_str", "nil", + "script", TMP_LUA_PATH, + NULL); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Lib output */ + out_ffd = flb_output(ctx, (char *) "lib", (void *)&cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "format", "json", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret==0); + + flb_lib_push(ctx, in_ffd, input, strlen(input)); + wait_with_timeout(2000, &output); + result = strstr(output, "\"nil_key\":null"); + if(!TEST_CHECK(result != NULL)) { + TEST_MSG("output:%s\n", output); + } + + /* clean up */ + flb_lib_free(output); + delete_script(); + + flb_stop(ctx); + flb_destroy(ctx); +} + TEST_LIST = { {"hello_world", flb_test_helloworld}, {"append_tag", flb_test_append_tag}, {"type_int_key", flb_test_type_int_key}, {"type_int_key_multi", flb_test_type_int_key_multi}, {"type_array_key", flb_test_type_array_key}, + {"nil_str", flb_test_nil_str}, {"array_contains_null", flb_test_array_contains_null}, {"drop_all_records", flb_test_drop_all_records}, {"split_record", flb_test_split_record},