Skip to content

Commit

Permalink
filter_type_converter: fix error handling after flb_log_event_decoder…
Browse files Browse the repository at this point in the history
…_next

Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 committed Dec 15, 2023
1 parent 2612a1a commit 2389f44
Showing 1 changed file with 99 additions and 51 deletions.
150 changes: 99 additions & 51 deletions plugins/filter_type_converter/type_converter.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ static int cb_type_converter_filter(const void *data, size_t bytes,
int map_num;
int is_record_modified = FLB_FALSE;
int ret;
int dec_ret;
int enc_ret;
msgpack_sbuffer tmp_sbuf;
msgpack_packer tmp_pck;
msgpack_object *obj;
Expand All @@ -211,21 +213,23 @@ static int cb_type_converter_filter(const void *data, size_t bytes,
(void) i_ins;
(void) config;

ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
dec_ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);

if (ret != FLB_EVENT_DECODER_SUCCESS) {
if (dec_ret != FLB_EVENT_DECODER_SUCCESS) {
flb_plg_error(f_ins,
"Log event decoder initialization error : %d", ret);
"Log event decoder initialization error : %s",
flb_log_event_decoder_get_error_description(dec_ret));

return FLB_FILTER_NOTOUCH;
}

ret = flb_log_event_encoder_init(&log_encoder,
FLB_LOG_EVENT_FORMAT_DEFAULT);
enc_ret = flb_log_event_encoder_init(&log_encoder,
FLB_LOG_EVENT_FORMAT_DEFAULT);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(f_ins,
"Log event encoder initialization error : %d", ret);
"Log event encoder initialization error : %s",
flb_log_event_encoder_get_error_description(enc_ret));

flb_log_event_decoder_destroy(&log_decoder);

Expand All @@ -237,34 +241,53 @@ static int cb_type_converter_filter(const void *data, size_t bytes,
msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);

/* Iterate each item to know map number */
while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
while ((dec_ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {

flb_time_copy(&tm, &log_event.timestamp);
obj = log_event.body;

map_num = obj->via.map.size;

ret = flb_log_event_encoder_begin_record(&log_encoder);
enc_ret = flb_log_event_encoder_begin_record(&log_encoder);

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_set_timestamp(&log_encoder, &tm);
if (enc_ret == FLB_EVENT_ENCODER_SUCCESS) {
enc_ret = flb_log_event_encoder_set_timestamp(&log_encoder, &tm);
}

ret = flb_log_event_encoder_set_metadata_from_msgpack_object(
&log_encoder,
log_event.metadata);
enc_ret = flb_log_event_encoder_set_metadata_from_msgpack_object(
&log_encoder,
log_event.metadata);
if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(f_ins,
"flb_log_event_encoder_set_metadata_from_msgpack_object error: %s",
flb_log_event_encoder_get_error_description(enc_ret));
}

/* write original k/v */
for (i = 0;
i < map_num &&
ret == FLB_EVENT_ENCODER_SUCCESS;
enc_ret == FLB_EVENT_ENCODER_SUCCESS;
i++) {
ret = flb_log_event_encoder_append_body_values(
&log_encoder,
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj->via.map.ptr[i].key),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj->via.map.ptr[i].val));
enc_ret = flb_log_event_encoder_append_body_values(
&log_encoder,
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj->via.map.ptr[i].key),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj->via.map.ptr[i].val));
if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(f_ins,
"flb_log_event_encoder_append_body_values error: %s",
flb_log_event_encoder_get_error_description(enc_ret));
break;
}
}

if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_rollback_record(&log_encoder);

/* To decode next record */
enc_ret = FLB_EVENT_ENCODER_SUCCESS;
continue;
}

mk_list_foreach_safe(head, tmp, &ctx->conv_entries) {
Expand All @@ -275,40 +298,59 @@ static int cb_type_converter_filter(const void *data, size_t bytes,
entry = mk_list_entry(head, struct conv_entry, _head);
ret = flb_ra_get_kv_pair(entry->from_ra, *obj, &start_key, &out_key, &out_val);
if (start_key == NULL || out_key == NULL || out_val == NULL) {
ret = FLB_EVENT_ENCODER_SUCCESS;

continue;
}

/* key is found. try to convert. */
ret = flb_log_event_encoder_append_body_string(
&log_encoder,
entry->to_key,
flb_sds_len(entry->to_key));
enc_ret = flb_log_event_encoder_append_body_string(
&log_encoder,
entry->to_key,
flb_sds_len(entry->to_key));
if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(f_ins,
"flb_log_event_encoder_append_body_string error : %s",
flb_log_event_encoder_get_error_description(enc_ret));
continue;
}

ret = flb_typecast_pack(*out_val, entry->rule, &tmp_pck);
if (ret < 0) {
/* failed. try to write original val... */
flb_plg_error(ctx->ins, "failed to convert. key=%s", entry->from_key);

ret = flb_log_event_encoder_append_body_msgpack_object(
&log_encoder,
out_val);
enc_ret = flb_log_event_encoder_append_body_msgpack_object(
&log_encoder,
out_val);
if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(f_ins,
"flb_log_event_encoder_append_body_msgpack_object error : %s",
flb_log_event_encoder_get_error_description(enc_ret));

/* Break to rollback */
break;
}

continue;
}
else {
ret = flb_log_event_encoder_append_body_raw_msgpack(
&log_encoder,
tmp_sbuf.data, tmp_sbuf.size);
enc_ret = flb_log_event_encoder_append_body_raw_msgpack(
&log_encoder,
tmp_sbuf.data, tmp_sbuf.size);
if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(f_ins,
"flb_log_event_encoder_append_body_raw_msgpack error : %s",
flb_log_event_encoder_get_error_description(enc_ret));
/* Break to rollback */
break;
}

msgpack_sbuffer_clear(&tmp_sbuf);
}

is_record_modified = FLB_TRUE;
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
if (enc_ret == FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_commit_record(&log_encoder);
}
else {
Expand All @@ -323,29 +365,35 @@ static int cb_type_converter_filter(const void *data, size_t bytes,
flb_plg_trace(ctx->ins, "no touch");

ret = FLB_FILTER_NOTOUCH;
goto cb_type_converter_filter_end;
}
else {
if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA &&
log_decoder.offset == bytes) {
ret = FLB_EVENT_ENCODER_SUCCESS;
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
*out_buf = log_encoder.output_buffer;
*out_bytes = log_encoder.output_length;

ret = FLB_FILTER_MODIFIED;
dec_ret = flb_log_event_decoder_get_last_result(&log_decoder);
if (dec_ret != FLB_EVENT_DECODER_SUCCESS) {
flb_plg_error(ctx->ins,
"flb_log_event_decoder_get_last_result error : %s",
flb_log_event_decoder_get_error_description(dec_ret));
ret = FLB_FILTER_NOTOUCH;
goto cb_type_converter_filter_end;
}

flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
}
else {
flb_plg_error(ctx->ins,
"Log event encoder error : %d", ret);
if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(ctx->ins,
"Log event encoder error : %s",
flb_log_event_encoder_get_error_description(enc_ret));

ret = FLB_FILTER_NOTOUCH;
}
ret = FLB_FILTER_NOTOUCH;
goto cb_type_converter_filter_end;
}

*out_buf = log_encoder.output_buffer;
*out_bytes = log_encoder.output_length;

ret = FLB_FILTER_MODIFIED;

flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);

cb_type_converter_filter_end:
flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);

Expand Down

0 comments on commit 2389f44

Please sign in to comment.