Skip to content

Commit

Permalink
filter_record_modifier: support metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 committed Mar 10, 2024
1 parent 9d9ac68 commit be2cd45
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 54 deletions.
176 changes: 124 additions & 52 deletions plugins/filter_record_modifier/filter_modifier.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,62 +66,86 @@ static int config_allowlist_key(struct record_modifier_ctx *ctx,
return 0;
}

static int configure(struct record_modifier_ctx *ctx,
struct flb_filter_instance *f_ins)
/* for 'Record' and 'Metadata' */
static int configure_map(struct record_modifier_ctx *ctx,
struct mk_list *map_from_config,
struct mk_list *output)
{
struct mk_list *head = NULL;
struct modifier_key *mod_key;
struct modifier_record *mod_record;
struct flb_config_map_val *mv;
struct modifier_kv *mod_kv;
struct flb_slist_entry *sentry = NULL;
int counter = 0;

ctx->records_num = 0;
ctx->remove_keys_num = 0;
ctx->allowlist_keys_num = 0;

if (flb_filter_config_map_set(f_ins, ctx) < 0) {
flb_errno();
flb_plg_error(f_ins, "configuration error");
return -1;
}

/* Check 'Record' properties */
flb_config_map_foreach(head, mv, ctx->records_map) {
mod_record = flb_malloc(sizeof(struct modifier_record));
if (!mod_record) {
flb_config_map_foreach(head, mv, map_from_config) {
mod_kv = flb_malloc(sizeof(struct modifier_kv));
if (!mod_kv) {
flb_errno();
continue;
}

if (mk_list_size(mv->val.list) != 2) {
flb_plg_error(ctx->ins, "invalid record parameters, "
"expects 'KEY VALUE'");
flb_free(mod_record);
flb_free(mod_kv);
continue;
}
/* Get first value (field) */
sentry = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
mod_record->key_len = flb_sds_len(sentry->str);
mod_record->key = flb_strndup(sentry->str, mod_record->key_len);
if (mod_record->key == NULL) {
mod_kv->key_len = flb_sds_len(sentry->str);
mod_kv->key = flb_strndup(sentry->str, mod_kv->key_len);
if (mod_kv->key == NULL) {
flb_errno();
flb_free(mod_record);
flb_free(mod_kv);
continue;
}

sentry = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
mod_record->val_len = flb_sds_len(sentry->str);
mod_record->val = flb_strndup(sentry->str, mod_record->val_len);
if (mod_record->val == NULL) {
mod_kv->val_len = flb_sds_len(sentry->str);
mod_kv->val = flb_strndup(sentry->str, mod_kv->val_len);
if (mod_kv->val == NULL) {
flb_errno();
flb_free(mod_record->key);
flb_free(mod_record);
flb_free(mod_kv->key);
flb_free(mod_kv);
continue;
}

mk_list_add(&mod_record->_head, &ctx->records);
ctx->records_num++;
mk_list_add(&mod_kv->_head, output);
counter++;
}
return counter;
}

static int configure(struct record_modifier_ctx *ctx,
struct flb_filter_instance *f_ins)
{
struct mk_list *head = NULL;
struct modifier_key *mod_key;
struct flb_config_map_val *mv;

ctx->records_num = 0;
ctx->remove_keys_num = 0;
ctx->allowlist_keys_num = 0;

if (flb_filter_config_map_set(f_ins, ctx) < 0) {
flb_errno();
flb_plg_error(f_ins, "configuration error");
return -1;
}

/* Check 'Record' properties */
ctx->records_num = configure_map(ctx, ctx->records_map, &ctx->records);
if (ctx->records_num < 0) {
flb_plg_error(f_ins, "configuration error");
return -1;
}
/* Check 'Metadata' properties */
ctx->metadata_num = configure_map(ctx, ctx->metadata_map, &ctx->metadata);
if (ctx->metadata_num < 0) {
flb_plg_error(f_ins, "configuration error");
return -1;
}

/* Check "Remove_Key" properties */
flb_config_map_foreach(head, mv, ctx->remove_keys_map) {
mod_key = flb_malloc(sizeof(struct modifier_key));
Expand Down Expand Up @@ -159,7 +183,7 @@ static int delete_list(struct record_modifier_ctx *ctx)
struct mk_list *tmp;
struct mk_list *head;
struct modifier_key *key;
struct modifier_record *record;
struct modifier_kv *record;

mk_list_foreach_safe(head, tmp, &ctx->remove_keys) {
key = mk_list_entry(head, struct modifier_key, _head);
Expand All @@ -172,7 +196,14 @@ static int delete_list(struct record_modifier_ctx *ctx)
flb_free(key);
}
mk_list_foreach_safe(head, tmp, &ctx->records) {
record = mk_list_entry(head, struct modifier_record, _head);
record = mk_list_entry(head, struct modifier_kv, _head);
flb_free(record->key);
flb_free(record->val);
mk_list_del(&record->_head);
flb_free(record);
}
mk_list_foreach_safe(head, tmp, &ctx->metadata) {
record = mk_list_entry(head, struct modifier_kv, _head);
flb_free(record->key);
flb_free(record->val);
mk_list_del(&record->_head);
Expand All @@ -195,6 +226,7 @@ static int cb_modifier_init(struct flb_filter_instance *f_ins,
return -1;
}
mk_list_init(&ctx->records);
mk_list_init(&ctx->metadata);
mk_list_init(&ctx->remove_keys);
mk_list_init(&ctx->allowlist_keys);
ctx->ins = f_ins;
Expand Down Expand Up @@ -294,6 +326,42 @@ static int create_uuid(struct record_modifier_ctx *ctx, char *uuid)
return 0;
}

static int append_kv(struct flb_log_event_encoder *encoder,
struct mk_list *map,
int type)
{
int ret;
struct mk_list *tmp;
struct mk_list *head;
struct modifier_kv *mod_rec;

if (type != FLB_LOG_EVENT_METADATA && type != FLB_LOG_EVENT_BODY) {
return -1;
}

mk_list_foreach_safe(head, tmp, map) {
mod_rec = mk_list_entry(head, struct modifier_kv, _head);

if (type == FLB_LOG_EVENT_BODY) {
ret = flb_log_event_encoder_append_body_values(
encoder,
FLB_LOG_EVENT_STRING_VALUE(mod_rec->key, mod_rec->key_len),
FLB_LOG_EVENT_STRING_VALUE(mod_rec->val, mod_rec->val_len));
}
else if (type == FLB_LOG_EVENT_METADATA) {
ret = flb_log_event_encoder_append_metadata_values(
encoder,
FLB_LOG_EVENT_STRING_VALUE(mod_rec->key, mod_rec->key_len),
FLB_LOG_EVENT_STRING_VALUE(mod_rec->val, mod_rec->val_len));
}

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
break;
}
}
return 0;
}

#define BOOL_MAP_LIMIT 65535
static int cb_modifier_filter(const void *data, size_t bytes,
const char *tag, int tag_len,
Expand All @@ -313,11 +381,8 @@ static int cb_modifier_filter(const void *data, size_t bytes,
size_t uuid_len = 0;
bool_map_t *bool_map = NULL;
struct flb_time tm;
struct modifier_record *mod_rec;
msgpack_object *obj;
msgpack_object_kv *kv;
struct mk_list *tmp;
struct mk_list *head;
struct flb_log_event_encoder log_encoder;
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
Expand Down Expand Up @@ -405,9 +470,6 @@ static int cb_modifier_filter(const void *data, size_t bytes,

ret = flb_log_event_encoder_set_timestamp(&log_encoder, &tm);

ret = flb_log_event_encoder_set_metadata_from_msgpack_object(
&log_encoder, log_event.metadata);

kv = obj->via.map.ptr;
for(i=0;
bool_map[i] != TAIL_OF_ARRAY &&
Expand All @@ -420,26 +482,31 @@ static int cb_modifier_filter(const void *data, size_t bytes,
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val));
}
}

flb_free(bool_map);
bool_map = NULL;

if (log_event.metadata->type == MSGPACK_OBJECT_MAP) {
kv = log_event.metadata->via.map.ptr;
for(i=0;
i < log_event.metadata->via.map.size &&
ret == FLB_EVENT_ENCODER_SUCCESS;
i++) {
ret = flb_log_event_encoder_append_metadata_values(
&log_encoder,
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val));
}
}

/* append record */
if (ctx->records_num > 0) {
is_modified = FLB_TRUE;

mk_list_foreach_safe(head, tmp, &ctx->records) {
mod_rec = mk_list_entry(head, struct modifier_record, _head);

ret = flb_log_event_encoder_append_body_values(
&log_encoder,
FLB_LOG_EVENT_STRING_VALUE(mod_rec->key, mod_rec->key_len),
FLB_LOG_EVENT_STRING_VALUE(mod_rec->val, mod_rec->val_len));

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
break;
}
}
ret = append_kv(&log_encoder, &ctx->records, FLB_LOG_EVENT_BODY);
}
/* append metadata */
if (ctx->metadata_num > 0) {
is_modified = FLB_TRUE;
ret = append_kv(&log_encoder, &ctx->metadata, FLB_LOG_EVENT_METADATA);
}

if (uuid_len > 0) {
Expand Down Expand Up @@ -494,6 +561,11 @@ static struct flb_config_map config_map[] = {
FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct record_modifier_ctx, records_map),
"Append fields. This parameter needs key and value pair."
},
{
FLB_CONFIG_MAP_SLIST_2, "metadata", NULL,
FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct record_modifier_ctx, metadata_map),
"Append metadata. This parameter needs key and value pair."
},

{
FLB_CONFIG_MAP_STR, "remove_key", NULL,
Expand Down
6 changes: 4 additions & 2 deletions plugins/filter_record_modifier/filter_modifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_filter.h>

struct modifier_record {
struct modifier_kv {
char *key;
char *val;
int key_len;
Expand All @@ -41,18 +41,21 @@ struct modifier_key {

struct record_modifier_ctx {
int records_num;
int metadata_num;
int remove_keys_num;
int allowlist_keys_num;

flb_sds_t uuid_key;

/* config map */
struct mk_list *records_map;
struct mk_list *metadata_map;
struct mk_list *remove_keys_map;
struct mk_list *allowlist_keys_map;
struct mk_list *whitelist_keys_map;

struct mk_list records;
struct mk_list metadata;
struct mk_list remove_keys;
struct mk_list allowlist_keys;
struct flb_filter_instance *ins;
Expand All @@ -64,5 +67,4 @@ typedef enum {
TAIL_OF_ARRAY = 2
} bool_map_t;


#endif /* FLB_FILTER_RECORD_MODIFIER_H */

0 comments on commit be2cd45

Please sign in to comment.