diff --git a/include/fluent-bit/flb_msgpack_append_message.h b/include/fluent-bit/flb_msgpack_append_message.h index 5d821f27457..b2c5049febf 100644 --- a/include/fluent-bit/flb_msgpack_append_message.h +++ b/include/fluent-bit/flb_msgpack_append_message.h @@ -25,8 +25,10 @@ #define FLB_MAP_NOT_MODIFIED -1 #define FLB_MAP_EXPANSION_ERROR -2 #define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3 +#define FLB_MSGPACK_UNPACK_ERROR -4 #include +#include int flb_msgpack_append_message_to_record(char **result_buffer, size_t *result_size, @@ -36,4 +38,12 @@ int flb_msgpack_append_message_to_record(char **result_buffer, char *message_buffer, size_t message_size, int message_type); + +int flb_msgpack_append_map_to_record(char **result_buffer, + size_t *result_size, + flb_sds_t message_key_name, + char *base_object_buffer, + size_t base_object_size, + char *map_data, + size_t map_size); #endif diff --git a/plugins/filter_parser/filter_parser.c b/plugins/filter_parser/filter_parser.c index 11bf71f7e53..c956700911e 100644 --- a/plugins/filter_parser/filter_parser.c +++ b/plugins/filter_parser/filter_parser.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -104,6 +105,7 @@ static int configure(struct filter_parser_ctx *ctx, ctx->key_name = NULL; ctx->reserve_data = FLB_FALSE; ctx->preserve_key = FLB_FALSE; + ctx->hash_value_field = NULL; mk_list_init(&ctx->parsers); if (flb_filter_config_map_set(f_ins, ctx) < 0) { @@ -188,6 +190,8 @@ static int cb_parser_filter(const void *data, size_t bytes, int key_len; const char *val_str; int val_len; + char *parsed_buf; + size_t parsed_size; char *out_buf; size_t out_size; struct flb_time parsed_time; @@ -231,6 +235,7 @@ static int cb_parser_filter(const void *data, size_t bytes, &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { out_buf = NULL; + parsed_buf = NULL; append_arr_i = 0; flb_time_copy(&tm, &log_event.timestamp); @@ -278,7 +283,7 @@ static int cb_parser_filter(const void *data, size_t bytes, flb_time_zero(&parsed_time); parse_ret = flb_parser_do(fp->parser, val_str, val_len, - (void **) &out_buf, &out_size, + (void **) &parsed_buf, &parsed_size, &parsed_time); if (parse_ret >= 0) { /* @@ -322,12 +327,13 @@ static int cb_parser_filter(const void *data, size_t bytes, &log_encoder, log_event.metadata); } - if (out_buf != NULL) { + if (parsed_buf != NULL) { + if (ctx->reserve_data) { char *new_buf = NULL; int new_size; int ret; - ret = flb_msgpack_expand_map(out_buf, out_size, + ret = flb_msgpack_expand_map(parsed_buf, parsed_size, append_arr, append_arr_len, &new_buf, &new_size); if (ret == -1) { @@ -340,6 +346,30 @@ static int cb_parser_filter(const void *data, size_t bytes, return FLB_FILTER_NOTOUCH; } + out_buf = new_buf; + out_size = new_size; + } + else { + out_buf = strdup(parsed_buf); + out_size = parsed_size; + } + if (ctx->hash_value_field != NULL) { + char *new_buf = NULL; + size_t new_size; + int ret; + ret = flb_msgpack_append_map_to_record(&new_buf, &new_size, + ctx->hash_value_field, + out_buf, out_size, + parsed_buf,parsed_size); + if ( ret != FLB_MAP_EXPAND_SUCCESS){ + flb_plg_error(ctx->ins, "cannot append parsed entry to record"); + + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + flb_free(append_arr); + + return FLB_FILTER_NOTOUCH; + } flb_free(out_buf); out_buf = new_buf; out_size = new_size; @@ -352,6 +382,7 @@ static int cb_parser_filter(const void *data, size_t bytes, } flb_free(out_buf); + flb_free(parsed_buf); ret = FLB_FILTER_MODIFIED; } else { @@ -437,6 +468,11 @@ static struct flb_config_map config_map[] = { "Keep all other original fields in the parsed result. " "If false, all other original fields will be removed." }, + { + FLB_CONFIG_MAP_STR, "Hash_Value_Field", NULL, + 0, FLB_TRUE, offsetof(struct filter_parser_ctx, hash_value_field), + "Stores the parsed values as a hash value in a field with key given. " + }, { FLB_CONFIG_MAP_DEPRECATED, "Unescape_key", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/filter_parser/filter_parser.h b/plugins/filter_parser/filter_parser.h index 36b26d9ec34..0b99ee0f565 100644 --- a/plugins/filter_parser/filter_parser.h +++ b/plugins/filter_parser/filter_parser.h @@ -35,6 +35,7 @@ struct filter_parser_ctx { int key_name_len; int reserve_data; int preserve_key; + flb_sds_t hash_value_field; struct mk_list parsers; struct flb_filter_instance *ins; }; diff --git a/src/flb_msgpack_append_message.c b/src/flb_msgpack_append_message.c index 75df33b5c7a..a2217617a47 100644 --- a/src/flb_msgpack_append_message.c +++ b/src/flb_msgpack_append_message.c @@ -80,3 +80,63 @@ int flb_msgpack_append_message_to_record(char **result_buffer, return result; } + +int flb_msgpack_append_map_to_record(char **result_buffer, + size_t *result_size, + flb_sds_t message_key_name, + char *base_object_buffer, + size_t base_object_size, + char *map_data, + size_t map_size) +{ + msgpack_unpacked unpacker; + msgpack_object_kv *new_map_entries[1]; + msgpack_object_kv message_entry; + char *modified_data_buffer; + int modified_data_size; + size_t off = 0; + int i; + int result = FLB_MAP_NOT_MODIFIED; + *result_buffer = NULL; + *result_size = 0; + + if (message_key_name == NULL || map_data == NULL){ + return result; + } + + new_map_entries[0] = &message_entry; + + message_entry.key.type = MSGPACK_OBJECT_STR; + message_entry.key.via.str.size = flb_sds_len(message_key_name); + message_entry.key.via.str.ptr = message_key_name; + + msgpack_unpacked_init(&unpacker); + if ((i = msgpack_unpack_next(&unpacker, + map_data, + map_size, &off)) != MSGPACK_UNPACK_SUCCESS) { + msgpack_unpacked_destroy(&unpacker); + return FLB_MSGPACK_UNPACK_ERROR; + } + if (unpacker.data.type != MSGPACK_OBJECT_MAP) { + msgpack_unpacked_destroy(&unpacker); + return FLB_MSGPACK_UNPACK_ERROR; + } + + message_entry.val = unpacker.data; + result = flb_msgpack_expand_map(base_object_buffer, + base_object_size, + new_map_entries, 1, + &modified_data_buffer, + &modified_data_size); + if (result == 0) { + result = FLB_MAP_EXPAND_SUCCESS; + *result_buffer = modified_data_buffer; + *result_size = modified_data_size; + } + else { + result = FLB_MAP_EXPANSION_ERROR; + } + msgpack_unpacked_destroy(&unpacker); + + return result; +} \ No newline at end of file diff --git a/tests/runtime/filter_parser.c b/tests/runtime/filter_parser.c index 8f25fec0e6e..93ac6203eeb 100644 --- a/tests/runtime/filter_parser.c +++ b/tests/runtime/filter_parser.c @@ -812,6 +812,86 @@ void flb_test_filter_parser_preserve_original_field() flb_destroy(ctx); } +void flb_test_filter_parser_hash_value_field() +{ + int ret; + int bytes; + char *p, *output, *expected; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int filter_ffd; + struct flb_parser *parser; + + struct flb_lib_out_cb cb; + cb.cb = callback_test; + cb.data = NULL; + + clear_output(); + + ctx = flb_create(); + + /* Configure service */ + flb_service_set(ctx, "Flush", FLUSH_INTERVAL, "Grace", "1", "Log_Level", "debug", NULL); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, + "Tag", "test", + NULL); + + /* Parser */ + parser = flb_parser_create("dummy_test", "regex", "^(?[^ ]+) (?[^ ]+) (?[^ ]+) (?.+)$", + FLB_TRUE, + NULL, NULL, NULL, MK_FALSE, MK_TRUE, FLB_FALSE, FLB_FALSE, NULL, 0, + NULL, ctx->config); + TEST_CHECK(parser != NULL); + + /* Filter */ + filter_ffd = flb_filter(ctx, (char *) "parser", NULL); + TEST_CHECK(filter_ffd >= 0); + ret = flb_filter_set(ctx, filter_ffd, + "Match", "test", + "Key_Name", "data", + "Parser", "dummy_test", + "Hash_Value_Field", "parsed", + NULL); + TEST_CHECK(ret == 0); + + /* Output */ + out_ffd = flb_output(ctx, (char *) "lib", &cb); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "Match", "*", + "format", "json", + NULL); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data */ + p = "[1448403340,{\"data\":\"100 0.5 true This is an example\",\"log\":\"An example\"}]"; + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + + wait_with_timeout(2000, &output); /* waiting flush and ensuring data flush */ + TEST_CHECK_(output != NULL, "Expected output to not be NULL"); + if (output != NULL) { + /* check hash value field present */ + expected = "\"parsed\":{\"INT\":\"100\",\"FLOAT\":\"0.5\",\"BOOL\":\"true\",\"STRING\":\"This is an example\"}"; + TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output); + /* check fields were extracted */ + expected = "\"INT\":\"100\",\"FLOAT\":\"0.5\",\"BOOL\":\"true\",\"STRING\":\"This is an example\""; + TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output); + free(output); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + // https://github.com/fluent/fluent-bit/issues/2250 void flb_test_filter_parser_first_matched_when_mutilple_parser() { @@ -984,6 +1064,7 @@ TEST_LIST = { {"filter_parser_use_system_timezone", flb_test_filter_parser_use_system_timezone }, {"filter_parser_ignore_malformed_time", flb_test_filter_parser_ignore_malformed_time }, {"filter_parser_preserve_original_field", flb_test_filter_parser_preserve_original_field }, + {"filter_parser_hash_value_field", flb_test_filter_parser_hash_value_field }, {"filter_parser_first_matched_when_multiple_parser", flb_test_filter_parser_first_matched_when_mutilple_parser }, {"filter_parser_skip_empty_values_false", flb_test_filter_parser_skip_empty_values_false}, {NULL, NULL}