From 7d658e76fa33ae1685ffb4d4f17400408e6ea78f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Kj=C3=A6r=20J=C3=B8rgensen?= Date: Mon, 27 May 2024 14:51:16 +0200 Subject: [PATCH] in_opentelemetry: attempt to fix tag_from_uri MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Martin Kjær Jørgensen --- plugins/in_opentelemetry/opentelemetry_prot.c | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index 4a15ce6e92d..73d65a02c71 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -123,6 +123,7 @@ static int send_response(struct http_conn *conn, int http_status, char *message) static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_conn *conn, flb_sds_t tag, + size_t tag_len, struct mk_http_session *session, struct mk_http_request *request) { @@ -143,7 +144,7 @@ static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_co cfl_list_foreach(iterator, &decoded_contexts) { context = cfl_list_entry(iterator, struct cmt, _head); - result = flb_input_metrics_append(ctx->ins, NULL, 0, context); + result = flb_input_metrics_append(ctx->ins, tag, tag_len, context); if (result != 0) { flb_plg_debug(ctx->ins, "could not ingest metrics context : %d", result); @@ -158,6 +159,7 @@ static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_co static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct http_conn *conn, flb_sds_t tag, + size_t tag_len, struct mk_http_session *session, struct mk_http_request *request) { @@ -171,7 +173,7 @@ static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct ht request->data.len, &offset); if (result == 0) { - result = flb_input_trace_append(ctx->ins, NULL, 0, decoded_context); + result = flb_input_trace_append(ctx->ins, tag, tag_len, decoded_context); ctr_decode_opentelemetry_destroy(decoded_context); } @@ -180,6 +182,7 @@ static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct ht static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http_conn *conn, flb_sds_t tag, + size_t tag_len, struct mk_http_session *session, struct mk_http_request *request) { @@ -217,7 +220,7 @@ static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http flb_free(out_buf); } - flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size); + flb_input_log_append(ctx->ins, tag, tag_len, mp_sbuf.data, mp_sbuf.size); msgpack_sbuffer_destroy(&mp_sbuf); return 0; @@ -225,16 +228,17 @@ static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn, flb_sds_t tag, + size_t tag_len, struct mk_http_session *session, struct mk_http_request *request) { int result; if (ctx->raw_traces) { - result = process_payload_raw_traces(ctx, conn, tag, session, request); + result = process_payload_raw_traces(ctx, conn, tag, tag_len, session, request); } else { - result = process_payload_traces_proto(ctx, conn, tag, session, request); + result = process_payload_traces_proto(ctx, conn, tag, tag_len, session, request); } return result; @@ -1459,6 +1463,7 @@ static int json_payload_to_msgpack(struct flb_opentelemetry *ctx, static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn *conn, flb_sds_t tag, + size_t tag_len, struct mk_http_session *session, struct mk_http_request *request) { @@ -1494,7 +1499,7 @@ static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn if (ret == 0) { ret = flb_input_log_append(ctx->ins, tag, - flb_sds_len(tag), + tag_len, encoder->output_buffer, encoder->output_length); } @@ -1736,6 +1741,8 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c } } + size_t tag_len = flb_sds_len(tag); + /* Check if we have a Host header: Hostname ; port */ mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST); @@ -1785,13 +1792,13 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c } if (strcmp(uri, "/v1/metrics") == 0) { - ret = process_payload_metrics(ctx, conn, tag, session, request); + ret = process_payload_metrics(ctx, conn, tag, tag_len, session, request); } else if (strcmp(uri, "/v1/traces") == 0) { - ret = process_payload_traces(ctx, conn, tag, session, request); + ret = process_payload_traces(ctx, conn, tag, tag_len, session, request); } else if (strcmp(uri, "/v1/logs") == 0) { - ret = process_payload_logs(ctx, conn, tag, session, request); + ret = process_payload_logs(ctx, conn, tag, tag_len, session, request); } if (uncompressed_data != NULL) {