Skip to content

Commit

Permalink
in_forward: Extract a function for appending logs
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Oct 18, 2023
1 parent c73d01e commit c9a5b77
Showing 1 changed file with 60 additions and 69 deletions.
129 changes: 60 additions & 69 deletions plugins/in_forward/fw_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -451,14 +451,55 @@ static size_t receiver_to_unpacker(struct fw_conn *conn, size_t request_size,
return recv_len;
}

static int append_log(struct flb_input_instance *ins, struct fw_conn *conn,
int event_type,
flb_sds_t out_tag, const void *data, size_t len)
{
int ret;
size_t off = 0;
struct cmt *cmt;
struct ctrace *ctr;

if (event_type == FLB_EVENT_TYPE_LOGS) {
flb_input_log_append(conn->in,
out_tag, flb_sds_len(out_tag),
data, len);

return 0;
}
else if (event_type == FLB_EVENT_TYPE_METRICS) {
ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &off);
if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
flb_error("cmt_decode_msgpack_create failed. ret=%d", ret);
return -1;
}
flb_input_metrics_append(conn->in,
out_tag, flb_sds_len(out_tag),
cmt);
}
else if (event_type == FLB_EVENT_TYPE_TRACES) {
off = 0;
ret = ctr_decode_msgpack_create(&ctr, (char *) data, len, &off);
if (ret == -1) {
return -1;
}

flb_input_trace_append(ins,
out_tag, flb_sds_len(out_tag),
ctr);
}

return 0;
}


int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
{
int ret;
int stag_len;
int event_type;
int contain_options = FLB_FALSE;
size_t index = 0;
size_t off = 0;
size_t chunk_id = -1;
size_t metadata_id = -1;
const char *stag;
Expand All @@ -476,8 +517,6 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
msgpack_unpacker *unp;
size_t all_used = 0;
struct flb_in_fw_config *ctx = conn->ctx;
struct cmt *cmt;
struct ctrace *ctr;

/*
* [tag, time, record]
Expand Down Expand Up @@ -759,48 +798,23 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
flb_free(gz_data);
return -1;
}
event_type = ret;
}

if (event_type == FLB_EVENT_TYPE_LOGS) {
/* Append uncompressed data */
flb_input_log_append(conn->in,
out_tag, flb_sds_len(out_tag),
gz_data, gz_size);
flb_free(gz_data);
}
else if (event_type == FLB_EVENT_TYPE_METRICS) {
ret = cmt_decode_msgpack_create(&cmt, (char *) gz_data, gz_size, &off);
if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
flb_error("cmt_decode_msgpack_create failed. ret=%d", ret);
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}
flb_input_metrics_append(conn->in,
out_tag, flb_sds_len(out_tag),
cmt);
flb_free(gz_data);
}
else if (event_type == FLB_EVENT_TYPE_TRACES) {
off = 0;
ret = ctr_decode_msgpack_create(&ctr, (char *) gz_data, gz_size, &off);
if (ret == -1) {
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

flb_input_trace_append(ins,
out_tag, flb_sds_len(out_tag),
ctr);

ret = append_log(ins, conn,
event_type,
out_tag, gz_data, gz_size);
if (ret == -1) {
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
flb_free(gz_data);
return -1;
}
flb_free(gz_data);
}
else {
event_type = FLB_EVENT_TYPE_LOGS;
Expand All @@ -815,37 +829,14 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
event_type = ret;
}

if (event_type == FLB_EVENT_TYPE_LOGS) {
flb_input_log_append(conn->in,
out_tag, flb_sds_len(out_tag),
data, len);
}
else if (event_type == FLB_EVENT_TYPE_METRICS) {
ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &off);
if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
flb_error("cmt_decode_msgpack_create failed. ret=%d", ret);
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}
flb_input_metrics_append(conn->in,
out_tag, flb_sds_len(out_tag),
cmt);
}
else if (event_type == FLB_EVENT_TYPE_TRACES) {
off = 0;
ret = ctr_decode_msgpack_create(&ctr, (char *) data, len, &off);
if (ret == -1) {
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

flb_input_trace_append(ins,
out_tag, flb_sds_len(out_tag),
ctr);
ret = append_log(ins, conn,
event_type,
out_tag, data, len);
if (ret == -1) {
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}
}

Expand Down

0 comments on commit c9a5b77

Please sign in to comment.