Skip to content

Commit

Permalink
filter_lua: add support for metadata handling
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Dec 10, 2024
1 parent 412d3ea commit ab7d616
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 47 deletions.
148 changes: 101 additions & 47 deletions plugins/filter_lua/lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -428,13 +428,11 @@ static int pack_record(struct lua_filter *ctx,
}

if (ret == FLB_EVENT_ENCODER_SUCCESS && metadata != NULL) {
ret = flb_log_event_encoder_set_metadata_from_msgpack_object(
log_encoder, metadata);
ret = flb_log_event_encoder_set_metadata_from_msgpack_object(log_encoder, metadata);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_set_body_from_msgpack_object(
log_encoder, body);
ret = flb_log_event_encoder_set_body_from_msgpack_object(log_encoder, body);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
Expand All @@ -444,67 +442,80 @@ static int pack_record(struct lua_filter *ctx,
return ret;
}

static int pack_result (struct lua_filter *ctx, struct flb_time *ts,
msgpack_object *metadata,
struct flb_log_event_encoder *log_encoder,
char *data, size_t bytes)
static int pack_result(struct lua_filter *ctx, struct flb_time *ts,
//msgpack_object *metadata,
struct flb_log_event *log_event,
struct flb_log_event_encoder *log_encoder,
char *meta_buf, size_t meta_size,
char *body_buf, size_t body_size)
{
int ret;
size_t index = 0;
size_t off = 0;
msgpack_object *entry;
msgpack_unpacked result;

msgpack_unpacked_init(&result);

ret = msgpack_unpack_next(&result, data, bytes, &off);
msgpack_unpacked result_meta;
msgpack_unpacked result_body;

/* unpack metadata if set */
if (meta_buf) {
msgpack_unpacked_init(&result_meta);
ret = msgpack_unpack_next(&result_meta, meta_buf, meta_size, &off);
if (ret != MSGPACK_UNPACK_SUCCESS) {
msgpack_unpacked_destroy(&result_meta);
return FLB_FALSE;
}
}

/* Pack record */
msgpack_unpacked_init(&result_body);
off = 0;
ret = msgpack_unpack_next(&result_body, body_buf, body_size, &off);
if (ret != MSGPACK_UNPACK_SUCCESS) {
msgpack_unpacked_destroy(&result);

msgpack_unpacked_destroy(&result_body);
if (meta_buf) {
msgpack_unpacked_destroy(&result_meta);
}
return FLB_FALSE;
}

if (result.data.type == MSGPACK_OBJECT_MAP) {
ret = pack_record(ctx, log_encoder,
ts, metadata, &result.data);
if (result_body.data.type == MSGPACK_OBJECT_MAP) {
ret = pack_record(ctx, log_encoder, ts, &result_meta.data, &result_body.data);
msgpack_unpacked_destroy(&result_body);

msgpack_unpacked_destroy(&result);
if (meta_buf) {
msgpack_unpacked_destroy(&result_meta);
}

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
return FLB_FALSE;
}

return FLB_TRUE;
}
else if (result.data.type == MSGPACK_OBJECT_ARRAY) {
for (index = 0 ; index < result.data.via.array.size ; index++) {
entry = &result.data.via.array.ptr[index];
else if (result_body.data.type == MSGPACK_OBJECT_ARRAY) {
for (index = 0 ; index < result_body.data.via.array.size ; index++) {
entry = &result_body.data.via.array.ptr[index];

if (entry->type == MSGPACK_OBJECT_MAP) {
ret = pack_record(ctx, log_encoder,
ts, metadata, entry);
//ret = pack_record(ctx, log_encoder, ts, metadata, entry);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
msgpack_unpacked_destroy(&result);

msgpack_unpacked_destroy(&result_body);
return FLB_FALSE;
}
}
else {
msgpack_unpacked_destroy(&result);
msgpack_unpacked_destroy(&result_body);

return FLB_FALSE;
}
}

msgpack_unpacked_destroy(&result);

msgpack_unpacked_destroy(&result_body);
return FLB_TRUE;
}

msgpack_unpacked_destroy(&result);

msgpack_unpacked_destroy(&result_body);
return FLB_FALSE;
}

Expand All @@ -524,8 +535,10 @@ static int cb_lua_filter(const void *data, size_t bytes,
/* Lua return values */
int l_code;
double l_timestamp;
msgpack_packer data_pck;
msgpack_sbuffer data_sbuf;
msgpack_packer body_pck = {0};
msgpack_sbuffer body_sbuf = {0};
msgpack_packer meta_pck = {0};
msgpack_sbuffer meta_sbuf = {0};
struct flb_log_event_encoder log_encoder;
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
Expand Down Expand Up @@ -555,11 +568,9 @@ static int cb_lua_filter(const void *data, size_t bytes,
return FLB_FILTER_NOTOUCH;
}

while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
msgpack_sbuffer_init(&data_sbuf);
msgpack_packer_init(&data_pck, &data_sbuf, msgpack_sbuffer_write);
while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
msgpack_sbuffer_init(&body_sbuf);
msgpack_packer_init(&body_pck, &body_sbuf, msgpack_sbuffer_write);

/* Get timestamp */
flb_time_copy(&t, &log_event.timestamp);
Expand All @@ -578,32 +589,66 @@ static int cb_lua_filter(const void *data, size_t bytes,
lua_pushnumber(ctx->lua->state, ts);
}

/* Metadata: on v1, logs metadata is not set, however in v2 it is */
if (ctx->api_version == LUA_API_V2) {
flb_lua_pushmsgpack(ctx->lua->state, log_event.metadata);
}

flb_lua_pushmsgpack(ctx->lua->state, log_event.body);
if (ctx->protected_mode) {
ret = lua_pcall(ctx->lua->state, 3, 3, 0);
if (ctx->api_version == LUA_API_V1) {
ret = lua_pcall(ctx->lua->state, 3, 3, 0);
}
else {
ret = lua_pcall(ctx->lua->state, 4, 4, 0);
}
if (ret != 0) {
flb_plg_error(ctx->ins, "error code %d: %s",
ret, lua_tostring(ctx->lua->state, -1));
lua_pop(ctx->lua->state, 1);

msgpack_sbuffer_destroy(&data_sbuf);
msgpack_sbuffer_destroy(&body_sbuf);
flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);

return FLB_FILTER_NOTOUCH;
}
}
else {
lua_call(ctx->lua->state, 3, 3);
if (ctx->api_version == LUA_API_V1) {
lua_call(ctx->lua->state, 3, 3);
}
else {
lua_call(ctx->lua->state, 4, 4);
}
}

/* Initialize Return values */
l_code = 0;
l_timestamp = ts;

flb_lua_tomsgpack(ctx->lua->state, &data_pck, 0, &ctx->l2cc);
/* log body */
flb_lua_tomsgpack(ctx->lua->state, &body_pck, 0, &ctx->l2cc);
lua_pop(ctx->lua->state, 1);

if (ctx->api_version == LUA_API_V2) {
/* initialize msgpack buffer for metadata */
msgpack_sbuffer_init(&meta_sbuf);
msgpack_packer_init(&meta_pck, &meta_sbuf, msgpack_sbuffer_write);

/* Check for metadata (third return value) */
if (lua_istable(ctx->lua->state, -1)) {
/* Metadata is present */
flb_lua_tomsgpack(ctx->lua->state, &meta_pck, 0, &ctx->l2cc);
lua_pop(ctx->lua->state, 1);
}
else {
/* Metadata not modified */
lua_pop(ctx->lua->state, 1);
msgpack_sbuffer_destroy(&meta_sbuf);
}
}

/* Lua table */
if (ctx->time_as_table == FLB_TRUE) {
if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) {
Expand All @@ -627,11 +672,15 @@ static int cb_lua_filter(const void *data, size_t bytes,
lua_pop(ctx->lua->state, 1);
}

/* return value from Lua */
l_code = (int) lua_tointeger(ctx->lua->state, -1);
lua_pop(ctx->lua->state, 1);

if (l_code == -1) { /* Skip record */
msgpack_sbuffer_destroy(&data_sbuf);
if (ctx->api_version == LUA_API_V2) {
msgpack_sbuffer_destroy(&meta_sbuf);
}
msgpack_sbuffer_destroy(&body_sbuf);
continue;
}
else if (l_code == 1 || l_code == 2) { /* Modified, pack new data */
Expand All @@ -645,13 +694,14 @@ static int cb_lua_filter(const void *data, size_t bytes,
t = t_orig;
}

ret = pack_result(ctx, &t, log_event.metadata, &log_encoder,
data_sbuf.data, data_sbuf.size);
ret = pack_result(ctx, &t, &log_event, &log_encoder,
meta_sbuf.data, meta_sbuf.size,
body_sbuf.data, body_sbuf.size);

if (ret == FLB_FALSE) {
flb_plg_error(ctx->ins, "invalid table returned at %s(), %s",
ctx->call, ctx->script);
msgpack_sbuffer_destroy(&data_sbuf);
msgpack_sbuffer_destroy(&body_sbuf);

flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
Expand All @@ -678,7 +728,7 @@ static int cb_lua_filter(const void *data, size_t bytes,
}
}

msgpack_sbuffer_destroy(&data_sbuf);
msgpack_sbuffer_destroy(&body_sbuf);
}

if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA) {
Expand Down Expand Up @@ -724,6 +774,10 @@ static struct flb_config_map config_map[] = {
0, FLB_FALSE, 0,
"The path of lua script."
},
{
FLB_CONFIG_MAP_STR, "api", "v1",
0, FLB_TRUE, offsetof(struct lua_filter, api_version_str),
},
{
FLB_CONFIG_MAP_STR, "code", NULL,
0, FLB_FALSE, 0,
Expand Down
13 changes: 13 additions & 0 deletions plugins/filter_lua/lua_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ struct lua_filter *lua_config_create(struct flb_filter_instance *ins,
lf->ins = ins;
lf->script = NULL;

/* api version */
if (strcmp(lf->api_version_str, "v1") == 0) {
lf->api_version = LUA_API_V1;
}
else if (strcmp(lf->api_version_str, "v2") == 0) {
lf->api_version = LUA_API_V2;
}
else {
flb_plg_error(lf->ins, "invalid API version '%s'", lf->api_version_str);
flb_free(lf);
return NULL;
}

/* config: code */
tmp = flb_filter_get_property("code", ins);
if (tmp) {
Expand Down
4 changes: 4 additions & 0 deletions plugins/filter_lua/lua_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,19 @@
#include <fluent-bit/flb_lua.h>

#define LUA_BUFFER_CHUNK 1024 * 8 /* 8K should be enough to get started */
#define LUA_API_V1 1
#define LUA_API_V2 2

struct lua_filter {
flb_sds_t api_version_str; /* API version string */
flb_sds_t code; /* lua script source code */
flb_sds_t script; /* lua script path */
flb_sds_t call; /* function name */
flb_sds_t buffer; /* json dec buffer */
int protected_mode; /* exec lua function in protected mode */
int time_as_table; /* timestamp as a Lua table */
int enable_flb_null; /* Use flb_null in Lua */
int api_version;
struct flb_lua_l2c_config l2cc; /* lua -> C config */
struct flb_luajit *lua; /* state context */
struct flb_filter_instance *ins; /* filter instance */
Expand Down

0 comments on commit ab7d616

Please sign in to comment.