diff --git a/plugins/filter_lua/lua.c b/plugins/filter_lua/lua.c index de7e6983188..c4d0a8667f8 100644 --- a/plugins/filter_lua/lua.c +++ b/plugins/filter_lua/lua.c @@ -428,13 +428,11 @@ static int pack_record(struct lua_filter *ctx, } if (ret == FLB_EVENT_ENCODER_SUCCESS && metadata != NULL) { - ret = flb_log_event_encoder_set_metadata_from_msgpack_object( - log_encoder, metadata); + ret = flb_log_event_encoder_set_metadata_from_msgpack_object(log_encoder, metadata); } if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - log_encoder, body); + ret = flb_log_event_encoder_set_body_from_msgpack_object(log_encoder, body); } if (ret == FLB_EVENT_ENCODER_SUCCESS) { @@ -444,32 +442,49 @@ static int pack_record(struct lua_filter *ctx, return ret; } -static int pack_result (struct lua_filter *ctx, struct flb_time *ts, - msgpack_object *metadata, - struct flb_log_event_encoder *log_encoder, - char *data, size_t bytes) +static int pack_result(struct lua_filter *ctx, struct flb_time *ts, + //msgpack_object *metadata, + struct flb_log_event *log_event, + struct flb_log_event_encoder *log_encoder, + char *meta_buf, size_t meta_size, + char *body_buf, size_t body_size) { int ret; size_t index = 0; size_t off = 0; msgpack_object *entry; - msgpack_unpacked result; - - msgpack_unpacked_init(&result); - - ret = msgpack_unpack_next(&result, data, bytes, &off); + msgpack_unpacked result_meta; + msgpack_unpacked result_body; + + /* unpack metadata if set */ + if (meta_buf) { + msgpack_unpacked_init(&result_meta); + ret = msgpack_unpack_next(&result_meta, meta_buf, meta_size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + msgpack_unpacked_destroy(&result_meta); + return FLB_FALSE; + } + } + /* Pack record */ + msgpack_unpacked_init(&result_body); + off = 0; + ret = msgpack_unpack_next(&result_body, body_buf, body_size, &off); if (ret != MSGPACK_UNPACK_SUCCESS) { - msgpack_unpacked_destroy(&result); - + msgpack_unpacked_destroy(&result_body); + if (meta_buf) { + msgpack_unpacked_destroy(&result_meta); + } return FLB_FALSE; } - if (result.data.type == MSGPACK_OBJECT_MAP) { - ret = pack_record(ctx, log_encoder, - ts, metadata, &result.data); + if (result_body.data.type == MSGPACK_OBJECT_MAP) { + ret = pack_record(ctx, log_encoder, ts, &result_meta.data, &result_body.data); + msgpack_unpacked_destroy(&result_body); - msgpack_unpacked_destroy(&result); + if (meta_buf) { + msgpack_unpacked_destroy(&result_meta); + } if (ret != FLB_EVENT_ENCODER_SUCCESS) { return FLB_FALSE; @@ -477,34 +492,30 @@ static int pack_result (struct lua_filter *ctx, struct flb_time *ts, return FLB_TRUE; } - else if (result.data.type == MSGPACK_OBJECT_ARRAY) { - for (index = 0 ; index < result.data.via.array.size ; index++) { - entry = &result.data.via.array.ptr[index]; + else if (result_body.data.type == MSGPACK_OBJECT_ARRAY) { + for (index = 0 ; index < result_body.data.via.array.size ; index++) { + entry = &result_body.data.via.array.ptr[index]; if (entry->type == MSGPACK_OBJECT_MAP) { - ret = pack_record(ctx, log_encoder, - ts, metadata, entry); + //ret = pack_record(ctx, log_encoder, ts, metadata, entry); if (ret != FLB_EVENT_ENCODER_SUCCESS) { - msgpack_unpacked_destroy(&result); - + msgpack_unpacked_destroy(&result_body); return FLB_FALSE; } } else { - msgpack_unpacked_destroy(&result); + msgpack_unpacked_destroy(&result_body); return FLB_FALSE; } } - msgpack_unpacked_destroy(&result); - + msgpack_unpacked_destroy(&result_body); return FLB_TRUE; } - msgpack_unpacked_destroy(&result); - + msgpack_unpacked_destroy(&result_body); return FLB_FALSE; } @@ -524,8 +535,10 @@ static int cb_lua_filter(const void *data, size_t bytes, /* Lua return values */ int l_code; double l_timestamp; - msgpack_packer data_pck; - msgpack_sbuffer data_sbuf; + msgpack_packer body_pck = {0}; + msgpack_sbuffer body_sbuf = {0}; + msgpack_packer meta_pck = {0}; + msgpack_sbuffer meta_sbuf = {0}; struct flb_log_event_encoder log_encoder; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; @@ -555,11 +568,9 @@ static int cb_lua_filter(const void *data, size_t bytes, return FLB_FILTER_NOTOUCH; } - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - msgpack_sbuffer_init(&data_sbuf); - msgpack_packer_init(&data_pck, &data_sbuf, msgpack_sbuffer_write); + while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + msgpack_sbuffer_init(&body_sbuf); + msgpack_packer_init(&body_pck, &body_sbuf, msgpack_sbuffer_write); /* Get timestamp */ flb_time_copy(&t, &log_event.timestamp); @@ -578,15 +589,25 @@ static int cb_lua_filter(const void *data, size_t bytes, lua_pushnumber(ctx->lua->state, ts); } + /* Metadata: on v1, logs metadata is not set, however in v2 it is */ + if (ctx->api_version == LUA_API_V2) { + flb_lua_pushmsgpack(ctx->lua->state, log_event.metadata); + } + flb_lua_pushmsgpack(ctx->lua->state, log_event.body); if (ctx->protected_mode) { - ret = lua_pcall(ctx->lua->state, 3, 3, 0); + if (ctx->api_version == LUA_API_V1) { + ret = lua_pcall(ctx->lua->state, 3, 3, 0); + } + else { + ret = lua_pcall(ctx->lua->state, 4, 4, 0); + } if (ret != 0) { flb_plg_error(ctx->ins, "error code %d: %s", ret, lua_tostring(ctx->lua->state, -1)); lua_pop(ctx->lua->state, 1); - msgpack_sbuffer_destroy(&data_sbuf); + msgpack_sbuffer_destroy(&body_sbuf); flb_log_event_decoder_destroy(&log_decoder); flb_log_event_encoder_destroy(&log_encoder); @@ -594,16 +615,40 @@ static int cb_lua_filter(const void *data, size_t bytes, } } else { - lua_call(ctx->lua->state, 3, 3); + if (ctx->api_version == LUA_API_V1) { + lua_call(ctx->lua->state, 3, 3); + } + else { + lua_call(ctx->lua->state, 4, 4); + } } /* Initialize Return values */ l_code = 0; l_timestamp = ts; - flb_lua_tomsgpack(ctx->lua->state, &data_pck, 0, &ctx->l2cc); + /* log body */ + flb_lua_tomsgpack(ctx->lua->state, &body_pck, 0, &ctx->l2cc); lua_pop(ctx->lua->state, 1); + if (ctx->api_version == LUA_API_V2) { + /* initialize msgpack buffer for metadata */ + msgpack_sbuffer_init(&meta_sbuf); + msgpack_packer_init(&meta_pck, &meta_sbuf, msgpack_sbuffer_write); + + /* Check for metadata (third return value) */ + if (lua_istable(ctx->lua->state, -1)) { + /* Metadata is present */ + flb_lua_tomsgpack(ctx->lua->state, &meta_pck, 0, &ctx->l2cc); + lua_pop(ctx->lua->state, 1); + } + else { + /* Metadata not modified */ + lua_pop(ctx->lua->state, 1); + msgpack_sbuffer_destroy(&meta_sbuf); + } + } + /* Lua table */ if (ctx->time_as_table == FLB_TRUE) { if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { @@ -627,11 +672,15 @@ static int cb_lua_filter(const void *data, size_t bytes, lua_pop(ctx->lua->state, 1); } + /* return value from Lua */ l_code = (int) lua_tointeger(ctx->lua->state, -1); lua_pop(ctx->lua->state, 1); if (l_code == -1) { /* Skip record */ - msgpack_sbuffer_destroy(&data_sbuf); + if (ctx->api_version == LUA_API_V2) { + msgpack_sbuffer_destroy(&meta_sbuf); + } + msgpack_sbuffer_destroy(&body_sbuf); continue; } else if (l_code == 1 || l_code == 2) { /* Modified, pack new data */ @@ -645,13 +694,14 @@ static int cb_lua_filter(const void *data, size_t bytes, t = t_orig; } - ret = pack_result(ctx, &t, log_event.metadata, &log_encoder, - data_sbuf.data, data_sbuf.size); + ret = pack_result(ctx, &t, &log_event, &log_encoder, + meta_sbuf.data, meta_sbuf.size, + body_sbuf.data, body_sbuf.size); if (ret == FLB_FALSE) { flb_plg_error(ctx->ins, "invalid table returned at %s(), %s", ctx->call, ctx->script); - msgpack_sbuffer_destroy(&data_sbuf); + msgpack_sbuffer_destroy(&body_sbuf); flb_log_event_decoder_destroy(&log_decoder); flb_log_event_encoder_destroy(&log_encoder); @@ -678,7 +728,7 @@ static int cb_lua_filter(const void *data, size_t bytes, } } - msgpack_sbuffer_destroy(&data_sbuf); + msgpack_sbuffer_destroy(&body_sbuf); } if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA) { @@ -724,6 +774,10 @@ static struct flb_config_map config_map[] = { 0, FLB_FALSE, 0, "The path of lua script." }, + { + FLB_CONFIG_MAP_STR, "api", "v1", + 0, FLB_TRUE, offsetof(struct lua_filter, api_version_str), + }, { FLB_CONFIG_MAP_STR, "code", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/filter_lua/lua_config.c b/plugins/filter_lua/lua_config.c index 3fc80f048d3..40970c0db7c 100644 --- a/plugins/filter_lua/lua_config.c +++ b/plugins/filter_lua/lua_config.c @@ -67,6 +67,19 @@ struct lua_filter *lua_config_create(struct flb_filter_instance *ins, lf->ins = ins; lf->script = NULL; + /* api version */ + if (strcmp(lf->api_version_str, "v1") == 0) { + lf->api_version = LUA_API_V1; + } + else if (strcmp(lf->api_version_str, "v2") == 0) { + lf->api_version = LUA_API_V2; + } + else { + flb_plg_error(lf->ins, "invalid API version '%s'", lf->api_version_str); + flb_free(lf); + return NULL; + } + /* config: code */ tmp = flb_filter_get_property("code", ins); if (tmp) { diff --git a/plugins/filter_lua/lua_config.h b/plugins/filter_lua/lua_config.h index e67beb38cb9..365eef62ef9 100644 --- a/plugins/filter_lua/lua_config.h +++ b/plugins/filter_lua/lua_config.h @@ -27,8 +27,11 @@ #include #define LUA_BUFFER_CHUNK 1024 * 8 /* 8K should be enough to get started */ +#define LUA_API_V1 1 +#define LUA_API_V2 2 struct lua_filter { + flb_sds_t api_version_str; /* API version string */ flb_sds_t code; /* lua script source code */ flb_sds_t script; /* lua script path */ flb_sds_t call; /* function name */ @@ -36,6 +39,7 @@ struct lua_filter { int protected_mode; /* exec lua function in protected mode */ int time_as_table; /* timestamp as a Lua table */ int enable_flb_null; /* Use flb_null in Lua */ + int api_version; struct flb_lua_l2c_config l2cc; /* lua -> C config */ struct flb_luajit *lua; /* state context */ struct flb_filter_instance *ins; /* filter instance */