Skip to content

Commit

Permalink
out_opentelemetry: use record accessor to look up value
Browse files Browse the repository at this point in the history
Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 committed Feb 18, 2024
1 parent 81dd95d commit e85d376
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 83 deletions.
158 changes: 75 additions & 83 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <fluent-bit/flb_kv.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_ra_key.h>

#include <cfl/cfl.h>
#include <fluent-otel-proto/fluent-otel.h>
Expand Down Expand Up @@ -801,31 +802,6 @@ static int flush_to_otel(struct opentelemetry_context *ctx,
return res;
}

static msgpack_object *get_msgpack_object_from_map(msgpack_object *obj, flb_sds_t key)
{
msgpack_object *ret = NULL;
msgpack_object key_obj;
int i;

if (obj == NULL || obj->type != MSGPACK_OBJECT_MAP|| flb_sds_len(key) == 0) {
return NULL;
}
for (i=0; i< obj->via.map.size; i++) {
key_obj = obj->via.map.ptr[i].key;
if (key_obj.type != MSGPACK_OBJECT_STR) {
continue;
}
if (flb_sds_len(key) != key_obj.via.str.size) {
continue;
}
if (memcmp(key_obj.via.str.ptr, key, flb_sds_len(key)) == 0) {
ret = &obj->via.map.ptr[i].val;
break;
}
}
return ret;
}

/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */
static int is_valid_severity_text(const char *str, size_t str_len)
{
Expand Down Expand Up @@ -858,85 +834,101 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
struct flb_log_event *event,
Opentelemetry__Proto__Logs__V1__LogRecord *log_record)
{
msgpack_object *obj = NULL;
struct flb_ra_value *ra_val;

if (ctx == NULL || event == NULL || log_record == NULL) {
return -1;
}
/* ObservedTimestamp */
obj = get_msgpack_object_from_map(event->metadata,
ctx->logs_observed_timestamp_metadata_key);
if (obj != NULL && obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
log_record->observed_time_unix_nano = obj->via.u64;
if (ctx->ra_observed_timestamp_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_observed_timestamp_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
log_record->observed_time_unix_nano = ra_val->o.via.u64;
flb_ra_key_value_destroy(ra_val);
}
}

/* Timestamp */
obj = get_msgpack_object_from_map(event->metadata,
ctx->logs_timestamp_metadata_key);
if (obj != NULL && obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
log_record->time_unix_nano = obj->via.u64;
}
else {
log_record->time_unix_nano = flb_time_to_nanosec(&event->timestamp);
if (ctx->ra_timestamp_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_timestamp_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
log_record->time_unix_nano = ra_val->o.via.u64;
flb_ra_key_value_destroy(ra_val);
}
else {
log_record->time_unix_nano = flb_time_to_nanosec(&event->timestamp);
}
}

/* SeverityText */
obj = get_msgpack_object_from_map(event->metadata,
ctx->logs_severity_text_metadata_key);
if (obj != NULL && obj->type == MSGPACK_OBJECT_STR &&
is_valid_severity_text(obj->via.str.ptr, obj->via.str.size) == FLB_TRUE) {
log_record->severity_text = flb_calloc(1, obj->via.str.size+1);
if (log_record->severity_text) {
strncpy(log_record->severity_text, obj->via.str.ptr, obj->via.str.size);
if (ctx->ra_severity_text_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_text_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_STR &&
is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size) == FLB_TRUE) {
log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size+1);
if (log_record->severity_text) {
strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size);
}
flb_ra_key_value_destroy(ra_val);
}
else {
/* To prevent invalid free */
log_record->severity_text = NULL;
}
}
else {
/* To prevent invalid free */
log_record->severity_text = NULL;
}

/* SeverityNumber */
obj = get_msgpack_object_from_map(event->metadata,
ctx->logs_severity_number_metadata_key);
if (obj != NULL && obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER &&
is_valid_severity_number(obj->via.u64) == FLB_TRUE) {
log_record->severity_number = obj->via.u64;
if (ctx->ra_severity_number_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER &&
is_valid_severity_number(ra_val->o.via.u64) == FLB_TRUE) {
log_record->severity_number = ra_val->o.via.u64;
flb_ra_key_value_destroy(ra_val);
}
}

/* TraceFlags */
obj = get_msgpack_object_from_map(event->metadata,
ctx->logs_trace_flags_metadata_key);
if (obj != NULL && obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
log_record->flags = (uint32_t)obj->via.u64;
if (ctx->ra_trace_flags_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_trace_flags_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
log_record->flags = (uint32_t)ra_val->o.via.u64;
flb_ra_key_value_destroy(ra_val);
}
}

/* SpanId */
obj = get_msgpack_object_from_map(event->metadata,
ctx->logs_span_id_metadata_key);
if (obj != NULL && obj->type == MSGPACK_OBJECT_BIN) {
log_record->span_id.data = flb_calloc(1, obj->via.bin.size);
if (log_record->span_id.data) {
memcpy(log_record->span_id.data, obj->via.bin.ptr, obj->via.bin.size);
log_record->span_id.len = obj->via.bin.size;
if (ctx->ra_span_id_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_span_id_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->span_id.data) {
memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->span_id.len = ra_val->o.via.bin.size;
}
flb_ra_key_value_destroy(ra_val);
}
}

/* TraceId */
obj = get_msgpack_object_from_map(event->metadata,
ctx->logs_trace_id_metadata_key);
if (obj != NULL && obj->type == MSGPACK_OBJECT_BIN) {
log_record->trace_id.data = flb_calloc(1, obj->via.bin.size);
if (log_record->trace_id.data) {
memcpy(log_record->trace_id.data, obj->via.bin.ptr, obj->via.bin.size);
log_record->trace_id.len = obj->via.bin.size;
if (ctx->ra_trace_id_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_trace_id_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->trace_id.data) {
memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->trace_id.len = ra_val->o.via.bin.size;
}
flb_ra_key_value_destroy(ra_val);
}
}

/* Attributes */
obj = get_msgpack_object_from_map(event->metadata,
ctx->logs_attributes_metadata_key);
if (obj != NULL && obj->type == MSGPACK_OBJECT_MAP) {
log_record->attributes = msgpack_map_to_otlp_kvarray(obj, &log_record->n_attributes);
if (ctx->ra_attributes_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_attributes_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_MAP) {
log_record->attributes = msgpack_map_to_otlp_kvarray(&ra_val->o, &log_record->n_attributes);
flb_ra_key_value_destroy(ra_val);
}
}

return 0;
Expand Down Expand Up @@ -1337,42 +1329,42 @@ static struct flb_config_map config_map[] = {
"Set payload compression mechanism. Option available is 'gzip'"
},
{
FLB_CONFIG_MAP_STR, "logs_observed_timestamp_metadata_key", "ObservedTimestamp",
FLB_CONFIG_MAP_STR, "logs_observed_timestamp_metadata_key", "$ObservedTimestamp",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_observed_timestamp_metadata_key),
"Specify an ObservedTimestamp key"
},
{
FLB_CONFIG_MAP_STR, "logs_timestamp_metadata_key", "Timestamp",
FLB_CONFIG_MAP_STR, "logs_timestamp_metadata_key", "$Timestamp",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_timestamp_metadata_key),
"Specify an Timestamp key"
},
{
FLB_CONFIG_MAP_STR, "logs_severity_text_metadata_key", "SeverityText",
FLB_CONFIG_MAP_STR, "logs_severity_text_metadata_key", "$SeverityText",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_severity_text_metadata_key),
"Specify an SeverityText key"
},
{
FLB_CONFIG_MAP_STR, "logs_severity_number_metadata_key", "SeverityNumber",
FLB_CONFIG_MAP_STR, "logs_severity_number_metadata_key", "$SeverityNumber",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_severity_number_metadata_key),
"Specify an SeverityNumber key"
},
{
FLB_CONFIG_MAP_STR, "logs_trace_flags_metadata_key", "TraceFlags",
FLB_CONFIG_MAP_STR, "logs_trace_flags_metadata_key", "$TraceFlags",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_trace_flags_metadata_key),
"Specify an TraceFlags key"
},
{
FLB_CONFIG_MAP_STR, "logs_span_id_metadata_key", "SpanId",
FLB_CONFIG_MAP_STR, "logs_span_id_metadata_key", "$SpanId",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_span_id_metadata_key),
"Specify an SpanId key"
},
{
FLB_CONFIG_MAP_STR, "logs_trace_id_metadata_key", "TraceId",
FLB_CONFIG_MAP_STR, "logs_trace_id_metadata_key", "$TraceId",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_trace_id_metadata_key),
"Specify an TraceId key"
},
{
FLB_CONFIG_MAP_STR, "logs_attributes_metadata_key", "Attributes",
FLB_CONFIG_MAP_STR, "logs_attributes_metadata_key", "$Attributes",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_attributes_metadata_key),
"Specify an Attributes key"
},
Expand Down
17 changes: 17 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#define FLB_OUT_OPENTELEMETRY_H

#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_record_accessor.h>

#define FLB_OPENTELEMETRY_CONTENT_TYPE_HEADER_NAME "Content-Type"
#define FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL "application/x-protobuf"
Expand Down Expand Up @@ -53,13 +54,29 @@ struct opentelemetry_context {

/* metadata keys */
flb_sds_t logs_observed_timestamp_metadata_key;
struct flb_record_accessor *ra_observed_timestamp_metadata;

flb_sds_t logs_timestamp_metadata_key;
struct flb_record_accessor *ra_timestamp_metadata;

flb_sds_t logs_severity_text_metadata_key;
struct flb_record_accessor *ra_severity_text_metadata;

flb_sds_t logs_severity_number_metadata_key;
struct flb_record_accessor *ra_severity_number_metadata;

flb_sds_t logs_trace_flags_metadata_key;
struct flb_record_accessor *ra_trace_flags_metadata;

flb_sds_t logs_span_id_metadata_key;
struct flb_record_accessor *ra_span_id_metadata;

flb_sds_t logs_trace_id_metadata_key;
struct flb_record_accessor *ra_trace_id_metadata;

flb_sds_t logs_attributes_metadata_key;
struct flb_record_accessor *ra_attributes_metadata;

flb_sds_t logs_instrumentation_scope_metadata_key;
flb_sds_t logs_resource_metadata_key;

Expand Down
66 changes: 66 additions & 0 deletions plugins/out_opentelemetry/opentelemetry_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,47 @@ struct opentelemetry_context *flb_opentelemetry_context_create(
}
}

ctx->ra_observed_timestamp_metadata = flb_ra_create((char*)ctx->logs_observed_timestamp_metadata_key,
FLB_FALSE);
if (ctx->ra_observed_timestamp_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for observed timestamp");
}
ctx->ra_timestamp_metadata = flb_ra_create((char*)ctx->logs_timestamp_metadata_key,
FLB_FALSE);
if (ctx->ra_timestamp_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for timestamp");
}
ctx->ra_severity_text_metadata = flb_ra_create((char*)ctx->logs_severity_text_metadata_key,
FLB_FALSE);
if (ctx->ra_severity_text_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for severity text");
}
ctx->ra_severity_number_metadata = flb_ra_create((char*)ctx->logs_severity_number_metadata_key,
FLB_FALSE);
if (ctx->ra_severity_number_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for severity number");
}
ctx->ra_trace_flags_metadata = flb_ra_create((char*)ctx->logs_trace_flags_metadata_key,
FLB_FALSE);
if (ctx->ra_trace_flags_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for trace flags");
}
ctx->ra_span_id_metadata = flb_ra_create((char*)ctx->logs_span_id_metadata_key,
FLB_FALSE);
if (ctx->ra_span_id_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for span id");
}
ctx->ra_trace_id_metadata = flb_ra_create((char*)ctx->logs_trace_id_metadata_key,
FLB_FALSE);
if (ctx->ra_trace_id_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for trace id");
}
ctx->ra_attributes_metadata = flb_ra_create((char*)ctx->logs_attributes_metadata_key,
FLB_FALSE);
if (ctx->ra_attributes_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for attributes");
}

return ctx;
}

Expand All @@ -257,6 +298,31 @@ void flb_opentelemetry_context_destroy(
flb_upstream_destroy(ctx->u);
}

if (ctx->ra_observed_timestamp_metadata) {
flb_ra_destroy(ctx->ra_observed_timestamp_metadata);
}
if (ctx->ra_timestamp_metadata) {
flb_ra_destroy(ctx->ra_timestamp_metadata);
}
if (ctx->ra_severity_text_metadata) {
flb_ra_destroy(ctx->ra_severity_text_metadata);
}
if (ctx->ra_severity_number_metadata) {
flb_ra_destroy(ctx->ra_severity_number_metadata);
}
if (ctx->ra_trace_flags_metadata) {
flb_ra_destroy(ctx->ra_trace_flags_metadata);
}
if (ctx->ra_span_id_metadata) {
flb_ra_destroy(ctx->ra_span_id_metadata);
}
if (ctx->ra_trace_id_metadata) {
flb_ra_destroy(ctx->ra_trace_id_metadata);
}
if (ctx->ra_attributes_metadata) {
flb_ra_destroy(ctx->ra_attributes_metadata);
}

flb_free(ctx->proxy_host);
flb_free(ctx);
}

0 comments on commit e85d376

Please sign in to comment.