From 0fe73c5698c42de10505cb6aa96db889cef3ce26 Mon Sep 17 00:00:00 2001 From: Leonardo Alminana Date: Tue, 12 Nov 2024 13:14:37 +0100 Subject: [PATCH] opentelemetry: initial addition of the experimental otlp profile signal Signed-off-by: Leonardo Alminana --- plugins/in_opentelemetry/opentelemetry.c | 8 + plugins/in_opentelemetry/opentelemetry.h | 1 + plugins/in_opentelemetry/opentelemetry_prot.c | 157 ++++++++++++++++++ 3 files changed, 166 insertions(+) diff --git a/plugins/in_opentelemetry/opentelemetry.c b/plugins/in_opentelemetry/opentelemetry.c index 5876be34363..4d640520167 100644 --- a/plugins/in_opentelemetry/opentelemetry.c +++ b/plugins/in_opentelemetry/opentelemetry.c @@ -201,6 +201,14 @@ static struct flb_config_map config_map[] = { NULL }, + { + FLB_CONFIG_MAP_BOOL, "profiles_support", "false", + 0, FLB_TRUE, offsetof(struct flb_opentelemetry, profile_support_enabled), + "This is an experimental feature whoses specification is not stable yet, " \ + "feel free to test it but please do not enable this in production " \ + "environments" + }, + { FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE, 0, FLB_TRUE, offsetof(struct flb_opentelemetry, buffer_max_size), diff --git a/plugins/in_opentelemetry/opentelemetry.h b/plugins/in_opentelemetry/opentelemetry.h index 4c78f941086..931dee4ccf4 100644 --- a/plugins/in_opentelemetry/opentelemetry.h +++ b/plugins/in_opentelemetry/opentelemetry.h @@ -38,6 +38,7 @@ struct flb_opentelemetry { int raw_traces; int tag_from_uri; flb_sds_t logs_metadata_key; + int profile_support_enabled; struct flb_input_instance *ins; diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index d87e6b759af..bfab3d02145 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -32,6 +32,8 @@ #include #include +#include +#include #include #include "opentelemetry.h" @@ -2431,6 +2433,146 @@ static int process_payload_logs_ng(struct flb_opentelemetry *ctx, return ret; } +static int process_payload_profiles_ng(struct flb_opentelemetry *ctx, + flb_sds_t tag, + struct flb_http_request *request, + struct flb_http_response *response) +{ + cfl_sds_t text_encoded_profiles_context; + struct cprof *profiles_context; + struct flb_log_event_encoder *encoder; + size_t offset; + int ret; + + encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2); + + if (encoder == NULL) { + return -1; + } + + if (request->content_type == NULL) { + flb_log_event_encoder_destroy(encoder); + + flb_error("[otel] content type missing"); + + return -1; + } + else if (strcasecmp(request->content_type, "application/json") == 0) { + flb_log_event_encoder_destroy(encoder); + + flb_error("[otel] unsuported profiles encoding type : %s", + request->content_type); + + return -1; + } + else if (strcasecmp(request->content_type, "application/x-protobuf") == 0) { + flb_log_event_encoder_destroy(encoder); + + flb_error("[otel] unsuported profiles encoding type : %s", + request->content_type); + + return -1; + } + else if (strcasecmp(request->content_type, "application/grpc") == 0) { + if (cfl_sds_len(request->body) < 5) { + flb_log_event_encoder_destroy(encoder); + + flb_error("[otel] malformed grpc packet of size %zu", + cfl_sds_len(request->body)); + + return -1; + } + + profiles_context = NULL; + offset = 0; + + ret = cprof_decode_opentelemetry_create(&profiles_context, + &((uint8_t *) request->body)[5], + (cfl_sds_len(request->body)) - 5, + &offset); + + if (ret != CPROF_DECODE_OPENTELEMETRY_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + + flb_error("[otel] profile decoding error : %d", + ret); + + return -1; + } + + ret = cprof_encode_text_create(&text_encoded_profiles_context, profiles_context); + + cprof_decode_opentelemetry_destroy(profiles_context); + + if (ret != CPROF_DECODE_OPENTELEMETRY_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + + flb_error("[otel] profile text encoding error : %d", + ret); + + return -1; + } + + flb_log_event_encoder_begin_record(encoder); + + flb_log_event_encoder_set_current_timestamp(encoder); + + ret = flb_log_event_encoder_append_body_values( + encoder, + FLB_LOG_EVENT_CSTRING_VALUE("Profile"), + FLB_LOG_EVENT_STRING_VALUE(text_encoded_profiles_context, + cfl_sds_len(text_encoded_profiles_context))); + + cprof_encode_text_destroy(text_encoded_profiles_context); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + + flb_error("[otel] re encoded profile ingestion error : %d", + ret); + + return -1; + } + + ret = flb_log_event_encoder_commit_record(encoder); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + + flb_error("[otel] re encoded profile ingestion error : %d", + ret); + + return -1; + } + + ret = flb_input_log_append(ctx->ins, + tag, + flb_sds_len(tag), + encoder->output_buffer, + encoder->output_length); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + + flb_error("[otel] re encoded profile ingestion error : %d", + ret); + + return -1; + } + + ret = 0; + } + else { + flb_plg_error(ctx->ins, "Unsupported content type %s", request->content_type); + + ret = -1; + } + + + flb_log_event_encoder_destroy(encoder); + + return ret; +} static int send_export_service_response_ng(struct flb_http_response *response, int result, @@ -2478,6 +2620,10 @@ int opentelemetry_prot_handle_ng(struct flb_http_request *request, grpc_request = FLB_TRUE; } + else if (context->profile_support_enabled && + strcmp(request->path, "/opentelemetry.proto.collector.profiles.v1experimental.ProfilesService/Export") == 0) { + grpc_request = FLB_TRUE; + } else { send_response_ng(response, 400, "error: invalid endpoint\n"); return -1; @@ -2531,6 +2677,17 @@ int opentelemetry_prot_handle_ng(struct flb_http_request *request, } result = process_payload_logs_ng(context, tag, request, response); } + else if (context->profile_support_enabled && + strcmp(request->path, "/opentelemetry.proto.collector.profiles.v1experimental.ProfilesService/Export") == 0) { + payload_type = 'P'; + if (context->tag_from_uri == FLB_TRUE) { + tag = flb_sds_create("v1development_profiles"); + } + else { + tag = flb_sds_create(context->ins->tag); + } + result = process_payload_profiles_ng(context, tag, request, response); + } if (grpc_request) { send_export_service_response_ng(response, result, payload_type);