Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opentelemetry: initial addition of the experimental otlp profile signal #9583

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions plugins/in_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ static struct flb_config_map config_map[] = {
NULL
},

{
FLB_CONFIG_MAP_BOOL, "profiles_support", "false",
0, FLB_TRUE, offsetof(struct flb_opentelemetry, profile_support_enabled),
"This is an experimental feature whoses specification is not stable yet, " \
"feel free to test it but please do not enable this in production " \
"environments"
},

{
FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE,
0, FLB_TRUE, offsetof(struct flb_opentelemetry, buffer_max_size),
Expand Down
1 change: 1 addition & 0 deletions plugins/in_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct flb_opentelemetry {
int raw_traces;
int tag_from_uri;
flb_sds_t logs_metadata_key;
int profile_support_enabled;

struct flb_input_instance *ins;

Expand Down
157 changes: 157 additions & 0 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

#include <monkey/mk_core.h>
#include <cmetrics/cmt_decode_opentelemetry.h>
#include <cprofiles/cprof_decode_opentelemetry.h>
#include <cprofiles/cprof_encode_text.h>

#include <fluent-otel-proto/fluent-otel.h>
#include "opentelemetry.h"
Expand Down Expand Up @@ -2431,6 +2433,146 @@ static int process_payload_logs_ng(struct flb_opentelemetry *ctx,
return ret;
}

static int process_payload_profiles_ng(struct flb_opentelemetry *ctx,
flb_sds_t tag,
struct flb_http_request *request,
struct flb_http_response *response)
{
cfl_sds_t text_encoded_profiles_context;
struct cprof *profiles_context;
struct flb_log_event_encoder *encoder;
size_t offset;
int ret;

encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2);

if (encoder == NULL) {
return -1;
}

if (request->content_type == NULL) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] content type missing");

return -1;
}
else if (strcasecmp(request->content_type, "application/json") == 0) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] unsuported profiles encoding type : %s",
request->content_type);

return -1;
}
else if (strcasecmp(request->content_type, "application/x-protobuf") == 0) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] unsuported profiles encoding type : %s",
request->content_type);

return -1;
}
else if (strcasecmp(request->content_type, "application/grpc") == 0) {
if (cfl_sds_len(request->body) < 5) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] malformed grpc packet of size %zu",
cfl_sds_len(request->body));

return -1;
}

profiles_context = NULL;
offset = 0;

ret = cprof_decode_opentelemetry_create(&profiles_context,
&((uint8_t *) request->body)[5],
(cfl_sds_len(request->body)) - 5,
&offset);

if (ret != CPROF_DECODE_OPENTELEMETRY_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] profile decoding error : %d",
ret);

return -1;
}

ret = cprof_encode_text_create(&text_encoded_profiles_context, profiles_context);

cprof_decode_opentelemetry_destroy(profiles_context);

if (ret != CPROF_DECODE_OPENTELEMETRY_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] profile text encoding error : %d",
ret);

return -1;
}

flb_log_event_encoder_begin_record(encoder);

flb_log_event_encoder_set_current_timestamp(encoder);

ret = flb_log_event_encoder_append_body_values(
encoder,
FLB_LOG_EVENT_CSTRING_VALUE("Profile"),
FLB_LOG_EVENT_STRING_VALUE(text_encoded_profiles_context,
cfl_sds_len(text_encoded_profiles_context)));

cprof_encode_text_destroy(text_encoded_profiles_context);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] re encoded profile ingestion error : %d",
ret);

return -1;
}

ret = flb_log_event_encoder_commit_record(encoder);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] re encoded profile ingestion error : %d",
ret);

return -1;
}

ret = flb_input_log_append(ctx->ins,
tag,
flb_sds_len(tag),
encoder->output_buffer,
encoder->output_length);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] re encoded profile ingestion error : %d",
ret);

return -1;
}

ret = 0;
}
else {
flb_plg_error(ctx->ins, "Unsupported content type %s", request->content_type);

ret = -1;
}


flb_log_event_encoder_destroy(encoder);

return ret;
}

static int send_export_service_response_ng(struct flb_http_response *response,
int result,
Expand Down Expand Up @@ -2478,6 +2620,10 @@ int opentelemetry_prot_handle_ng(struct flb_http_request *request,

grpc_request = FLB_TRUE;
}
else if (context->profile_support_enabled &&
strcmp(request->path, "/opentelemetry.proto.collector.profiles.v1experimental.ProfilesService/Export") == 0) {
grpc_request = FLB_TRUE;
}
else {
send_response_ng(response, 400, "error: invalid endpoint\n");
return -1;
Expand Down Expand Up @@ -2531,6 +2677,17 @@ int opentelemetry_prot_handle_ng(struct flb_http_request *request,
}
result = process_payload_logs_ng(context, tag, request, response);
}
else if (context->profile_support_enabled &&
strcmp(request->path, "/opentelemetry.proto.collector.profiles.v1experimental.ProfilesService/Export") == 0) {
payload_type = 'P';
if (context->tag_from_uri == FLB_TRUE) {
tag = flb_sds_create("v1development_profiles");
}
else {
tag = flb_sds_create(context->ins->tag);
}
result = process_payload_profiles_ng(context, tag, request, response);
}

if (grpc_request) {
send_export_service_response_ng(response, result, payload_type);
Expand Down
Loading