Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_opentelemetry: support metadata key properties #8475

Merged
merged 3 commits into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 199 additions & 2 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <fluent-bit/flb_kv.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_ra_key.h>

#include <cfl/cfl.h>
#include <fluent-otel-proto/fluent-otel.h>
Expand Down Expand Up @@ -355,6 +356,15 @@ static void clear_array(Opentelemetry__Proto__Logs__V1__LogRecord **logs,

logs[index]->attributes = NULL;
}
if (logs[index]->severity_text != NULL) {
flb_free(logs[index]->severity_text);
}
if (logs[index]->span_id.data != NULL) {
flb_free(logs[index]->span_id.data);
}
if (logs[index]->trace_id.data != NULL) {
flb_free(logs[index]->trace_id.data);
}
}
}

Expand Down Expand Up @@ -934,6 +944,142 @@ static int flush_to_otel(struct opentelemetry_context *ctx,
return ret;
}

/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */
static int is_valid_severity_text(const char *str, size_t str_len)
{
if (str_len == 5) {
if (strncmp("TRACE", str, 5) == 0 ||
strncmp("DEBUG", str, 5) == 0 ||
strncmp("ERROR", str, 5) == 0 ||
strncmp("FATAL", str, 5) == 0) {
return FLB_TRUE;
}
}
else if (str_len == 4) {
if (strncmp("INFO", str, 4) == 0||
strncmp("WARN", str, 4) == 0) {
return FLB_TRUE;
}
}
return FLB_FALSE;
}
/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */
static int is_valid_severity_number(uint64_t val)
{
if (val >= 1 && val <= 24) {
return FLB_TRUE;
}
return FLB_FALSE;
}

static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
struct flb_log_event *event,
Opentelemetry__Proto__Logs__V1__LogRecord *log_record)
{
struct flb_ra_value *ra_val;

if (ctx == NULL || event == NULL || log_record == NULL) {
return -1;
}
/* ObservedTimestamp */
if (ctx->ra_observed_timestamp_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_observed_timestamp_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
log_record->observed_time_unix_nano = ra_val->o.via.u64;
flb_ra_key_value_destroy(ra_val);
}
}

/* Timestamp */
if (ctx->ra_timestamp_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_timestamp_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
log_record->time_unix_nano = ra_val->o.via.u64;
flb_ra_key_value_destroy(ra_val);
}
else {
log_record->time_unix_nano = flb_time_to_nanosec(&event->timestamp);
}
}

/* SeverityText */
if (ctx->ra_severity_text_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_text_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_STR &&
is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size) == FLB_TRUE) {
log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size+1);
if (log_record->severity_text) {
strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size);
}
flb_ra_key_value_destroy(ra_val);
}
else {
/* To prevent invalid free */
log_record->severity_text = NULL;
}
}

/* SeverityNumber */
if (ctx->ra_severity_number_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER &&
is_valid_severity_number(ra_val->o.via.u64) == FLB_TRUE) {
log_record->severity_number = ra_val->o.via.u64;
flb_ra_key_value_destroy(ra_val);
}
}

/* TraceFlags */
if (ctx->ra_trace_flags_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_trace_flags_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
log_record->flags = (uint32_t)ra_val->o.via.u64;
flb_ra_key_value_destroy(ra_val);
}
}

/* SpanId */
if (ctx->ra_span_id_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_span_id_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->span_id.data) {
memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->span_id.len = ra_val->o.via.bin.size;
}
flb_ra_key_value_destroy(ra_val);
}
}

/* TraceId */
if (ctx->ra_trace_id_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_trace_id_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->trace_id.data) {
memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->trace_id.len = ra_val->o.via.bin.size;
}
flb_ra_key_value_destroy(ra_val);
}
}

/* Attributes */
if (ctx->ra_attributes_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_attributes_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_MAP) {
if (log_record->attributes != NULL) {
otlp_kvarray_destroy(log_record->attributes,
log_record->n_attributes);
}
log_record->attributes = msgpack_map_to_otlp_kvarray(&ra_val->o, &log_record->n_attributes);
flb_ra_key_value_destroy(ra_val);
}
}

return 0;
}

static int process_logs(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *ins, void *out_context,
Expand Down Expand Up @@ -1010,9 +1156,10 @@ static int process_logs(struct flb_event_chunk *event_chunk,
break;
}

append_v1_logs_metadata(ctx, &event, &log_records[log_record_count]);

ret = FLB_OK;

/* set timestamp */
log_records[log_record_count].time_unix_nano = flb_time_to_nanosec(&event.timestamp);
log_record_count++;

Expand Down Expand Up @@ -1314,7 +1461,6 @@ static struct flb_config_map config_map[] = {
0, FLB_FALSE, 0,
"Set payload compression mechanism. Option available is 'gzip'"
},

/*
* Logs Properties
* ---------------
Expand Down Expand Up @@ -1348,6 +1494,57 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct opentelemetry_context, log_response_payload),
"Specify if the response paylod should be logged or not"
},
{
FLB_CONFIG_MAP_STR, "logs_observed_timestamp_metadata_key", "$ObservedTimestamp",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_observed_timestamp_metadata_key),
"Specify an ObservedTimestamp key"
nokute78 marked this conversation as resolved.
Show resolved Hide resolved
},
{
FLB_CONFIG_MAP_STR, "logs_timestamp_metadata_key", "$Timestamp",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_timestamp_metadata_key),
"Specify a Timestamp key"
},
{
FLB_CONFIG_MAP_STR, "logs_severity_text_metadata_key", "$SeverityText",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_severity_text_metadata_key),
"Specify a SeverityText key"
},
{
FLB_CONFIG_MAP_STR, "logs_severity_number_metadata_key", "$SeverityNumber",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_severity_number_metadata_key),
"Specify a SeverityNumber key"
},
{
FLB_CONFIG_MAP_STR, "logs_trace_flags_metadata_key", "$TraceFlags",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_trace_flags_metadata_key),
"Specify a TraceFlags key"
},
{
FLB_CONFIG_MAP_STR, "logs_span_id_metadata_key", "$SpanId",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_span_id_metadata_key),
"Specify a SpanId key"
},
{
FLB_CONFIG_MAP_STR, "logs_trace_id_metadata_key", "$TraceId",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_trace_id_metadata_key),
"Specify a TraceId key"
},
{
FLB_CONFIG_MAP_STR, "logs_attributes_metadata_key", "$Attributes",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_attributes_metadata_key),
"Specify an Attributes key"
},
{
FLB_CONFIG_MAP_STR, "logs_instrumentation_scope_metadata_key", "InstrumentationScope",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_instrumentation_scope_metadata_key),
"Specify an InstrumentationScope key"
},
{
FLB_CONFIG_MAP_STR, "logs_resource_metadata_key", "Resource",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_resource_metadata_key),
"Specify a Resource key"
},

/* EOF */
{0}
};
Expand Down
28 changes: 28 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,34 @@ struct opentelemetry_context {
char *host;
int port;

/* metadata keys */
flb_sds_t logs_observed_timestamp_metadata_key;
struct flb_record_accessor *ra_observed_timestamp_metadata;

flb_sds_t logs_timestamp_metadata_key;
struct flb_record_accessor *ra_timestamp_metadata;

flb_sds_t logs_severity_text_metadata_key;
struct flb_record_accessor *ra_severity_text_metadata;

flb_sds_t logs_severity_number_metadata_key;
struct flb_record_accessor *ra_severity_number_metadata;

flb_sds_t logs_trace_flags_metadata_key;
struct flb_record_accessor *ra_trace_flags_metadata;

flb_sds_t logs_span_id_metadata_key;
struct flb_record_accessor *ra_span_id_metadata;

flb_sds_t logs_trace_id_metadata_key;
struct flb_record_accessor *ra_trace_id_metadata;

flb_sds_t logs_attributes_metadata_key;
struct flb_record_accessor *ra_attributes_metadata;

flb_sds_t logs_instrumentation_scope_metadata_key;
flb_sds_t logs_resource_metadata_key;

/* Number of logs to flush at a time */
int batch_size;

Expand Down
65 changes: 65 additions & 0 deletions plugins/out_opentelemetry/opentelemetry_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,47 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output
}
}

ctx->ra_observed_timestamp_metadata = flb_ra_create((char*)ctx->logs_observed_timestamp_metadata_key,
FLB_FALSE);
if (ctx->ra_observed_timestamp_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for observed timestamp");
}
ctx->ra_timestamp_metadata = flb_ra_create((char*)ctx->logs_timestamp_metadata_key,
FLB_FALSE);
if (ctx->ra_timestamp_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for timestamp");
}
ctx->ra_severity_text_metadata = flb_ra_create((char*)ctx->logs_severity_text_metadata_key,
FLB_FALSE);
if (ctx->ra_severity_text_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for severity text");
}
ctx->ra_severity_number_metadata = flb_ra_create((char*)ctx->logs_severity_number_metadata_key,
FLB_FALSE);
if (ctx->ra_severity_number_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for severity number");
}
ctx->ra_trace_flags_metadata = flb_ra_create((char*)ctx->logs_trace_flags_metadata_key,
FLB_FALSE);
if (ctx->ra_trace_flags_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for trace flags");
}
ctx->ra_span_id_metadata = flb_ra_create((char*)ctx->logs_span_id_metadata_key,
FLB_FALSE);
if (ctx->ra_span_id_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for span id");
}
ctx->ra_trace_id_metadata = flb_ra_create((char*)ctx->logs_trace_id_metadata_key,
FLB_FALSE);
if (ctx->ra_trace_id_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for trace id");
}
ctx->ra_attributes_metadata = flb_ra_create((char*)ctx->logs_attributes_metadata_key,
FLB_FALSE);
if (ctx->ra_attributes_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for attributes");
}

return ctx;
}

Expand All @@ -401,6 +442,30 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx)
if (ctx->mp_accessor) {
flb_mp_accessor_destroy(ctx->mp_accessor);
}
if (ctx->ra_observed_timestamp_metadata) {
flb_ra_destroy(ctx->ra_observed_timestamp_metadata);
}
if (ctx->ra_timestamp_metadata) {
flb_ra_destroy(ctx->ra_timestamp_metadata);
}
if (ctx->ra_severity_text_metadata) {
flb_ra_destroy(ctx->ra_severity_text_metadata);
}
if (ctx->ra_severity_number_metadata) {
flb_ra_destroy(ctx->ra_severity_number_metadata);
}
if (ctx->ra_trace_flags_metadata) {
flb_ra_destroy(ctx->ra_trace_flags_metadata);
}
if (ctx->ra_span_id_metadata) {
flb_ra_destroy(ctx->ra_span_id_metadata);
}
if (ctx->ra_trace_id_metadata) {
flb_ra_destroy(ctx->ra_trace_id_metadata);
}
if (ctx->ra_attributes_metadata) {
flb_ra_destroy(ctx->ra_attributes_metadata);
}

flb_free(ctx->proxy_host);
flb_free(ctx);
Expand Down
Loading