Skip to content

Commit

Permalink
Add additional entry in the record for parsed result
Browse files Browse the repository at this point in the history
Signed-off-by: Athish Pranav D <[email protected]>
  • Loading branch information
Athishpranav2003 committed Aug 26, 2024
1 parent d02a383 commit 9b6e32d
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 5 deletions.
9 changes: 9 additions & 0 deletions include/fluent-bit/flb_msgpack_append_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3

#include <fluent-bit/flb_pack.h>
#include <msgpack/unpack.h>

int flb_msgpack_append_message_to_record(char **result_buffer,
size_t *result_size,
Expand All @@ -36,4 +37,12 @@ int flb_msgpack_append_message_to_record(char **result_buffer,
char *message_buffer,
size_t message_size,
int message_type);

int flb_msgpack_append_map_to_record(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *map_data,
size_t map_size);
#endif
40 changes: 35 additions & 5 deletions plugins/filter_parser/filter_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
#include <fluent-bit/flb_kv.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>
#include <fluent-bit/flb_msgpack_append_message.h>
#include <msgpack.h>

#include <string.h>
#include <fluent-bit.h>
#include <time.h>

#include "filter_parser.h"

Expand Down Expand Up @@ -186,6 +186,8 @@ static int cb_parser_filter(const void *data, size_t bytes,
int key_len;
const char *val_str;
int val_len;
char *parsed_buf;
size_t parsed_size;
char *out_buf;
size_t out_size;
struct flb_time parsed_time;
Expand Down Expand Up @@ -229,6 +231,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
out_buf = NULL;
parsed_buf = NULL;
append_arr_i = 0;

flb_time_copy(&tm, &log_event.timestamp);
Expand Down Expand Up @@ -276,7 +279,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
flb_time_zero(&parsed_time);

parse_ret = flb_parser_do(fp->parser, val_str, val_len,
(void **) &out_buf, &out_size,
(void **) &parsed_buf, &parsed_size,
&parsed_time);
if (parse_ret >= 0) {
/*
Expand Down Expand Up @@ -320,13 +323,13 @@ static int cb_parser_filter(const void *data, size_t bytes,
&log_encoder, log_event.metadata);
}

if (out_buf != NULL) {
if (parsed_buf != NULL) {

if (ctx->reserve_data) {
char *new_buf = NULL;
int new_size;
int ret;

ret = flb_msgpack_expand_map(out_buf, out_size,
ret = flb_msgpack_expand_map(parsed_buf, parsed_size,
append_arr, append_arr_len,
&new_buf, &new_size);
if (ret == -1) {
Expand All @@ -339,6 +342,32 @@ static int cb_parser_filter(const void *data, size_t bytes,
return FLB_FILTER_NOTOUCH;
}

out_buf = new_buf;
out_size = new_size;
}
else {
out_buf = strdup(parsed_buf);
out_size = parsed_size;
}
if (ctx->hash_value_field) {
char *new_buf = NULL;
size_t new_size;
int ret;
flb_sds_t hash_key = flb_sds_create("parsed");
ret = flb_msgpack_append_map_to_record(&new_buf, &new_size,
hash_key,
out_buf, out_size,
parsed_buf,parsed_size);
flb_sds_destroy(hash_key);
if ( ret != FLB_MAP_EXPAND_SUCCESS){
flb_plg_error(ctx->ins, "cannot append parsed entry to record");

flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_free(append_arr);

return FLB_FILTER_NOTOUCH;
}
flb_free(out_buf);
out_buf = new_buf;
out_size = new_size;
Expand All @@ -351,6 +380,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
}

flb_free(out_buf);
flb_free(parsed_buf);
ret = FLB_FILTER_MODIFIED;
}
else {
Expand Down
59 changes: 59 additions & 0 deletions src/flb_msgpack_append_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,62 @@ int flb_msgpack_append_message_to_record(char **result_buffer,

return result;
}

int flb_msgpack_append_map_to_record(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *map_data,
size_t map_size)
{
msgpack_unpacked unpacker;
msgpack_object_kv *new_map_entries[1];
msgpack_object_kv message_entry;
char *modified_data_buffer;
int modified_data_size;
size_t off = 0;
int i;
int result = FLB_MAP_NOT_MODIFIED;
*result_buffer = NULL;
*result_size = 0;

if (message_key_name == NULL || map_data == NULL){
return result;
}

new_map_entries[0] = &message_entry;

message_entry.key.type = MSGPACK_OBJECT_STR;
message_entry.key.via.str.size = flb_sds_len(message_key_name);
message_entry.key.via.str.ptr = message_key_name;

msgpack_unpacked_init(&unpacker);
if ((i=msgpack_unpack_next(&unpacker, map_data, map_size, &off)) !=
MSGPACK_UNPACK_SUCCESS ) {
msgpack_unpacked_destroy(&unpacker);
return FLB_MAP_EXPANSION_ERROR;
}
if (unpacker.data.type != MSGPACK_OBJECT_MAP) {
msgpack_unpacked_destroy(&unpacker);
return FLB_MAP_EXPANSION_ERROR;
}

message_entry.val = unpacker.data;
result = flb_msgpack_expand_map(base_object_buffer,
base_object_size,
new_map_entries, 1,
&modified_data_buffer,
&modified_data_size);
if (result == 0) {
result = FLB_MAP_EXPAND_SUCCESS;
*result_buffer = modified_data_buffer;
*result_size = modified_data_size;
}
else {
result = FLB_MAP_EXPANSION_ERROR;
}
msgpack_unpacked_destroy(&unpacker);

return result;
}

0 comments on commit 9b6e32d

Please sign in to comment.