diff --git a/plugins/in_lib/in_lib.c b/plugins/in_lib/in_lib.c index bc72d642f93..466f1afe876 100644 --- a/plugins/in_lib/in_lib.c +++ b/plugins/in_lib/in_lib.c @@ -36,6 +36,8 @@ static int in_lib_collect(struct flb_input_instance *ins, struct flb_config *config, void *in_context) { int ret; + int dec_ret; + int enc_ret; int bytes; int out_size; int capacity; @@ -89,47 +91,74 @@ static int in_lib_collect(struct flb_input_instance *ins, } ctx->buf_len = 0; - ret = flb_log_event_decoder_init(&decoder, pack, out_size); + dec_ret = flb_log_event_decoder_init(&decoder, pack, out_size); + if (dec_ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %s", + flb_log_event_decoder_get_error_description(dec_ret)); + flb_free(pack); + flb_pack_state_reset(&ctx->state); + flb_pack_state_init(&ctx->state); + return -1; + } - while ((ret = flb_log_event_decoder_next( - &decoder, - &record)) == FLB_EVENT_DECODER_SUCCESS) { - if (ret == FLB_EVENT_DECODER_SUCCESS) { - ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); + while ((dec_ret = flb_log_event_decoder_next( + &decoder, + &record)) == FLB_EVENT_DECODER_SUCCESS) { + enc_ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); + if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "flb_log_event_encoder_begin_record error : %s", + flb_log_event_encoder_get_error_description(enc_ret)); + flb_log_event_encoder_rollback_record(&ctx->log_encoder); + continue; } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &ctx->log_encoder, - &record.timestamp); + enc_ret = flb_log_event_encoder_set_timestamp( + &ctx->log_encoder, + &record.timestamp); + if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "flb_log_event_encoder_set_timestamp error : %s", + flb_log_event_encoder_get_error_description(enc_ret)); + flb_log_event_encoder_rollback_record(&ctx->log_encoder); + continue; } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_metadata_from_msgpack_object( - &ctx->log_encoder, - record.metadata); + enc_ret = flb_log_event_encoder_set_metadata_from_msgpack_object( + &ctx->log_encoder, + record.metadata); + if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "flb_log_event_encoder_set_metadata_from_msgpack_object error : %s", + flb_log_event_encoder_get_error_description(enc_ret)); + flb_log_event_encoder_rollback_record(&ctx->log_encoder); + continue; } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ctx->log_encoder, - record.body); + enc_ret = flb_log_event_encoder_set_body_from_msgpack_object( + &ctx->log_encoder, + record.body); + if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "flb_log_event_encoder_set_body_from_msgpack_object error : %s", + flb_log_event_encoder_get_error_description(enc_ret)); + flb_log_event_encoder_rollback_record(&ctx->log_encoder); + continue; } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); - } - else { + enc_ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "flb_log_event_encoder_commit_record error : %s", + flb_log_event_encoder_get_error_description(enc_ret)); flb_log_event_encoder_rollback_record(&ctx->log_encoder); + continue; } } - if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA && - decoder.offset == out_size) { - ret = FLB_EVENT_ENCODER_SUCCESS; - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { + dec_ret = flb_log_event_decoder_get_last_result(&decoder); + if (dec_ret == FLB_EVENT_DECODER_SUCCESS) { flb_input_log_append(ctx->ins, NULL, 0, ctx->log_encoder.output_buffer, ctx->log_encoder.output_length); @@ -137,8 +166,9 @@ static int in_lib_collect(struct flb_input_instance *ins, ret = 0; } else { - flb_plg_error(ctx->ins, "Error encoding record : %d", ret); - + flb_plg_error(ctx->ins, + "flb_log_event_decoder_get_last_result error : %s", + flb_log_event_decoder_get_error_description(dec_ret)); ret = -1; }