Skip to content

Commit

Permalink
filter_parser: refactoring. simplify msgpack loop.
Browse files Browse the repository at this point in the history
From
   if (obj->type == MSGPACK_OBJECT_MAP) {
       /* filtering */
   }
   else {
       continue;
   }

To
   if (obj->type != MSGPACK_OBJECT_MAP) {
       continue;
   }
   /* filtering */

Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 committed May 13, 2023
1 parent 50494f2 commit bc5282d
Showing 1 changed file with 102 additions and 104 deletions.
206 changes: 102 additions & 104 deletions plugins/filter_parser/filter_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,143 +236,141 @@ static int cb_parser_filter(const void *data, size_t bytes,
flb_time_copy(&tm, &log_event.timestamp);
obj = log_event.body;

if (obj->type == MSGPACK_OBJECT_MAP) {
map_num = obj->via.map.size;
if (ctx->reserve_data) {
append_arr_len = obj->via.map.size;
append_arr = flb_calloc(append_arr_len, sizeof(msgpack_object_kv *));
if (obj->type != MSGPACK_OBJECT_MAP) {
continue;
}
map_num = obj->via.map.size;
if (ctx->reserve_data) {
append_arr_len = obj->via.map.size;
append_arr = flb_calloc(append_arr_len, sizeof(msgpack_object_kv *));

if (append_arr == NULL) {
flb_errno();
if (append_arr == NULL) {
flb_errno();

flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);

return FLB_FILTER_NOTOUCH;
}
return FLB_FILTER_NOTOUCH;
}
}

continue_parsing = FLB_TRUE;
for (i = 0; i < map_num && continue_parsing; i++) {
kv = &obj->via.map.ptr[i];
if (ctx->reserve_data) {
append_arr[append_arr_i] = kv;
append_arr_i++;
}
if ( msgpackobj2char(&kv->key, &key_str, &key_len) < 0 ) {
/* key is not string */
continue_parsing = FLB_TRUE;
for (i = 0; i < map_num && continue_parsing; i++) {
kv = &obj->via.map.ptr[i];
if (ctx->reserve_data) {
append_arr[append_arr_i] = kv;
append_arr_i++;
}
if ( msgpackobj2char(&kv->key, &key_str, &key_len) < 0 ) {
/* key is not string */
continue;
}
if (key_len == ctx->key_name_len &&
!strncmp(key_str, ctx->key_name, key_len)) {
if ( msgpackobj2char(&kv->val, &val_str, &val_len) < 0 ) {
/* val is not string */
continue;
}
if (key_len == ctx->key_name_len &&
!strncmp(key_str, ctx->key_name, key_len)) {
if ( msgpackobj2char(&kv->val, &val_str, &val_len) < 0 ) {
/* val is not string */
continue;
}

/* Lookup parser */
mk_list_foreach(head, &ctx->parsers) {
fp = mk_list_entry(head, struct filter_parser, _head);

/* Reset time */
flb_time_zero(&parsed_time);

parse_ret = flb_parser_do(fp->parser, val_str, val_len,
(void **) &out_buf, &out_size,
&parsed_time);
if (parse_ret >= 0) {
/*
* If the parser succeeded we need to check the
* status of the parsed time. If the time was
* parsed successfully 'parsed_time' will be
* different than zero, if so, override the time
* holder with the new value, otherwise keep the
* original.
*/
if (flb_time_to_nanosec(&parsed_time) != 0L) {
flb_time_copy(&tm, &parsed_time);
}
/* Lookup parser */
mk_list_foreach(head, &ctx->parsers) {
fp = mk_list_entry(head, struct filter_parser, _head);

/* Reset time */
flb_time_zero(&parsed_time);

parse_ret = flb_parser_do(fp->parser, val_str, val_len,
(void **) &out_buf, &out_size,
&parsed_time);
if (parse_ret >= 0) {
/*
* If the parser succeeded we need to check the
* status of the parsed time. If the time was
* parsed successfully 'parsed_time' will be
* different than zero, if so, override the time
* holder with the new value, otherwise keep the
* original.
*/
if (flb_time_to_nanosec(&parsed_time) != 0L) {
flb_time_copy(&tm, &parsed_time);
}

if (ctx->reserve_data) {
if (!ctx->preserve_key) {
append_arr_i--;
append_arr_len--;
append_arr[append_arr_i] = NULL;
}
}
else {
continue_parsing = FLB_FALSE;
if (ctx->reserve_data) {
if (!ctx->preserve_key) {
append_arr_i--;
append_arr_len--;
append_arr[append_arr_i] = NULL;
}
break;
}
else {
continue_parsing = FLB_FALSE;
}
break;
}
}
}
}

encoder_result = flb_log_event_encoder_begin_record(&log_encoder);
encoder_result = flb_log_event_encoder_begin_record(&log_encoder);

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = flb_log_event_encoder_set_timestamp(
if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = flb_log_event_encoder_set_timestamp(
&log_encoder, &tm);
}
}

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_metadata_from_msgpack_object(
if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_metadata_from_msgpack_object(
&log_encoder, log_event.metadata);
}
}

if (out_buf != NULL) {
if (ctx->reserve_data) {
char *new_buf = NULL;
int new_size;
int ret;
ret = flb_msgpack_expand_map(out_buf, out_size,
append_arr, append_arr_len,
&new_buf, &new_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot expand map");

flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_free(append_arr);

return FLB_FILTER_NOTOUCH;
}
if (out_buf != NULL) {
if (ctx->reserve_data) {
char *new_buf = NULL;
int new_size;
int ret;
ret = flb_msgpack_expand_map(out_buf, out_size,
append_arr, append_arr_len,
&new_buf, &new_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot expand map");

flb_free(out_buf);
out_buf = new_buf;
out_size = new_size;
}
flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_free(append_arr);

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_body_from_raw_msgpack(
&log_encoder, out_buf, out_size);
return FLB_FILTER_NOTOUCH;
}

flb_free(out_buf);
ret = FLB_FILTER_MODIFIED;
}
else {
/* re-use original data*/
if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_body_from_msgpack_object(
&log_encoder, log_event.body);
}
out_buf = new_buf;
out_size = new_size;
}

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = flb_log_event_encoder_commit_record(&log_encoder);
encoder_result = \
flb_log_event_encoder_set_body_from_raw_msgpack(
&log_encoder, out_buf, out_size);
}

flb_free(append_arr);
append_arr = NULL;
flb_free(out_buf);
ret = FLB_FILTER_MODIFIED;
}
else {
continue;
/* re-use original data*/
if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_body_from_msgpack_object(
&log_encoder, log_event.body);
}
}

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = flb_log_event_encoder_commit_record(&log_encoder);
}

flb_free(append_arr);
append_arr = NULL;
}

if (log_encoder.output_length > 0) {
Expand Down

0 comments on commit bc5282d

Please sign in to comment.