diff --git a/include/fluent-bit/flb_lua.h b/include/fluent-bit/flb_lua.h index c05070cca42..c87f0050628 100644 --- a/include/fluent-bit/flb_lua.h +++ b/include/fluent-bit/flb_lua.h @@ -100,5 +100,5 @@ void flb_lua_tompack(lua_State *l, struct flb_lua_l2c_config *l2cc); void flb_lua_dump_stack(FILE *out, lua_State *l); int flb_lua_enable_flb_null(lua_State *l); - +void flb_lua_bulk_process(lua_State *l, struct flb_time *t, msgpack_object *o, int table_index); #endif diff --git a/plugins/filter_lua/lua.c b/plugins/filter_lua/lua.c index 5186955b8ae..8b62e77b985 100644 --- a/plugins/filter_lua/lua.c +++ b/plugins/filter_lua/lua.c @@ -428,17 +428,14 @@ static int pack_result (struct lua_filter *ctx, struct flb_time *ts, msgpack_unpacked_init(&result); ret = msgpack_unpack_next(&result, data, bytes, &off); - if (ret != MSGPACK_UNPACK_SUCCESS) { msgpack_unpacked_destroy(&result); return FLB_FALSE; } - if (result.data.type == MSGPACK_OBJECT_MAP) { ret = pack_record(ctx, log_encoder, ts, metadata, &result.data); - msgpack_unpacked_destroy(&result); if (ret != FLB_EVENT_ENCODER_SUCCESS) { @@ -500,6 +497,7 @@ static int cb_lua_filter(const void *data, size_t bytes, struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; + (void) f_ins; (void) i_ins; (void) config; @@ -525,140 +523,275 @@ static int cb_lua_filter(const void *data, size_t bytes, return FLB_FILTER_NOTOUCH; } - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - msgpack_sbuffer_init(&data_sbuf); - msgpack_packer_init(&data_pck, &data_sbuf, msgpack_sbuffer_write); + if (ctx->chunk_mode) { + if (ctx->time_as_table != FLB_TRUE) { + flb_plg_error(ctx->ins, + "time_as_table needed in chunk_mode"); - /* Get timestamp */ - flb_time_copy(&t, &log_event.timestamp); - flb_time_copy(&t_orig, &log_event.timestamp); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); - /* Prepare function call, pass 3 arguments, expect 3 return values */ + return FLB_FILTER_NOTOUCH; + } lua_getglobal(ctx->lua->state, ctx->call); - lua_pushstring(ctx->lua->state, tag); - /* Timestamp */ - if (ctx->time_as_table == FLB_TRUE) { - flb_lua_pushtimetable(ctx->lua->state, &t); - } - else { - ts = flb_time_to_double(&t); - lua_pushnumber(ctx->lua->state, ts); - } + /* Create Lua table outside the loop */ + lua_createtable(ctx->lua->state, 0, 0); - flb_lua_pushmsgpack(ctx->lua->state, log_event.body); - if (ctx->protected_mode) { - ret = lua_pcall(ctx->lua->state, 3, 3, 0); - if (ret != 0) { - flb_plg_error(ctx->ins, "error code %d: %s", - ret, lua_tostring(ctx->lua->state, -1)); - lua_pop(ctx->lua->state, 1); + /* Push all timestamp + record tuples into the table */ + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - msgpack_sbuffer_destroy(&data_sbuf); - flb_log_event_decoder_destroy(&log_decoder); - flb_log_event_encoder_destroy(&log_encoder); + /* Copy the current timestamp */ + flb_time_copy(&t, &log_event.timestamp); - return FLB_FILTER_NOTOUCH; + /* Add record to be send to lua */ + flb_lua_bulk_process(ctx->lua->state, &t, log_event.body, -2); } - } - else { - lua_call(ctx->lua->state, 3, 3); - } + /* Push the tag to the lua table. Same for this whole chunk */ + lua_pushstring(ctx->lua->state, tag); + lua_insert(ctx->lua->state, -2); - /* Initialize Return values */ - l_code = 0; - l_timestamp = ts; + if (ctx->protected_mode) { + /* Prepare function call, pass tag + records, expect records */ + ret = lua_pcall(ctx->lua->state, 2, 1, 0); + if (ret != 0) { + flb_plg_error(ctx->ins, "error code %d: %s", + ret, lua_tostring(ctx->lua->state, -1)); + lua_pop(ctx->lua->state, 1); - flb_lua_tomsgpack(ctx->lua->state, &data_pck, 0, &ctx->l2cc); - lua_pop(ctx->lua->state, 1); + msgpack_sbuffer_destroy(&data_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + + return FLB_FILTER_NOTOUCH; + } + } + else { + /* Prepare function call, pass tag + records, expect records */ + lua_call(ctx->lua->state, 2, 1); + } + + if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { + int array_size = lua_objlen(ctx->lua->state, -1); + for (int i = 1; i <= array_size; i++) { + /* Retrieve each record entry from records table */ + lua_rawgeti(ctx->lua->state, -1, i); + + if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { + /* Retrieve timestamp */ + lua_getfield(ctx->lua->state, -1, "timestamp"); + if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { + /* Retrieve seconds */ + lua_getfield(ctx->lua->state, -1, "sec"); + t.tm.tv_sec = lua_tointeger(ctx->lua->state, -1); + lua_pop(ctx->lua->state, 1); + + /* Retrieve nsec */ + lua_getfield(ctx->lua->state, -1, "nsec"); + t.tm.tv_nsec = lua_tointeger(ctx->lua->state, -1); + lua_pop(ctx->lua->state, 2); + + flb_plg_debug(ctx->ins, + "Timestamp: sec=%ld, nsec=%ld", + (long)t.tm.tv_sec, (long)t.tm.tv_nsec); + } else { + flb_plg_error(ctx->ins, + "invalid timestamp field type returned"); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + return FLB_FILTER_NOTOUCH; + } + + /* Retrieve record */ + lua_getfield(ctx->lua->state, -1, "record"); + + /* Check the type of 'record' field */ + if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { + msgpack_sbuffer_init(&data_sbuf); + msgpack_packer_init(&data_pck, + &data_sbuf, + msgpack_sbuffer_write); + /* Create msgpack object for every record */ + flb_plg_debug(ctx->ins, + "Creating msgpack object for record"); + flb_lua_tomsgpack(ctx->lua->state, + &data_pck, + 0, + &ctx->l2cc); + ret = flb_log_event_encoder_begin_record( + &log_encoder); + ret = flb_log_event_encoder_set_timestamp( + &log_encoder, &t); + ret = flb_log_event_encoder_set_body_from_raw_msgpack( + &log_encoder, + data_sbuf.data, + data_sbuf.size); + ret = flb_log_event_encoder_commit_record( + &log_encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, "Failed to emit record"); + msgpack_sbuffer_destroy(&data_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + return FLB_FILTER_NOTOUCH; + } + + msgpack_sbuffer_destroy(&data_sbuf); + flb_plg_debug(ctx->ins, + "Msgpack object created for record."); + } else { + flb_plg_error(ctx->ins, + "invalid record field type returned"); + } + + lua_pop(ctx->lua->state, 1); + } else { + flb_plg_error(ctx->ins, + "invalid lua record entry type returned"); + } - /* Lua table */ - if (ctx->time_as_table == FLB_TRUE) { - if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { - /* Retrieve seconds */ - lua_getfield(ctx->lua->state, -1, "sec"); - t.tm.tv_sec = lua_tointeger(ctx->lua->state, -1); lua_pop(ctx->lua->state, 1); + } + } else { + flb_plg_error(ctx->ins, "invalid lua table entry returned"); + } + lua_pop(ctx->lua->state, 1); + } + else { + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + msgpack_sbuffer_init(&data_sbuf); + msgpack_packer_init(&data_pck, &data_sbuf, msgpack_sbuffer_write); + + /* Get timestamp */ + flb_time_copy(&t, &log_event.timestamp); + flb_time_copy(&t_orig, &log_event.timestamp); + + /* Prepare function call, pass 3 arguments, expect 3 return values */ + lua_getglobal(ctx->lua->state, ctx->call); + lua_pushstring(ctx->lua->state, tag); + + /* Timestamp */ + if (ctx->time_as_table == FLB_TRUE) { + flb_lua_pushtimetable(ctx->lua->state, &t); + } + else { + ts = flb_time_to_double(&t); + lua_pushnumber(ctx->lua->state, ts); + } - /* Retrieve nanoseconds */ - lua_getfield(ctx->lua->state, -1, "nsec"); - t.tm.tv_nsec = lua_tointeger(ctx->lua->state, -1); - lua_pop(ctx->lua->state, 2); + flb_lua_pushmsgpack(ctx->lua->state, log_event.body); + if (ctx->protected_mode) { + ret = lua_pcall(ctx->lua->state, 3, 3, 0); + if (ret != 0) { + flb_plg_error(ctx->ins, "error code %d: %s", + ret, lua_tostring(ctx->lua->state, -1)); + lua_pop(ctx->lua->state, 1); + + msgpack_sbuffer_destroy(&data_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + + return FLB_FILTER_NOTOUCH; + } } else { - flb_plg_error(ctx->ins, "invalid lua timestamp type returned"); - t = t_orig; + lua_call(ctx->lua->state, 3, 3); } - } - else { - l_timestamp = (double) lua_tonumber(ctx->lua->state, -1); - lua_pop(ctx->lua->state, 1); - } - l_code = (int) lua_tointeger(ctx->lua->state, -1); - lua_pop(ctx->lua->state, 1); + /* Initialize Return values */ + l_code = 0; + l_timestamp = ts; - if (l_code == -1) { /* Skip record */ - msgpack_sbuffer_destroy(&data_sbuf); - continue; - } - else if (l_code == 1 || l_code == 2) { /* Modified, pack new data */ - if (l_code == 1) { - if (ctx->time_as_table == FLB_FALSE) { - flb_time_from_double(&t, l_timestamp); + flb_lua_tomsgpack(ctx->lua->state, &data_pck, 0, &ctx->l2cc); + lua_pop(ctx->lua->state, 1); + + /* Lua table */ + if (ctx->time_as_table == FLB_TRUE) { + if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { + /* Retrieve seconds */ + lua_getfield(ctx->lua->state, -1, "sec"); + t.tm.tv_sec = lua_tointeger(ctx->lua->state, -1); + lua_pop(ctx->lua->state, 1); + + /* Retrieve nanoseconds */ + lua_getfield(ctx->lua->state, -1, "nsec"); + t.tm.tv_nsec = lua_tointeger(ctx->lua->state, -1); + lua_pop(ctx->lua->state, 2); + } + else { + flb_plg_error(ctx->ins, "invalid lua timestamp type returned"); + t = t_orig; } } - else if (l_code == 2) { - /* Keep the timestamp */ - t = t_orig; + else { + l_timestamp = (double) lua_tonumber(ctx->lua->state, -1); + lua_pop(ctx->lua->state, 1); } - ret = pack_result(ctx, &t, log_event.metadata, &log_encoder, - data_sbuf.data, data_sbuf.size); + l_code = (int) lua_tointeger(ctx->lua->state, -1); + lua_pop(ctx->lua->state, 1); - if (ret == FLB_FALSE) { - flb_plg_error(ctx->ins, "invalid table returned at %s(), %s", - ctx->call, ctx->script); + if (l_code == -1) { /* Skip record */ msgpack_sbuffer_destroy(&data_sbuf); + continue; + } + else if (l_code == 1 || l_code == 2) { /* Modified, pack new data */ + if (l_code == 1) { + if (ctx->time_as_table == FLB_FALSE) { + flb_time_from_double(&t, l_timestamp); + } + } + else if (l_code == 2) { + /* Keep the timestamp */ + t = t_orig; + } - flb_log_event_decoder_destroy(&log_decoder); - flb_log_event_encoder_destroy(&log_encoder); + ret = pack_result(ctx, &t, log_event.metadata, &log_encoder, + data_sbuf.data, data_sbuf.size); - return FLB_FILTER_NOTOUCH; - } - } - else { /* Unexpected return code, keep original content */ - /* Code 0 means Keep record, so we don't emit the warning */ - if (l_code != 0) { - flb_plg_error(ctx->ins, - "unexpected Lua script return code %i, " - "original record will be kept." , l_code); + if (ret == FLB_FALSE) { + flb_plg_error(ctx->ins, "invalid table returned at %s(), %s", + ctx->call, ctx->script); + msgpack_sbuffer_destroy(&data_sbuf); + + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + + return FLB_FILTER_NOTOUCH; + } } + else { /* Unexpected return code, keep original content */ + /* Code 0 means Keep record, so we don't emit the warning */ + if (l_code != 0) { + flb_plg_error(ctx->ins, + "unexpected Lua script return code %i, " + "original record will be kept." , l_code); + } - ret = flb_log_event_encoder_emit_raw_record( - &log_encoder, - log_decoder.record_base, - log_decoder.record_length); + ret = flb_log_event_encoder_emit_raw_record( + &log_encoder, + log_decoder.record_base, + log_decoder.record_length); - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event encoder error : %d", ret); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event encoder error : %d", ret); + } } - } - msgpack_sbuffer_destroy(&data_sbuf); + msgpack_sbuffer_destroy(&data_sbuf); + } } - if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA) { ret = FLB_EVENT_ENCODER_SUCCESS; } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { *out_buf = log_encoder.output_buffer; *out_bytes = log_encoder.output_length; - ret = FLB_FILTER_MODIFIED; flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); @@ -669,7 +802,6 @@ static int cb_lua_filter(const void *data, size_t bytes, ret = FLB_FILTER_NOTOUCH; } - flb_log_event_decoder_destroy(&log_decoder); flb_log_event_encoder_destroy(&log_encoder); @@ -722,6 +854,12 @@ static struct flb_config_map config_map[] = { "If enabled, Lua script will be executed in protected mode. " "It prevents to crash when invalid Lua script is executed." }, + { + FLB_CONFIG_MAP_BOOL, "chunk_mode", "false", + 0, FLB_TRUE, offsetof(struct lua_filter, chunk_mode), + "If enabled, a whole chunk will be sent to Lua script as table. " + "It may be used for e.g. parallel execution inside Lua script." + }, { FLB_CONFIG_MAP_BOOL, "time_as_table", "false", 0, FLB_TRUE, offsetof(struct lua_filter, time_as_table), diff --git a/plugins/filter_lua/lua_config.h b/plugins/filter_lua/lua_config.h index e67beb38cb9..97de1c825b1 100644 --- a/plugins/filter_lua/lua_config.h +++ b/plugins/filter_lua/lua_config.h @@ -34,6 +34,7 @@ struct lua_filter { flb_sds_t call; /* function name */ flb_sds_t buffer; /* json dec buffer */ int protected_mode; /* exec lua function in protected mode */ + int chunk_mode; /* pass whole chunk to lua script */ int time_as_table; /* timestamp as a Lua table */ int enable_flb_null; /* Use flb_null in Lua */ struct flb_lua_l2c_config l2cc; /* lua -> C config */ diff --git a/src/flb_lua.c b/src/flb_lua.c index 4eca5b0ca8c..fdb6313d24b 100644 --- a/src/flb_lua.c +++ b/src/flb_lua.c @@ -173,7 +173,6 @@ void flb_lua_pushmsgpack(lua_State *l, msgpack_object *o) struct flb_lua_metadata meta; lua_checkstack(l, 3); - switch(o->type) { case MSGPACK_OBJECT_NIL: lua_getglobal(l, FLB_LUA_VAR_FLB_NULL); @@ -839,3 +838,23 @@ void flb_lua_dump_stack(FILE *out, lua_State *l) } fprintf(out, "======\n"); } + +void flb_lua_bulk_process(lua_State *l, struct flb_time *t, msgpack_object *o, int table_index) { + int i; + struct flb_lua_metadata meta; + + lua_checkstack(l, 3); + + lua_createtable(l, 0, 2); + + lua_pushstring(l, "timestamp"); + flb_lua_pushtimetable(l, t); + lua_rawset(l, -3); + + lua_pushstring(l, "record"); + flb_lua_pushmsgpack(l, o); + lua_rawset(l, -3); + + /* Now, append this table to the global Lua table at the given index */ + lua_rawseti(l, table_index, lua_objlen(l, table_index) + 1); +}