diff --git a/plugins/in_forward/fw_prot.c b/plugins/in_forward/fw_prot.c index e065e3d61ca..b222ad3620b 100644 --- a/plugins/in_forward/fw_prot.c +++ b/plugins/in_forward/fw_prot.c @@ -451,6 +451,48 @@ static size_t receiver_to_unpacker(struct fw_conn *conn, size_t request_size, return recv_len; } +static int append_log(struct flb_input_instance *ins, struct fw_conn *conn, + int event_type, + flb_sds_t out_tag, const void *data, size_t len) +{ + int ret; + size_t off = 0; + struct cmt *cmt; + struct ctrace *ctr; + + if (event_type == FLB_EVENT_TYPE_LOGS) { + flb_input_log_append(conn->in, + out_tag, flb_sds_len(out_tag), + data, len); + + return 0; + } + else if (event_type == FLB_EVENT_TYPE_METRICS) { + ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &off); + if (ret != CMT_DECODE_MSGPACK_SUCCESS) { + flb_error("cmt_decode_msgpack_create failed. ret=%d", ret); + return -1; + } + flb_input_metrics_append(conn->in, + out_tag, flb_sds_len(out_tag), + cmt); + } + else if (event_type == FLB_EVENT_TYPE_TRACES) { + off = 0; + ret = ctr_decode_msgpack_create(&ctr, (char *) data, len, &off); + if (ret == -1) { + return -1; + } + + flb_input_trace_append(ins, + out_tag, flb_sds_len(out_tag), + ctr); + } + + return 0; +} + + int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) { int ret; @@ -458,7 +500,6 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) int event_type; int contain_options = FLB_FALSE; size_t index = 0; - size_t off = 0; size_t chunk_id = -1; size_t metadata_id = -1; const char *stag; @@ -476,8 +517,6 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) msgpack_unpacker *unp; size_t all_used = 0; struct flb_in_fw_config *ctx = conn->ctx; - struct cmt *cmt; - struct ctrace *ctr; /* * [tag, time, record] @@ -759,48 +798,23 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); flb_sds_destroy(out_tag); + flb_free(gz_data); return -1; } event_type = ret; } - if (event_type == FLB_EVENT_TYPE_LOGS) { - /* Append uncompressed data */ - flb_input_log_append(conn->in, - out_tag, flb_sds_len(out_tag), - gz_data, gz_size); - flb_free(gz_data); - } - else if (event_type == FLB_EVENT_TYPE_METRICS) { - ret = cmt_decode_msgpack_create(&cmt, (char *) gz_data, gz_size, &off); - if (ret != CMT_DECODE_MSGPACK_SUCCESS) { - flb_error("cmt_decode_msgpack_create failed. ret=%d", ret); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - flb_input_metrics_append(conn->in, - out_tag, flb_sds_len(out_tag), - cmt); - flb_free(gz_data); - } - else if (event_type == FLB_EVENT_TYPE_TRACES) { - off = 0; - ret = ctr_decode_msgpack_create(&ctr, (char *) gz_data, gz_size, &off); - if (ret == -1) { - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - - flb_input_trace_append(ins, - out_tag, flb_sds_len(out_tag), - ctr); - + ret = append_log(ins, conn, + event_type, + out_tag, gz_data, gz_size); + if (ret == -1) { + msgpack_unpacked_destroy(&result); + msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); flb_free(gz_data); + return -1; } + flb_free(gz_data); } else { event_type = FLB_EVENT_TYPE_LOGS; @@ -815,37 +829,14 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) event_type = ret; } - if (event_type == FLB_EVENT_TYPE_LOGS) { - flb_input_log_append(conn->in, - out_tag, flb_sds_len(out_tag), - data, len); - } - else if (event_type == FLB_EVENT_TYPE_METRICS) { - ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &off); - if (ret != CMT_DECODE_MSGPACK_SUCCESS) { - flb_error("cmt_decode_msgpack_create failed. ret=%d", ret); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - flb_input_metrics_append(conn->in, - out_tag, flb_sds_len(out_tag), - cmt); - } - else if (event_type == FLB_EVENT_TYPE_TRACES) { - off = 0; - ret = ctr_decode_msgpack_create(&ctr, (char *) data, len, &off); - if (ret == -1) { - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - - flb_input_trace_append(ins, - out_tag, flb_sds_len(out_tag), - ctr); + ret = append_log(ins, conn, + event_type, + out_tag, data, len); + if (ret == -1) { + msgpack_unpacked_destroy(&result); + msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); + return -1; } }