diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index 4a15ce6e92d..eae371dd316 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -123,6 +123,7 @@ static int send_response(struct http_conn *conn, int http_status, char *message) static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_conn *conn, flb_sds_t tag, + size_t tag_len, struct mk_http_session *session, struct mk_http_request *request) { @@ -143,7 +144,7 @@ static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_co cfl_list_foreach(iterator, &decoded_contexts) { context = cfl_list_entry(iterator, struct cmt, _head); - result = flb_input_metrics_append(ctx->ins, NULL, 0, context); + result = flb_input_metrics_append(ctx->ins, tag, tag_len, context); if (result != 0) { flb_plg_debug(ctx->ins, "could not ingest metrics context : %d", result); @@ -158,6 +159,7 @@ static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_co static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct http_conn *conn, flb_sds_t tag, + size_t tag_len, struct mk_http_session *session, struct mk_http_request *request) { @@ -171,7 +173,7 @@ static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct ht request->data.len, &offset); if (result == 0) { - result = flb_input_trace_append(ctx->ins, NULL, 0, decoded_context); + result = flb_input_trace_append(ctx->ins, tag, tag_len, decoded_context); ctr_decode_opentelemetry_destroy(decoded_context); } @@ -180,6 +182,7 @@ static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct ht static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http_conn *conn, flb_sds_t tag, + size_t tag_len, struct mk_http_session *session, struct mk_http_request *request) { @@ -217,7 +220,7 @@ static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http flb_free(out_buf); } - flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size); + flb_input_log_append(ctx->ins, tag, tag_len, mp_sbuf.data, mp_sbuf.size); msgpack_sbuffer_destroy(&mp_sbuf); return 0; @@ -225,16 +228,17 @@ static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn, flb_sds_t tag, + size_t tag_len, struct mk_http_session *session, struct mk_http_request *request) { int result; if (ctx->raw_traces) { - result = process_payload_raw_traces(ctx, conn, tag, session, request); + result = process_payload_raw_traces(ctx, conn, tag, tag_len, session, request); } else { - result = process_payload_traces_proto(ctx, conn, tag, session, request); + result = process_payload_traces_proto(ctx, conn, tag, tag_len, session, request); } return result; @@ -1459,6 +1463,7 @@ static int json_payload_to_msgpack(struct flb_opentelemetry *ctx, static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn *conn, flb_sds_t tag, + size_t tag_len, struct mk_http_session *session, struct mk_http_request *request) { @@ -1494,7 +1499,7 @@ static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn if (ret == 0) { ret = flb_input_log_append(ctx->ins, tag, - flb_sds_len(tag), + tag_len, encoder->output_buffer, encoder->output_length); } @@ -1677,6 +1682,7 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c size_t original_data_size; char *uncompressed_data; size_t uncompressed_data_size; + size_t tag_len; if (request->uri.data[0] != '/') { send_response(conn, 400, "error: invalid request\n"); @@ -1736,6 +1742,8 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c } } + tag_len = flb_sds_len(tag); + /* Check if we have a Host header: Hostname ; port */ mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST); @@ -1785,13 +1793,13 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c } if (strcmp(uri, "/v1/metrics") == 0) { - ret = process_payload_metrics(ctx, conn, tag, session, request); + ret = process_payload_metrics(ctx, conn, tag, tag_len, session, request); } else if (strcmp(uri, "/v1/traces") == 0) { - ret = process_payload_traces(ctx, conn, tag, session, request); + ret = process_payload_traces(ctx, conn, tag, tag_len, session, request); } else if (strcmp(uri, "/v1/logs") == 0) { - ret = process_payload_logs(ctx, conn, tag, session, request); + ret = process_payload_logs(ctx, conn, tag, tag_len, session, request); } if (uncompressed_data != NULL) {