Skip to content

Commit

Permalink
in_opentelemetry: attempt to fix tag_from_uri
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Kjær Jørgensen <[email protected]>
  • Loading branch information
shaohme committed May 28, 2024
1 parent dffcd42 commit 7d658e7
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ static int send_response(struct http_conn *conn, int http_status, char *message)

static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
Expand All @@ -143,7 +144,7 @@ static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_co
cfl_list_foreach(iterator, &decoded_contexts) {
context = cfl_list_entry(iterator, struct cmt, _head);

result = flb_input_metrics_append(ctx->ins, NULL, 0, context);
result = flb_input_metrics_append(ctx->ins, tag, tag_len, context);

if (result != 0) {
flb_plg_debug(ctx->ins, "could not ingest metrics context : %d", result);
Expand All @@ -158,6 +159,7 @@ static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_co

static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
Expand All @@ -171,7 +173,7 @@ static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct ht
request->data.len,
&offset);
if (result == 0) {
result = flb_input_trace_append(ctx->ins, NULL, 0, decoded_context);
result = flb_input_trace_append(ctx->ins, tag, tag_len, decoded_context);
ctr_decode_opentelemetry_destroy(decoded_context);
}

Expand All @@ -180,6 +182,7 @@ static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct ht

static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
Expand Down Expand Up @@ -217,24 +220,25 @@ static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http
flb_free(out_buf);
}

flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size);
flb_input_log_append(ctx->ins, tag, tag_len, mp_sbuf.data, mp_sbuf.size);
msgpack_sbuffer_destroy(&mp_sbuf);

return 0;
}

static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
int result;

if (ctx->raw_traces) {
result = process_payload_raw_traces(ctx, conn, tag, session, request);
result = process_payload_raw_traces(ctx, conn, tag, tag_len, session, request);
}
else {
result = process_payload_traces_proto(ctx, conn, tag, session, request);
result = process_payload_traces_proto(ctx, conn, tag, tag_len, session, request);
}

return result;
Expand Down Expand Up @@ -1459,6 +1463,7 @@ static int json_payload_to_msgpack(struct flb_opentelemetry *ctx,

static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
size_t tag_len,
struct mk_http_session *session,
struct mk_http_request *request)
{
Expand Down Expand Up @@ -1494,7 +1499,7 @@ static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn
if (ret == 0) {
ret = flb_input_log_append(ctx->ins,
tag,
flb_sds_len(tag),
tag_len,
encoder->output_buffer,
encoder->output_length);
}
Expand Down Expand Up @@ -1736,6 +1741,8 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
}
}

size_t tag_len = flb_sds_len(tag);

/* Check if we have a Host header: Hostname ; port */
mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST);

Expand Down Expand Up @@ -1785,13 +1792,13 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
}

if (strcmp(uri, "/v1/metrics") == 0) {
ret = process_payload_metrics(ctx, conn, tag, session, request);
ret = process_payload_metrics(ctx, conn, tag, tag_len, session, request);
}
else if (strcmp(uri, "/v1/traces") == 0) {
ret = process_payload_traces(ctx, conn, tag, session, request);
ret = process_payload_traces(ctx, conn, tag, tag_len, session, request);
}
else if (strcmp(uri, "/v1/logs") == 0) {
ret = process_payload_logs(ctx, conn, tag, session, request);
ret = process_payload_logs(ctx, conn, tag, tag_len, session, request);
}

if (uncompressed_data != NULL) {
Expand Down

0 comments on commit 7d658e7

Please sign in to comment.