Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_parser: Hash_key for parsed result #9274

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions include/fluent-bit/flb_msgpack_append_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
#define FLB_MAP_NOT_MODIFIED -1
#define FLB_MAP_EXPANSION_ERROR -2
#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3
#define FLB_MSGPACK_UNPACK_ERROR -4

#include <fluent-bit/flb_pack.h>
#include <msgpack/unpack.h>

int flb_msgpack_append_message_to_record(char **result_buffer,
size_t *result_size,
Expand All @@ -36,4 +38,12 @@ int flb_msgpack_append_message_to_record(char **result_buffer,
char *message_buffer,
size_t message_size,
int message_type);

int flb_msgpack_append_map_to_record(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *map_data,
size_t map_size);
#endif
42 changes: 39 additions & 3 deletions plugins/filter_parser/filter_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <fluent-bit/flb_kv.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>
#include <fluent-bit/flb_msgpack_append_message.h>
#include <msgpack.h>

#include <string.h>
Expand Down Expand Up @@ -104,6 +105,7 @@ static int configure(struct filter_parser_ctx *ctx,
ctx->key_name = NULL;
ctx->reserve_data = FLB_FALSE;
ctx->preserve_key = FLB_FALSE;
ctx->hash_value_field = NULL;
mk_list_init(&ctx->parsers);

if (flb_filter_config_map_set(f_ins, ctx) < 0) {
Expand Down Expand Up @@ -188,6 +190,8 @@ static int cb_parser_filter(const void *data, size_t bytes,
int key_len;
const char *val_str;
int val_len;
char *parsed_buf;
size_t parsed_size;
char *out_buf;
size_t out_size;
struct flb_time parsed_time;
Expand Down Expand Up @@ -231,6 +235,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
out_buf = NULL;
parsed_buf = NULL;
append_arr_i = 0;

flb_time_copy(&tm, &log_event.timestamp);
Expand Down Expand Up @@ -278,7 +283,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
flb_time_zero(&parsed_time);

parse_ret = flb_parser_do(fp->parser, val_str, val_len,
(void **) &out_buf, &out_size,
(void **) &parsed_buf, &parsed_size,
&parsed_time);
if (parse_ret >= 0) {
/*
Expand Down Expand Up @@ -322,12 +327,13 @@ static int cb_parser_filter(const void *data, size_t bytes,
&log_encoder, log_event.metadata);
}

if (out_buf != NULL) {
if (parsed_buf != NULL) {

if (ctx->reserve_data) {
char *new_buf = NULL;
int new_size;
int ret;
ret = flb_msgpack_expand_map(out_buf, out_size,
ret = flb_msgpack_expand_map(parsed_buf, parsed_size,
append_arr, append_arr_len,
&new_buf, &new_size);
if (ret == -1) {
Expand All @@ -340,6 +346,30 @@ static int cb_parser_filter(const void *data, size_t bytes,
return FLB_FILTER_NOTOUCH;
}

out_buf = new_buf;
out_size = new_size;
}
else {
out_buf = strdup(parsed_buf);
out_size = parsed_size;
}
if (ctx->hash_value_field != NULL) {
char *new_buf = NULL;
size_t new_size;
int ret;
ret = flb_msgpack_append_map_to_record(&new_buf, &new_size,
ctx->hash_value_field,
out_buf, out_size,
parsed_buf,parsed_size);
if ( ret != FLB_MAP_EXPAND_SUCCESS){
flb_plg_error(ctx->ins, "cannot append parsed entry to record");

flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_free(append_arr);

return FLB_FILTER_NOTOUCH;
}
flb_free(out_buf);
out_buf = new_buf;
out_size = new_size;
Expand All @@ -352,6 +382,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
}

flb_free(out_buf);
flb_free(parsed_buf);
ret = FLB_FILTER_MODIFIED;
}
else {
Expand Down Expand Up @@ -437,6 +468,11 @@ static struct flb_config_map config_map[] = {
"Keep all other original fields in the parsed result. "
"If false, all other original fields will be removed."
},
{
FLB_CONFIG_MAP_STR, "Hash_Value_Field", NULL,
0, FLB_TRUE, offsetof(struct filter_parser_ctx, hash_value_field),
"Stores the parsed values as a hash value in a field with key given. "
},
{
FLB_CONFIG_MAP_DEPRECATED, "Unescape_key", NULL,
0, FLB_FALSE, 0,
Expand Down
1 change: 1 addition & 0 deletions plugins/filter_parser/filter_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct filter_parser_ctx {
int key_name_len;
int reserve_data;
int preserve_key;
flb_sds_t hash_value_field;
struct mk_list parsers;
struct flb_filter_instance *ins;
};
Expand Down
60 changes: 60 additions & 0 deletions src/flb_msgpack_append_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,63 @@ int flb_msgpack_append_message_to_record(char **result_buffer,

return result;
}

int flb_msgpack_append_map_to_record(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *map_data,
size_t map_size)
{
msgpack_unpacked unpacker;
msgpack_object_kv *new_map_entries[1];
msgpack_object_kv message_entry;
char *modified_data_buffer;
int modified_data_size;
size_t off = 0;
int i;
int result = FLB_MAP_NOT_MODIFIED;
*result_buffer = NULL;
*result_size = 0;

if (message_key_name == NULL || map_data == NULL){
return result;
}

new_map_entries[0] = &message_entry;

message_entry.key.type = MSGPACK_OBJECT_STR;
message_entry.key.via.str.size = flb_sds_len(message_key_name);
message_entry.key.via.str.ptr = message_key_name;

msgpack_unpacked_init(&unpacker);
if ((i = msgpack_unpack_next(&unpacker,
map_data,
map_size, &off)) != MSGPACK_UNPACK_SUCCESS) {
msgpack_unpacked_destroy(&unpacker);
return FLB_MSGPACK_UNPACK_ERROR;
}
if (unpacker.data.type != MSGPACK_OBJECT_MAP) {
msgpack_unpacked_destroy(&unpacker);
return FLB_MSGPACK_UNPACK_ERROR;
}

message_entry.val = unpacker.data;
result = flb_msgpack_expand_map(base_object_buffer,
base_object_size,
new_map_entries, 1,
&modified_data_buffer,
&modified_data_size);
if (result == 0) {
result = FLB_MAP_EXPAND_SUCCESS;
*result_buffer = modified_data_buffer;
*result_size = modified_data_size;
}
else {
result = FLB_MAP_EXPANSION_ERROR;
}
msgpack_unpacked_destroy(&unpacker);

return result;
}
81 changes: 81 additions & 0 deletions tests/runtime/filter_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,86 @@ void flb_test_filter_parser_preserve_original_field()
flb_destroy(ctx);
}

void flb_test_filter_parser_hash_value_field()
{
int ret;
int bytes;
char *p, *output, *expected;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;
int filter_ffd;
struct flb_parser *parser;

struct flb_lib_out_cb cb;
cb.cb = callback_test;
cb.data = NULL;

clear_output();

ctx = flb_create();

/* Configure service */
flb_service_set(ctx, "Flush", FLUSH_INTERVAL, "Grace", "1", "Log_Level", "debug", NULL);

/* Input */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd,
"Tag", "test",
NULL);

/* Parser */
parser = flb_parser_create("dummy_test", "regex", "^(?<INT>[^ ]+) (?<FLOAT>[^ ]+) (?<BOOL>[^ ]+) (?<STRING>.+)$",
Athishpranav2003 marked this conversation as resolved.
Show resolved Hide resolved
FLB_TRUE,
NULL, NULL, NULL, MK_FALSE, MK_TRUE, FLB_FALSE, FLB_FALSE, NULL, 0,
NULL, ctx->config);
TEST_CHECK(parser != NULL);

/* Filter */
filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
TEST_CHECK(filter_ffd >= 0);
ret = flb_filter_set(ctx, filter_ffd,
"Match", "test",
"Key_Name", "data",
"Parser", "dummy_test",
"Hash_Value_Field", "parsed",
NULL);
TEST_CHECK(ret == 0);

/* Output */
out_ffd = flb_output(ctx, (char *) "lib", &cb);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd,
"Match", "*",
"format", "json",
NULL);

/* Start the engine */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data */
p = "[1448403340,{\"data\":\"100 0.5 true This is an example\",\"log\":\"An example\"}]";
bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
TEST_CHECK(bytes == strlen(p));

wait_with_timeout(2000, &output); /* waiting flush and ensuring data flush */
TEST_CHECK_(output != NULL, "Expected output to not be NULL");
if (output != NULL) {
/* check hash value field present */
expected = "\"parsed\":{\"INT\":\"100\",\"FLOAT\":\"0.5\",\"BOOL\":\"true\",\"STRING\":\"This is an example\"}";
TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
/* check fields were extracted */
expected = "\"INT\":\"100\",\"FLOAT\":\"0.5\",\"BOOL\":\"true\",\"STRING\":\"This is an example\"";
TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
free(output);
}

flb_stop(ctx);
flb_destroy(ctx);
}

// https://github.com/fluent/fluent-bit/issues/2250
void flb_test_filter_parser_first_matched_when_mutilple_parser()
{
Expand Down Expand Up @@ -984,6 +1064,7 @@ TEST_LIST = {
{"filter_parser_use_system_timezone", flb_test_filter_parser_use_system_timezone },
{"filter_parser_ignore_malformed_time", flb_test_filter_parser_ignore_malformed_time },
{"filter_parser_preserve_original_field", flb_test_filter_parser_preserve_original_field },
{"filter_parser_hash_value_field", flb_test_filter_parser_hash_value_field },
{"filter_parser_first_matched_when_multiple_parser", flb_test_filter_parser_first_matched_when_mutilple_parser },
{"filter_parser_skip_empty_values_false", flb_test_filter_parser_skip_empty_values_false},
{NULL, NULL}
Expand Down
Loading