Skip to content

Commit

Permalink
in_opentelemetry: fix broken tag_from_uri option when using http/1.1 (#…
Browse files Browse the repository at this point in the history
…8881)

Signed-off-by: Martin Kjær Jørgensen <[email protected]>
  • Loading branch information
shaohme authored Jun 13, 2024
1 parent b17730d commit 477f656
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ static int send_response(struct http_conn *conn, int http_status, char *message)

static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
Expand All @@ -143,7 +144,7 @@ static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_co
cfl_list_foreach(iterator, &decoded_contexts) {
context = cfl_list_entry(iterator, struct cmt, _head);

result = flb_input_metrics_append(ctx->ins, NULL, 0, context);
result = flb_input_metrics_append(ctx->ins, tag, tag_len, context);

if (result != 0) {
flb_plg_debug(ctx->ins, "could not ingest metrics context : %d", result);
Expand All @@ -158,6 +159,7 @@ static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_co

static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
Expand All @@ -171,7 +173,7 @@ static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct ht
request->data.len,
&offset);
if (result == 0) {
result = flb_input_trace_append(ctx->ins, NULL, 0, decoded_context);
result = flb_input_trace_append(ctx->ins, tag, tag_len, decoded_context);
ctr_decode_opentelemetry_destroy(decoded_context);
}

Expand All @@ -180,6 +182,7 @@ static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct ht

static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
Expand Down Expand Up @@ -217,24 +220,25 @@ static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http
flb_free(out_buf);
}

flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size);
flb_input_log_append(ctx->ins, tag, tag_len, mp_sbuf.data, mp_sbuf.size);
msgpack_sbuffer_destroy(&mp_sbuf);

return 0;
}

static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
int result;

if (ctx->raw_traces) {
result = process_payload_raw_traces(ctx, conn, tag, session, request);
result = process_payload_raw_traces(ctx, conn, tag, tag_len, session, request);
}
else {
result = process_payload_traces_proto(ctx, conn, tag, session, request);
result = process_payload_traces_proto(ctx, conn, tag, tag_len, session, request);
}

return result;
Expand Down Expand Up @@ -1569,6 +1573,7 @@ static int json_payload_to_msgpack(struct flb_opentelemetry *ctx,

static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
Expand Down Expand Up @@ -1604,7 +1609,7 @@ static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn
if (ret == 0) {
ret = flb_input_log_append(ctx->ins,
tag,
flb_sds_len(tag),
tag_len,
encoder->output_buffer,
encoder->output_length);
}
Expand Down Expand Up @@ -1787,6 +1792,7 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
size_t original_data_size;
char *uncompressed_data;
size_t uncompressed_data_size;
size_t tag_len;

if (request->uri.data[0] != '/') {
send_response(conn, 400, "error: invalid request\n");
Expand Down Expand Up @@ -1846,6 +1852,8 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
}
}

tag_len = flb_sds_len(tag);

/* Check if we have a Host header: Hostname ; port */
mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST);

Expand Down Expand Up @@ -1895,13 +1903,13 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
}

if (strcmp(uri, "/v1/metrics") == 0) {
ret = process_payload_metrics(ctx, conn, tag, session, request);
ret = process_payload_metrics(ctx, conn, tag, tag_len, session, request);
}
else if (strcmp(uri, "/v1/traces") == 0) {
ret = process_payload_traces(ctx, conn, tag, session, request);
ret = process_payload_traces(ctx, conn, tag, tag_len, session, request);
}
else if (strcmp(uri, "/v1/logs") == 0) {
ret = process_payload_logs(ctx, conn, tag, session, request);
ret = process_payload_logs(ctx, conn, tag, tag_len, session, request);
}

if (uncompressed_data != NULL) {
Expand Down

0 comments on commit 477f656

Please sign in to comment.