Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_parser: refactoring. simplify msgpack loop. #7380

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 102 additions & 104 deletions plugins/filter_parser/filter_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,143 +236,141 @@ static int cb_parser_filter(const void *data, size_t bytes,
flb_time_copy(&tm, &log_event.timestamp);
obj = log_event.body;

if (obj->type == MSGPACK_OBJECT_MAP) {
map_num = obj->via.map.size;
if (ctx->reserve_data) {
append_arr_len = obj->via.map.size;
append_arr = flb_calloc(append_arr_len, sizeof(msgpack_object_kv *));
if (obj->type != MSGPACK_OBJECT_MAP) {
continue;
}
map_num = obj->via.map.size;
if (ctx->reserve_data) {
append_arr_len = obj->via.map.size;
append_arr = flb_calloc(append_arr_len, sizeof(msgpack_object_kv *));

if (append_arr == NULL) {
flb_errno();
if (append_arr == NULL) {
flb_errno();

flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);

return FLB_FILTER_NOTOUCH;
}
return FLB_FILTER_NOTOUCH;
}
}

continue_parsing = FLB_TRUE;
for (i = 0; i < map_num && continue_parsing; i++) {
kv = &obj->via.map.ptr[i];
if (ctx->reserve_data) {
append_arr[append_arr_i] = kv;
append_arr_i++;
}
if ( msgpackobj2char(&kv->key, &key_str, &key_len) < 0 ) {
/* key is not string */
continue_parsing = FLB_TRUE;
for (i = 0; i < map_num && continue_parsing; i++) {
kv = &obj->via.map.ptr[i];
if (ctx->reserve_data) {
append_arr[append_arr_i] = kv;
append_arr_i++;
}
if ( msgpackobj2char(&kv->key, &key_str, &key_len) < 0 ) {
/* key is not string */
continue;
}
if (key_len == ctx->key_name_len &&
!strncmp(key_str, ctx->key_name, key_len)) {
if ( msgpackobj2char(&kv->val, &val_str, &val_len) < 0 ) {
/* val is not string */
continue;
}
if (key_len == ctx->key_name_len &&
!strncmp(key_str, ctx->key_name, key_len)) {
if ( msgpackobj2char(&kv->val, &val_str, &val_len) < 0 ) {
/* val is not string */
continue;
}

/* Lookup parser */
mk_list_foreach(head, &ctx->parsers) {
fp = mk_list_entry(head, struct filter_parser, _head);

/* Reset time */
flb_time_zero(&parsed_time);

parse_ret = flb_parser_do(fp->parser, val_str, val_len,
(void **) &out_buf, &out_size,
&parsed_time);
if (parse_ret >= 0) {
/*
* If the parser succeeded we need to check the
* status of the parsed time. If the time was
* parsed successfully 'parsed_time' will be
* different than zero, if so, override the time
* holder with the new value, otherwise keep the
* original.
*/
if (flb_time_to_nanosec(&parsed_time) != 0L) {
flb_time_copy(&tm, &parsed_time);
}
/* Lookup parser */
mk_list_foreach(head, &ctx->parsers) {
fp = mk_list_entry(head, struct filter_parser, _head);

/* Reset time */
flb_time_zero(&parsed_time);

parse_ret = flb_parser_do(fp->parser, val_str, val_len,
(void **) &out_buf, &out_size,
&parsed_time);
if (parse_ret >= 0) {
/*
* If the parser succeeded we need to check the
* status of the parsed time. If the time was
* parsed successfully 'parsed_time' will be
* different than zero, if so, override the time
* holder with the new value, otherwise keep the
* original.
*/
if (flb_time_to_nanosec(&parsed_time) != 0L) {
flb_time_copy(&tm, &parsed_time);
}

if (ctx->reserve_data) {
if (!ctx->preserve_key) {
append_arr_i--;
append_arr_len--;
append_arr[append_arr_i] = NULL;
}
}
else {
continue_parsing = FLB_FALSE;
if (ctx->reserve_data) {
if (!ctx->preserve_key) {
append_arr_i--;
append_arr_len--;
append_arr[append_arr_i] = NULL;
}
break;
}
else {
continue_parsing = FLB_FALSE;
}
break;
}
}
}
}

encoder_result = flb_log_event_encoder_begin_record(&log_encoder);
encoder_result = flb_log_event_encoder_begin_record(&log_encoder);

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = flb_log_event_encoder_set_timestamp(
if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = flb_log_event_encoder_set_timestamp(
&log_encoder, &tm);
}
}

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_metadata_from_msgpack_object(
if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_metadata_from_msgpack_object(
&log_encoder, log_event.metadata);
}
}

if (out_buf != NULL) {
if (ctx->reserve_data) {
char *new_buf = NULL;
int new_size;
int ret;
ret = flb_msgpack_expand_map(out_buf, out_size,
append_arr, append_arr_len,
&new_buf, &new_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot expand map");

flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_free(append_arr);

return FLB_FILTER_NOTOUCH;
}
if (out_buf != NULL) {
if (ctx->reserve_data) {
char *new_buf = NULL;
int new_size;
int ret;
ret = flb_msgpack_expand_map(out_buf, out_size,
append_arr, append_arr_len,
&new_buf, &new_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot expand map");

flb_free(out_buf);
out_buf = new_buf;
out_size = new_size;
}
flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_free(append_arr);

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_body_from_raw_msgpack(
&log_encoder, out_buf, out_size);
return FLB_FILTER_NOTOUCH;
}

flb_free(out_buf);
ret = FLB_FILTER_MODIFIED;
}
else {
/* re-use original data*/
if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_body_from_msgpack_object(
&log_encoder, log_event.body);
}
out_buf = new_buf;
out_size = new_size;
}

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = flb_log_event_encoder_commit_record(&log_encoder);
encoder_result = \
flb_log_event_encoder_set_body_from_raw_msgpack(
&log_encoder, out_buf, out_size);
}

flb_free(append_arr);
append_arr = NULL;
flb_free(out_buf);
ret = FLB_FILTER_MODIFIED;
}
else {
continue;
/* re-use original data*/
if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_body_from_msgpack_object(
&log_encoder, log_event.body);
}
}

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = flb_log_event_encoder_commit_record(&log_encoder);
}

flb_free(append_arr);
append_arr = NULL;
}

if (log_encoder.output_length > 0) {
Expand Down
Loading