From 285f3ddfbc36ebe8ddd128ffd93a05b49f1dc729 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 30 May 2024 16:38:08 -0600 Subject: [PATCH] out_opentelemetry: enhance logs handling with groups and metadata support The following patch introduce a big change on how the plugin now process the logs with OTLP format, the high-level features are: 1. Full metadata support: - with the new changes in in_opentelemetry, there is no metadata loss 2. Proper encoding of non-OTLP records as OTLP - The plugin is flexible enough to encode any type of log record. At a low level, we re-architect how resource logs, resource, span_logs, scope and logs are handled. All of this has been implemented on top of the new Fluent Bit groups support for logs. Use cases tested: - OTel Collector (gRPC) --> Fluent Bit --> OTel Collector (gRPC) - dummy, tail, forward --> Fluent Bit --> OTel Collector (gRPC) Signed-off-by: Eduardo Silva --- plugins/out_opentelemetry/CMakeLists.txt | 14 +- plugins/out_opentelemetry/opentelemetry.c | 1122 +---------------- plugins/out_opentelemetry/opentelemetry.h | 28 + .../out_opentelemetry/opentelemetry_conf.c | 144 ++- .../out_opentelemetry/opentelemetry_logs.c | 971 ++++++++++++++ .../out_opentelemetry/opentelemetry_utils.c | 507 ++++++++ .../out_opentelemetry/opentelemetry_utils.h | 46 + 7 files changed, 1749 insertions(+), 1083 deletions(-) create mode 100644 plugins/out_opentelemetry/opentelemetry_logs.c create mode 100644 plugins/out_opentelemetry/opentelemetry_utils.c create mode 100644 plugins/out_opentelemetry/opentelemetry_utils.h diff --git a/plugins/out_opentelemetry/CMakeLists.txt b/plugins/out_opentelemetry/CMakeLists.txt index 03c697ab2d5..f3a050c9c9d 100644 --- a/plugins/out_opentelemetry/CMakeLists.txt +++ b/plugins/out_opentelemetry/CMakeLists.txt @@ -1,6 +1,8 @@ -set(src - opentelemetry.c - opentelemetry_conf.c - ) - -FLB_PLUGIN(out_opentelemetry "${src}" "") +set(src + opentelemetry.c + opentelemetry_logs.c + opentelemetry_utils.c + opentelemetry_conf.c + ) + +FLB_PLUGIN(out_opentelemetry "${src}" "") diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index c8db7a2c8f5..b65666e4004 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -18,6 +18,7 @@ */ #include +#include #include #include #include @@ -41,117 +42,17 @@ extern void cmt_encode_opentelemetry_destroy(cfl_sds_t text); #include "opentelemetry.h" #include "opentelemetry_conf.h" +#include "opentelemetry_utils.h" +int otel_process_logs(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *ins, void *out_context, + struct flb_config *config); -static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_object_to_otlp_any_value(struct msgpack_object *o); - -static inline void otlp_any_value_destroy(Opentelemetry__Proto__Common__V1__AnyValue *value); -static inline void otlp_kvarray_destroy(Opentelemetry__Proto__Common__V1__KeyValue **kvarray, size_t entry_count); -static inline void otlp_kvpair_destroy(Opentelemetry__Proto__Common__V1__KeyValue *kvpair); -static inline void otlp_kvlist_destroy(Opentelemetry__Proto__Common__V1__KeyValueList *kvlist); -static inline void otlp_array_destroy(Opentelemetry__Proto__Common__V1__ArrayValue *array); - -static inline void otlp_kvarray_destroy(Opentelemetry__Proto__Common__V1__KeyValue **kvarray, size_t entry_count) -{ - size_t index; - - if (kvarray != NULL) { - for (index = 0 ; index < entry_count ; index++) { - if (kvarray[index] != NULL) { - otlp_kvpair_destroy(kvarray[index]); - kvarray[index] = NULL; - } - } - - flb_free(kvarray); - } -} - -static inline void otlp_kvpair_destroy(Opentelemetry__Proto__Common__V1__KeyValue *kvpair) -{ - if (kvpair == NULL) { - return; - } - - if (kvpair->key != NULL) { - flb_free(kvpair->key); - } - - if (kvpair->value != NULL) { - otlp_any_value_destroy(kvpair->value); - } - - flb_free(kvpair); -} - -static inline void otlp_kvlist_destroy(Opentelemetry__Proto__Common__V1__KeyValueList *kvlist) -{ - size_t index; - - if (kvlist != NULL) { - if (kvlist->values != NULL) { - for (index = 0 ; index < kvlist->n_values ; index++) { - otlp_kvpair_destroy(kvlist->values[index]); - } - - flb_free(kvlist->values); - } - - flb_free(kvlist); - } -} - -static inline void otlp_array_destroy(Opentelemetry__Proto__Common__V1__ArrayValue *array) -{ - size_t index; - - if (array != NULL) { - if (array->values != NULL) { - for (index = 0 ; index < array->n_values ; index++) { - otlp_any_value_destroy(array->values[index]); - } - - flb_free(array->values); - } - - flb_free(array); - } -} - -static inline void otlp_any_value_destroy(Opentelemetry__Proto__Common__V1__AnyValue *value) -{ - if (value != NULL) { - if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE) { - if (value->string_value != NULL) { - flb_free(value->string_value); - } - } - else if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE) { - if (value->array_value != NULL) { - otlp_array_destroy(value->array_value); - } - } - else if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE) { - if (value->kvlist_value != NULL) { - otlp_kvlist_destroy(value->kvlist_value); - } - } - else if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BYTES_VALUE) { - if (value->bytes_value.data != NULL) { - flb_free(value->bytes_value.data); - } - } - - value->string_value = NULL; - - flb_free(value); - } -} - -static int http_post(struct opentelemetry_context *ctx, - const void *body, size_t body_len, - const char *tag, int tag_len, - const char *uri) +int opentelemetry_http_post(struct opentelemetry_context *ctx, + const void *body, size_t body_len, + const char *tag, int tag_len, + const char *uri) { size_t final_body_len; void *final_body; @@ -334,967 +235,28 @@ static void append_labels(struct opentelemetry_context *ctx, } } -static void clear_array(Opentelemetry__Proto__Logs__V1__LogRecord **logs, - size_t log_count) -{ - size_t index; - - if (logs == NULL){ - return; - } - - for (index = 0 ; index < log_count ; index++) { - if (logs[index]->body != NULL) { - otlp_any_value_destroy(logs[index]->body); - - logs[index]->body = NULL; - } - - if (logs[index]->attributes != NULL) { - otlp_kvarray_destroy(logs[index]->attributes, - logs[index]->n_attributes); - - logs[index]->attributes = NULL; - } - if (logs[index]->severity_text != NULL) { - flb_free(logs[index]->severity_text); - } - if (logs[index]->span_id.data != NULL) { - flb_free(logs[index]->span_id.data); - } - if (logs[index]->trace_id.data != NULL) { - flb_free(logs[index]->trace_id.data); - } - } -} - -static Opentelemetry__Proto__Common__V1__ArrayValue *otlp_array_value_initialize(size_t entry_count) -{ - Opentelemetry__Proto__Common__V1__ArrayValue *value; - - value = flb_calloc(1, sizeof(Opentelemetry__Proto__Common__V1__ArrayValue)); - - if (value != NULL) { - opentelemetry__proto__common__v1__array_value__init(value); - - if (entry_count > 0) { - value->values = \ - flb_calloc(entry_count, - sizeof(Opentelemetry__Proto__Common__V1__AnyValue *)); - - if (value->values == NULL) { - flb_free(value); - - value = NULL; - } - else { - value->n_values = entry_count; - } - } - } - - return value; -} - -static Opentelemetry__Proto__Common__V1__KeyValue *otlp_kvpair_value_initialize() -{ - Opentelemetry__Proto__Common__V1__KeyValue *value; - - value = flb_calloc(1, sizeof(Opentelemetry__Proto__Common__V1__KeyValue)); - - if (value != NULL) { - opentelemetry__proto__common__v1__key_value__init(value); - } - - return value; -} - -static Opentelemetry__Proto__Common__V1__KeyValueList *otlp_kvlist_value_initialize(size_t entry_count) -{ - Opentelemetry__Proto__Common__V1__KeyValueList *value; - - value = flb_calloc(1, sizeof(Opentelemetry__Proto__Common__V1__KeyValueList)); - - if (value != NULL) { - opentelemetry__proto__common__v1__key_value_list__init(value); - - if (entry_count > 0) { - value->values = \ - flb_calloc(entry_count, - sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); - - if (value->values == NULL) { - flb_free(value); - - value = NULL; - } - else { - value->n_values = entry_count; - } - } - } - - return value; -} - -static Opentelemetry__Proto__Common__V1__AnyValue *otlp_any_value_initialize(int data_type, size_t entry_count) -{ - Opentelemetry__Proto__Common__V1__AnyValue *value; - - value = flb_calloc(1, sizeof(Opentelemetry__Proto__Common__V1__AnyValue)); - - if (value == NULL) { - return NULL; - } - - opentelemetry__proto__common__v1__any_value__init(value); - - if (data_type == MSGPACK_OBJECT_STR) { - value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE; - } - else if (data_type == MSGPACK_OBJECT_NIL) { - value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE__NOT_SET; - } - else if (data_type == MSGPACK_OBJECT_BOOLEAN) { - value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BOOL_VALUE; - } - else if (data_type == MSGPACK_OBJECT_POSITIVE_INTEGER || data_type == MSGPACK_OBJECT_NEGATIVE_INTEGER) { - value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_INT_VALUE; - } - else if (data_type == MSGPACK_OBJECT_FLOAT32 || data_type == MSGPACK_OBJECT_FLOAT64) { - value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_DOUBLE_VALUE; - } - else if (data_type == MSGPACK_OBJECT_ARRAY) { - value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE; - value->array_value = otlp_array_value_initialize(entry_count); - - if (value->array_value == NULL) { - flb_free(value); - - value = NULL; - } - } - else if (data_type == MSGPACK_OBJECT_MAP) { - value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE; - - value->kvlist_value = otlp_kvlist_value_initialize(entry_count); - - if (value->kvlist_value == NULL) { - flb_free(value); - - value = NULL; - } - } - else if (data_type == MSGPACK_OBJECT_BIN) { - value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BYTES_VALUE; - } - else { - flb_free(value); - - value = NULL; - } - - return value; -} - -static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_boolean_to_otlp_any_value(struct msgpack_object *o) -{ - Opentelemetry__Proto__Common__V1__AnyValue *result; - - result = otlp_any_value_initialize(MSGPACK_OBJECT_BOOLEAN, 0); - - if (result != NULL) { - result->bool_value = o->via.boolean; - } - - return result; -} - -static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_integer_to_otlp_any_value(struct msgpack_object *o) -{ - Opentelemetry__Proto__Common__V1__AnyValue *result; - - result = otlp_any_value_initialize(o->type, 0); - - if (result != NULL) { - if (o->type == MSGPACK_OBJECT_POSITIVE_INTEGER) { - result->int_value = (int64_t) o->via.u64; - } - else { - result->int_value = o->via.i64; - } - } - - return result; -} - -static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_float_to_otlp_any_value(struct msgpack_object *o) -{ - Opentelemetry__Proto__Common__V1__AnyValue *result; - - result = otlp_any_value_initialize(o->type, 0); - - if (result != NULL) { - result->double_value = o->via.f64; - } - - return result; -} - -static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_string_to_otlp_any_value(struct msgpack_object *o) -{ - Opentelemetry__Proto__Common__V1__AnyValue *result; - - result = otlp_any_value_initialize(MSGPACK_OBJECT_STR, 0); - - if (result != NULL) { - result->string_value = flb_strndup(o->via.str.ptr, o->via.str.size); - - if (result->string_value == NULL) { - otlp_any_value_destroy(result); - - result = NULL; - } - } - - return result; -} - -static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_nil_to_otlp_any_value(struct msgpack_object *o) -{ - Opentelemetry__Proto__Common__V1__AnyValue *result; - - result = otlp_any_value_initialize(MSGPACK_OBJECT_NIL, 0); - - if (result != NULL) { - result->string_value = NULL; - } - - return result; -} - -static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_bin_to_otlp_any_value(struct msgpack_object *o) -{ - Opentelemetry__Proto__Common__V1__AnyValue *result; - - result = otlp_any_value_initialize(MSGPACK_OBJECT_BIN, 0); - - if (result != NULL) { - result->bytes_value.len = o->via.bin.size; - result->bytes_value.data = flb_malloc(o->via.bin.size); - - if (result->bytes_value.data == NULL) { - otlp_any_value_destroy(result); - - result = NULL; - } - - memcpy(result->bytes_value.data, o->via.bin.ptr, o->via.bin.size); - } - - return result; -} - -static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_array_to_otlp_any_value(struct msgpack_object *o) -{ - size_t entry_count; - Opentelemetry__Proto__Common__V1__AnyValue *entry_value; - Opentelemetry__Proto__Common__V1__AnyValue *result; - size_t index; - msgpack_object *p; - - entry_count = o->via.array.size; - result = otlp_any_value_initialize(MSGPACK_OBJECT_ARRAY, entry_count); - - p = o->via.array.ptr; - - if (result != NULL) { - index = 0; - - for (index = 0 ; index < entry_count ; index++) { - entry_value = msgpack_object_to_otlp_any_value(&p[index]); - - if (entry_value == NULL) { - otlp_any_value_destroy(result); - - result = NULL; - - break; - } - - result->array_value->values[index] = entry_value; - } - } - - return result; -} - -static inline Opentelemetry__Proto__Common__V1__KeyValue *msgpack_kv_to_otlp_any_value(struct msgpack_object_kv *input_pair) -{ - Opentelemetry__Proto__Common__V1__KeyValue *kv; - - kv = otlp_kvpair_value_initialize(); - if (kv == NULL) { - flb_errno(); - - return NULL; - } - - kv->key = flb_strndup(input_pair->key.via.str.ptr, input_pair->key.via.str.size); - if (kv->key == NULL) { - flb_errno(); - flb_free(kv); - - return NULL; - } - - kv->value = msgpack_object_to_otlp_any_value(&input_pair->val); - if (kv->value == NULL) { - flb_free(kv->key); - flb_free(kv); - - return NULL; - } - - return kv; -} - -static inline Opentelemetry__Proto__Common__V1__KeyValue **msgpack_map_to_otlp_kvarray(struct msgpack_object *o, size_t *entry_count) +static int opentelemetry_format_test(struct flb_config *config, + struct flb_input_instance *ins, + void *plugin_context, + void *flush_ctx, + int event_type, + const char *tag, int tag_len, + const void *data, size_t bytes, + void **out_data, size_t *out_size) { - Opentelemetry__Proto__Common__V1__KeyValue **result; - size_t index; - msgpack_object_kv *kv; - - *entry_count = o->via.map.size; - result = flb_calloc(*entry_count, sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); - if (result != NULL) { - for (index = 0; index < *entry_count; index++) { - kv = &o->via.map.ptr[index]; - result[index] = msgpack_kv_to_otlp_any_value(kv); - } - } - else { - *entry_count = 0; - } + if (event_type == FLB_INPUT_LOGS) { - return result; -} - -static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_map_to_otlp_any_value(struct msgpack_object *o) -{ - size_t entry_count; - Opentelemetry__Proto__Common__V1__AnyValue *result; - Opentelemetry__Proto__Common__V1__KeyValue *keyvalue; - size_t index; - msgpack_object_kv *kv; - - entry_count = o->via.map.size; - result = otlp_any_value_initialize(MSGPACK_OBJECT_MAP, entry_count); - - if (result != NULL) { - - for (index = 0; index < entry_count; index++) { - kv = &o->via.map.ptr[index]; - keyvalue = msgpack_kv_to_otlp_any_value(kv); - result->kvlist_value->values[index] = keyvalue; - } } + else if (event_type == FLB_INPUT_METRICS) { - return result; -} - -static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_object_to_otlp_any_value(struct msgpack_object *o) -{ - Opentelemetry__Proto__Common__V1__AnyValue *result; - - result = NULL; - - switch (o->type) { - case MSGPACK_OBJECT_NIL: - result = msgpack_nil_to_otlp_any_value(o); - break; - - case MSGPACK_OBJECT_BOOLEAN: - result = msgpack_boolean_to_otlp_any_value(o); - break; - - case MSGPACK_OBJECT_POSITIVE_INTEGER: - case MSGPACK_OBJECT_NEGATIVE_INTEGER: - result = msgpack_integer_to_otlp_any_value(o); - break; - - case MSGPACK_OBJECT_FLOAT32: - case MSGPACK_OBJECT_FLOAT64: - result = msgpack_float_to_otlp_any_value(o); - break; - - case MSGPACK_OBJECT_STR: - result = msgpack_string_to_otlp_any_value(o); - break; - - case MSGPACK_OBJECT_MAP: - result = msgpack_map_to_otlp_any_value(o); - break; - - case MSGPACK_OBJECT_BIN: - result = msgpack_bin_to_otlp_any_value(o); - break; - - case MSGPACK_OBJECT_ARRAY: - result = msgpack_array_to_otlp_any_value(o); - break; - - default: - break; - } - - /* This function will fail if it receives an object with - * type MSGPACK_OBJECT_EXT - */ - - return result; -} - -static inline int log_record_set_body(struct opentelemetry_context *ctx, - Opentelemetry__Proto__Logs__V1__LogRecord *log_records, struct flb_log_event *event, - struct flb_record_accessor **out_ra_match) -{ - int ret; - struct mk_list *head; - struct opentelemetry_body_key *bk; - msgpack_object *s_key = NULL; - msgpack_object *o_key = NULL; - msgpack_object *o_val = NULL; - Opentelemetry__Proto__Common__V1__AnyValue *log_object = NULL; - - *out_ra_match = NULL; - mk_list_foreach(head, &ctx->log_body_key_list) { - bk = mk_list_entry(head, struct opentelemetry_body_key, _head); - - ret = flb_ra_get_kv_pair(bk->ra, *event->body, &s_key, &o_key, &o_val); - if (ret == 0) { - log_object = msgpack_object_to_otlp_any_value(o_val); - - /* Link the record accessor pattern that matched */ - *out_ra_match = bk->ra; - break; - } - - log_object = NULL; - } - - /* At this point the record accessor patterns found nothing, so we just package the whole record */ - if (!log_object) { - log_object = msgpack_object_to_otlp_any_value(event->body); } + else if (event_type == FLB_INPUT_TRACES) { - if (!log_object) { - flb_plg_error(ctx->ins, "log event conversion failure"); - return -1; } - /* try to find the following keys: message or log, if found */ - log_records->body = log_object; return 0; } -static int log_record_set_attributes(struct opentelemetry_context *ctx, - Opentelemetry__Proto__Logs__V1__LogRecord *log_record, struct flb_log_event *event, - struct flb_record_accessor *ra_match) -{ - int i; - int ret; - int attr_count = 0; - int unpacked = FLB_FALSE; - size_t array_size; - void *out_buf; - size_t offset = 0; - size_t out_size; - msgpack_object_kv *kv; - msgpack_object *metadata; - msgpack_unpacked result; - Opentelemetry__Proto__Common__V1__KeyValue **buf; - - /* Maximum array size is the total number of root keys in metadata and record keys */ - array_size = event->body->via.map.size; - - /* log metadata (metada that comes from original Fluent Bit record ) */ - metadata = event->metadata; - if (metadata) { - array_size += metadata->via.map.size; - } - - /* - * Remove the keys from the record that were added to the log body and create a new output - * buffer. If there are matches, meaning that a new output buffer was created, ret will - * be FLB_TRUE, if no matches exists it returns FLB_FALSE. - */ - if (ctx->logs_body_key_attributes == FLB_TRUE && ctx->mp_accessor && ra_match) { - /* - * if ra_match is not NULL, it means that the log body was populated with a key from the record - * and the variable holds a reference to the record accessor that matched the key. - * - * Since 'likely' the mp_accessor context can have multiple record accessor patterns, - * we need to make sure to remove 'only' the one that was used in the log body, - * the approach we take is to disable all the patterns, enable the single one that - * matched, process and then re-enable all of them. - */ - flb_mp_accessor_set_active(ctx->mp_accessor, FLB_FALSE); - - /* Only active the one that matched */ - flb_mp_accessor_set_active_by_pattern(ctx->mp_accessor, - ra_match->pattern, - FLB_TRUE); - - /* Remove the undesired key */ - ret = flb_mp_accessor_keys_remove(ctx->mp_accessor, event->body, &out_buf, &out_size); - if (ret) { - msgpack_unpacked_init(&result); - msgpack_unpack_next(&result, out_buf, out_size, &offset); - - array_size += result.data.via.map.size; - unpacked = FLB_TRUE; - } - /* Enable all the mp_accessors */ - flb_mp_accessor_set_active(ctx->mp_accessor, FLB_TRUE); - } - - /* allocate an array to hold the converted map entries */ - buf = flb_calloc(array_size, sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); - if (!buf) { - flb_errno(); - if (unpacked) { - msgpack_unpacked_destroy(&result); - flb_free(out_buf); - } - return -1; - } - - /* pack log metadata */ - for (i = 0; i < metadata->via.map.size; i++) { - kv = &metadata->via.map.ptr[i]; - buf[i] = msgpack_kv_to_otlp_any_value(kv); - attr_count++; - } - - /* remaining fields that were not added to log body */ - if (ctx->logs_body_key_attributes == FLB_TRUE && unpacked) { - /* iterate the map and reference each elemento as an OTLP value */ - for (i = 0; i < result.data.via.map.size; i++) { - kv = &result.data.via.map.ptr[i]; - buf[attr_count] = msgpack_kv_to_otlp_any_value(kv); - attr_count++; - } - msgpack_unpacked_destroy(&result); - flb_free(out_buf); - } - - log_record->attributes = buf; - log_record->n_attributes = attr_count; - return 0; -} - -static int flush_to_otel(struct opentelemetry_context *ctx, - struct flb_event_chunk *event_chunk, - Opentelemetry__Proto__Logs__V1__LogRecord **logs, - size_t log_count) -{ - int ret; - void *body; - unsigned len; - - Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest export_logs; - Opentelemetry__Proto__Logs__V1__ScopeLogs scope_log; - Opentelemetry__Proto__Logs__V1__ResourceLogs resource_log; - Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_logs[1]; - Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_logs[1]; - - opentelemetry__proto__collector__logs__v1__export_logs_service_request__init(&export_logs); - opentelemetry__proto__logs__v1__resource_logs__init(&resource_log); - opentelemetry__proto__logs__v1__scope_logs__init(&scope_log); - - scope_log.log_records = logs; - scope_log.n_log_records = log_count; - scope_logs[0] = &scope_log; - - resource_log.scope_logs = scope_logs; - resource_log.n_scope_logs = 1; - resource_logs[0] = &resource_log; - - export_logs.resource_logs = resource_logs; - export_logs.n_resource_logs = 1; - - len = opentelemetry__proto__collector__logs__v1__export_logs_service_request__get_packed_size(&export_logs); - body = flb_calloc(len, sizeof(char)); - if (!body) { - flb_errno(); - return FLB_ERROR; - } - - opentelemetry__proto__collector__logs__v1__export_logs_service_request__pack(&export_logs, body); - - /* send post request to opentelemetry with content type application/x-protobuf */ - ret = http_post(ctx, body, len, - event_chunk->tag, - flb_sds_len(event_chunk->tag), - ctx->logs_uri); - - flb_free(body); - - return ret; -} - -/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */ -static int is_valid_severity_text(const char *str, size_t str_len) -{ - if (str_len == 5) { - if (strncmp("TRACE", str, 5) == 0 || - strncmp("DEBUG", str, 5) == 0 || - strncmp("ERROR", str, 5) == 0 || - strncmp("FATAL", str, 5) == 0) { - return FLB_TRUE; - } - } - else if (str_len == 4) { - if (strncmp("INFO", str, 4) == 0|| - strncmp("WARN", str, 4) == 0) { - return FLB_TRUE; - } - } - return FLB_FALSE; -} -/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */ -static int is_valid_severity_number(uint64_t val) -{ - if (val >= 1 && val <= 24) { - return FLB_TRUE; - } - return FLB_FALSE; -} - -static int append_v1_logs_metadata(struct opentelemetry_context *ctx, - struct flb_log_event *event, - Opentelemetry__Proto__Logs__V1__LogRecord *log_record) -{ - struct flb_ra_value *ra_val; - - if (ctx == NULL || event == NULL || log_record == NULL) { - return -1; - } - /* ObservedTimestamp */ - if (ctx->ra_observed_timestamp_metadata) { - ra_val = flb_ra_get_value_object(ctx->ra_observed_timestamp_metadata, *event->metadata); - if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { - log_record->observed_time_unix_nano = ra_val->o.via.u64; - flb_ra_key_value_destroy(ra_val); - } - } - - /* Timestamp */ - if (ctx->ra_timestamp_metadata) { - ra_val = flb_ra_get_value_object(ctx->ra_timestamp_metadata, *event->metadata); - if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { - log_record->time_unix_nano = ra_val->o.via.u64; - flb_ra_key_value_destroy(ra_val); - } - else { - log_record->time_unix_nano = flb_time_to_nanosec(&event->timestamp); - } - } - - /* SeverityText */ - if (ctx->ra_severity_text_metadata) { - ra_val = flb_ra_get_value_object(ctx->ra_severity_text_metadata, *event->metadata); - if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_STR && - is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size) == FLB_TRUE) { - log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size+1); - if (log_record->severity_text) { - strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size); - } - flb_ra_key_value_destroy(ra_val); - } - else { - /* To prevent invalid free */ - log_record->severity_text = NULL; - } - } - - /* SeverityNumber */ - if (ctx->ra_severity_number_metadata) { - ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->metadata); - if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && - is_valid_severity_number(ra_val->o.via.u64) == FLB_TRUE) { - log_record->severity_number = ra_val->o.via.u64; - flb_ra_key_value_destroy(ra_val); - } - } - - /* TraceFlags */ - if (ctx->ra_trace_flags_metadata) { - ra_val = flb_ra_get_value_object(ctx->ra_trace_flags_metadata, *event->metadata); - if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { - log_record->flags = (uint32_t)ra_val->o.via.u64; - flb_ra_key_value_destroy(ra_val); - } - } - - /* SpanId */ - if (ctx->ra_span_id_metadata) { - ra_val = flb_ra_get_value_object(ctx->ra_span_id_metadata, *event->metadata); - if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_BIN) { - log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size); - if (log_record->span_id.data) { - memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); - log_record->span_id.len = ra_val->o.via.bin.size; - } - flb_ra_key_value_destroy(ra_val); - } - } - - /* TraceId */ - if (ctx->ra_trace_id_metadata) { - ra_val = flb_ra_get_value_object(ctx->ra_trace_id_metadata, *event->metadata); - if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_BIN) { - log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size); - if (log_record->trace_id.data) { - memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); - log_record->trace_id.len = ra_val->o.via.bin.size; - } - flb_ra_key_value_destroy(ra_val); - } - } - - /* Attributes */ - if (ctx->ra_attributes_metadata) { - ra_val = flb_ra_get_value_object(ctx->ra_attributes_metadata, *event->metadata); - if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_MAP) { - if (log_record->attributes != NULL) { - otlp_kvarray_destroy(log_record->attributes, - log_record->n_attributes); - } - log_record->attributes = msgpack_map_to_otlp_kvarray(&ra_val->o, &log_record->n_attributes); - flb_ra_key_value_destroy(ra_val); - } - } - - return 0; -} - -static int append_v1_logs_message(struct opentelemetry_context *ctx, - struct flb_log_event *event, - Opentelemetry__Proto__Logs__V1__LogRecord *log_record) -{ - struct flb_ra_value *ra_val; - - if (ctx == NULL || event == NULL || log_record == NULL) { - return -1; - } - - /* SeverityText */ - if (ctx->ra_severity_text_message) { - ra_val = flb_ra_get_value_object(ctx->ra_severity_text_message, *event->body); - if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_STR) { - if(is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size) == FLB_TRUE){ - log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size+1); - if (log_record->severity_text) { - strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size); - } - flb_ra_key_value_destroy(ra_val); - }else{ - flb_plg_warn(ctx->ins, "Unable to process %s. Invalid Severity Text.\n", ctx->ra_severity_text_message->pattern); - log_record->severity_text = NULL; - } - } - else { - /* To prevent invalid free */ - log_record->severity_text = NULL; - } - } - - /* SeverityNumber */ - if (ctx->ra_severity_number_message) { - ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->body); - if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && - is_valid_severity_number(ra_val->o.via.u64) == FLB_TRUE) { - log_record->severity_number = ra_val->o.via.u64; - flb_ra_key_value_destroy(ra_val); - } - }else if(ctx->ra_severity_text_message){ - //TODO get sev number based off sev text - } - - /* SpanId */ - if (ctx->ra_span_id_message) { - ra_val = flb_ra_get_value_object(ctx->ra_span_id_message, *event->body); - if (ra_val != NULL) { - if(ra_val->o.type == MSGPACK_OBJECT_BIN){ - log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size); - if (log_record->span_id.data) { - memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); - log_record->span_id.len = ra_val->o.via.bin.size; - } - }else if(ra_val->o.type == MSGPACK_OBJECT_STR){ - log_record->span_id.data = flb_calloc(8, sizeof(uint8_t)); - if (log_record->span_id.data) { - // Convert to a byte array - uint8_t val[8]; - size_t count; - for(count = 0; count < sizeof val/sizeof *val; count++ ){ - sscanf(ra_val->o.via.str.ptr, "%2hhx", &val[count]); - ra_val->o.via.str.ptr+=2; - } - memcpy(log_record->span_id.data, val, sizeof(val)); - log_record->span_id.len = sizeof(val); - } - }else{ - flb_plg_warn(ctx->ins, "Unable to process %s. Unsupported data type.\n", ctx->ra_span_id_message->pattern); - } - flb_ra_key_value_destroy(ra_val); - } - } - - /* TraceId */ - if (ctx->ra_trace_id_message) { - ra_val = flb_ra_get_value_object(ctx->ra_trace_id_message, *event->body); - if (ra_val != NULL) { - if(ra_val->o.type == MSGPACK_OBJECT_BIN){ - log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size); - if (log_record->trace_id.data) { - memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); - log_record->trace_id.len = ra_val->o.via.bin.size; - } - }else if(ra_val->o.type == MSGPACK_OBJECT_STR){ - log_record->trace_id.data = flb_calloc(16, sizeof(uint8_t)); - if (log_record->trace_id.data) { - // Convert from hexdec string to a 16 byte array - uint8_t val[16]; - size_t count; - for(count = 0; count < sizeof val/sizeof *val; count++ ){ - sscanf(ra_val->o.via.str.ptr, "%2hhx", &val[count]); - ra_val->o.via.str.ptr+=2; - } - memcpy(log_record->trace_id.data, val, sizeof(val)); - log_record->trace_id.len = sizeof(val); - } - }else{ - flb_plg_warn(ctx->ins, "Unable to process %s. Unsupported data type.\n", ctx->ra_trace_id_message->pattern); - } - flb_ra_key_value_destroy(ra_val); - } - } - - return 0; -} - -static int process_logs(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *ins, void *out_context, - struct flb_config *config) -{ - int ret; - size_t i; - size_t log_record_count; - Opentelemetry__Proto__Logs__V1__LogRecord **log_record_list; - Opentelemetry__Proto__Logs__V1__LogRecord *log_records; - struct flb_log_event_decoder *decoder; - struct flb_log_event event; - struct opentelemetry_context *ctx; - struct flb_record_accessor *ra_match; - - ctx = (struct opentelemetry_context *) out_context; - - log_record_list = (Opentelemetry__Proto__Logs__V1__LogRecord **) - flb_calloc(ctx->batch_size, sizeof(Opentelemetry__Proto__Logs__V1__LogRecord *)); - if (!log_record_list) { - flb_errno(); - return -1; - } - - log_records = (Opentelemetry__Proto__Logs__V1__LogRecord *) - flb_calloc(ctx->batch_size, - sizeof(Opentelemetry__Proto__Logs__V1__LogRecord)); - - if (!log_records) { - flb_errno(); - flb_free(log_record_list); - return -2; - } - - for (i = 0 ; i < ctx->batch_size ; i++) { - log_record_list[i] = &log_records[i]; - } - - decoder = flb_log_event_decoder_create((char *) event_chunk->data, event_chunk->size); - if (decoder == NULL) { - flb_plg_error(ctx->ins, "could not initialize record decoder"); - flb_free(log_record_list); - flb_free(log_records); - return -1; - } - - log_record_count = 0; - - ret = FLB_OK; - while (flb_log_event_decoder_next(decoder, &event) == FLB_EVENT_DECODER_SUCCESS) { - ra_match = NULL; - opentelemetry__proto__logs__v1__log_record__init(&log_records[log_record_count]); - - /* - * Set the record body by using the logic defined in the configuration by - * the 'logs_body_key' properties. - * - * Note that the reference set in `out_body_parent_key` is the parent/root key that holds the content - * that was discovered. We get that reference so we can easily filter it out when composing - * the final list of attributes. - */ - ret = log_record_set_body(ctx, &log_records[log_record_count], &event, &ra_match); - if (ret == -1) { - /* the only possible fail path is a problem with a memory allocation, let's suggest a FLB_RETRY */ - ret = FLB_RETRY; - break; - } - - /* set attributes from metadata and remaining fields from the main record */ - ret = log_record_set_attributes(ctx, &log_records[log_record_count], &event, ra_match); - if (ret == -1) { - /* as before, it can only fail on a memory allocation */ - ret = FLB_RETRY; - break; - } - - append_v1_logs_metadata(ctx, &event, &log_records[log_record_count]); - - append_v1_logs_message(ctx, &event, &log_records[log_record_count]); - - ret = FLB_OK; - - log_records[log_record_count].time_unix_nano = flb_time_to_nanosec(&event.timestamp); - log_record_count++; - - if (log_record_count >= ctx->batch_size) { - ret = flush_to_otel(ctx, event_chunk, log_record_list, log_record_count); - clear_array(log_record_list, log_record_count); - log_record_count = 0; - } - } - - flb_log_event_decoder_destroy(decoder); - - if (log_record_count > 0 && ret == FLB_OK) { - ret = flush_to_otel(ctx, - event_chunk, - log_record_list, - log_record_count); - - clear_array(log_record_list, log_record_count); - } - - flb_free(log_record_list); - flb_free(log_records); - - return ret; -} - static int process_metrics(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *ins, void *out_context, @@ -1361,10 +323,10 @@ static int process_metrics(struct flb_event_chunk *event_chunk, flb_plg_debug(ctx->ins, "final payload size: %lu", flb_sds_len(buf)); if (buf && flb_sds_len(buf) > 0) { /* Send HTTP request */ - result = http_post(ctx, buf, flb_sds_len(buf), - event_chunk->tag, - flb_sds_len(event_chunk->tag), - ctx->metrics_uri); + result = opentelemetry_http_post(ctx, buf, flb_sds_len(buf), + event_chunk->tag, + flb_sds_len(event_chunk->tag), + ctx->metrics_uri); /* Debug http_post() result statuses */ if (result == FLB_OK) { @@ -1450,10 +412,10 @@ static int process_traces(struct flb_event_chunk *event_chunk, flb_plg_debug(ctx->ins, "final payload size: %lu", flb_sds_len(buf)); if (buf && flb_sds_len(buf) > 0) { /* Send HTTP request */ - result = http_post(ctx, buf, flb_sds_len(buf), - event_chunk->tag, - flb_sds_len(event_chunk->tag), - ctx->traces_uri); + result = opentelemetry_http_post(ctx, buf, flb_sds_len(buf), + event_chunk->tag, + flb_sds_len(event_chunk->tag), + ctx->traces_uri); /* Debug http_post() result statuses */ if (result == FLB_OK) { @@ -1512,15 +474,16 @@ static void cb_opentelemetry_flush(struct flb_event_chunk *event_chunk, { int result = FLB_RETRY; - if (event_chunk->type == FLB_INPUT_METRICS){ - result = process_metrics(event_chunk, out_flush, ins, out_context, config); - } - else if (event_chunk->type == FLB_INPUT_LOGS){ - result = process_logs(event_chunk, out_flush, ins, out_context, config); - } - else if (event_chunk->type == FLB_INPUT_TRACES){ - result = process_traces(event_chunk, out_flush, ins, out_context, config); - } + if (event_chunk->type == FLB_INPUT_METRICS){ + result = process_metrics(event_chunk, out_flush, ins, out_context, config); + } + else if (event_chunk->type == FLB_INPUT_LOGS){ + result = otel_process_logs(event_chunk, out_flush, ins, out_context, config); + } + else if (event_chunk->type == FLB_INPUT_TRACES){ + result = process_traces(event_chunk, out_flush, ins, out_context, config); + } + FLB_OUTPUT_RETURN(result); } @@ -1602,6 +565,10 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct opentelemetry_context, log_response_payload), "Specify if the response paylod should be logged or not" }, + { + FLB_CONFIG_MAP_STR, "logs_metadata_key", "otlp", + 0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_metadata_key), + }, { FLB_CONFIG_MAP_STR, "logs_observed_timestamp_metadata_key", "$ObservedTimestamp", 0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_observed_timestamp_metadata_key), @@ -1673,6 +640,7 @@ static struct flb_config_map config_map[] = { "Specify a Severity Number key" }, + /* EOF */ {0} }; @@ -1687,4 +655,6 @@ struct flb_output_plugin out_opentelemetry_plugin = { .config_map = config_map, .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES, .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, + + .test_formatter.callback = opentelemetry_format_test, }; diff --git a/plugins/out_opentelemetry/opentelemetry.h b/plugins/out_opentelemetry/opentelemetry.h index d56901376e6..a4e188f3708 100644 --- a/plugins/out_opentelemetry/opentelemetry.h +++ b/plugins/out_opentelemetry/opentelemetry.h @@ -59,6 +59,9 @@ struct opentelemetry_context { char *host; int port; + /* record metadata parsing */ + flb_sds_t logs_metadata_key; + /* metadata keys */ flb_sds_t logs_observed_timestamp_metadata_key; struct flb_record_accessor *ra_observed_timestamp_metadata; @@ -139,6 +142,31 @@ struct opentelemetry_context { /* Compression mode (gzip) */ int compress_gzip; + + /* FLB/OTLP Record accessor patterns */ + struct flb_record_accessor *ra_meta_schema; + struct flb_record_accessor *ra_meta_resource_id; + struct flb_record_accessor *ra_meta_scope_id; + struct flb_record_accessor *ra_resource_attr; + struct flb_record_accessor *ra_resource_schema_url; + + struct flb_record_accessor *ra_scope_name; + struct flb_record_accessor *ra_scope_version; + struct flb_record_accessor *ra_scope_attr; + + /* log: metadata components coming from OTLP */ + struct flb_record_accessor *ra_log_meta_otlp_observed_ts; + struct flb_record_accessor *ra_log_meta_otlp_timestamp; + struct flb_record_accessor *ra_log_meta_otlp_severity_number; + struct flb_record_accessor *ra_log_meta_otlp_severity_text; + struct flb_record_accessor *ra_log_meta_otlp_attr; + struct flb_record_accessor *ra_log_meta_otlp_trace_id; + struct flb_record_accessor *ra_log_meta_otlp_span_id; + struct flb_record_accessor *ra_log_meta_otlp_trace_flags; }; +int opentelemetry_http_post(struct opentelemetry_context *ctx, + const void *body, size_t body_len, + const char *tag, int tag_len, + const char *uri); #endif diff --git a/plugins/out_opentelemetry/opentelemetry_conf.c b/plugins/out_opentelemetry/opentelemetry_conf.c index 4e6a68fe153..9a8a17df9fc 100644 --- a/plugins/out_opentelemetry/opentelemetry_conf.c +++ b/plugins/out_opentelemetry/opentelemetry_conf.c @@ -22,6 +22,7 @@ #include #include #include +#include #include "opentelemetry.h" #include "opentelemetry_conf.h" @@ -415,7 +416,7 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output if (ctx->ra_trace_id_metadata == NULL) { flb_plg_error(ins, "failed to create ra for trace id"); } - ctx->ra_attributes_metadata = flb_ra_create((char*)ctx->logs_attributes_metadata_key, + ctx->ra_attributes_metadata = flb_ra_create((char*) ctx->logs_attributes_metadata_key, FLB_FALSE); if (ctx->ra_attributes_metadata == NULL) { flb_plg_error(ins, "failed to create ra for attributes"); @@ -441,6 +442,90 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output flb_plg_error(ins, "failed to create ra for message severity number"); } + /* record accessor: group metadata */ + ctx->ra_meta_schema = flb_ra_create("$schema", FLB_FALSE); + if (ctx->ra_meta_schema == NULL) { + flb_plg_error(ins, "failed to create record accessor for schema"); + } + + ctx->ra_meta_resource_id = flb_ra_create((char *) "$resource_id", FLB_FALSE); + if (ctx->ra_meta_resource_id == NULL) { + flb_plg_error(ins, "failed to create record accessor for resource_id"); + } + + ctx->ra_meta_scope_id = flb_ra_create((char *) "$scope_id", FLB_FALSE); + if (ctx->ra_meta_scope_id == NULL) { + flb_plg_error(ins, "failed to create record accessor for scope_id"); + } + + /* record accessor: group body */ + ctx->ra_resource_attr = flb_ra_create("$resource['attributes']", FLB_FALSE); + if (ctx->ra_resource_attr == NULL) { + flb_plg_error(ins, "failed to create record accessor for resource attributes"); + } + + ctx->ra_resource_schema_url = flb_ra_create("$schema_url", FLB_FALSE); + if (ctx->ra_resource_schema_url == NULL) { + flb_plg_error(ins, "failed to create record accessor for resource schema url"); + } + + ctx->ra_scope_name = flb_ra_create("$scope['name']", FLB_FALSE); + if (ctx->ra_scope_name == NULL) { + flb_plg_error(ins, "failed to create record accessor for scope name"); + } + + ctx->ra_scope_version = flb_ra_create("$scope['version']", FLB_FALSE); + if (ctx->ra_scope_version == NULL) { + flb_plg_error(ins, "failed to create record accessor for scope version"); + } + + ctx->ra_scope_attr = flb_ra_create("$scope['attributes']", FLB_FALSE); + if (ctx->ra_scope_attr == NULL) { + flb_plg_error(ins, "failed to create record accessor for scope attributes"); + } + + /* log metadata under $otlp (set by in_opentelemetry) */ + + ctx->ra_log_meta_otlp_observed_ts = flb_ra_create("$otlp['observed_timestamp']", FLB_FALSE); + if (ctx->ra_log_meta_otlp_observed_ts == NULL) { + flb_plg_error(ins, "failed to create record accessor for otlp observed timestamp"); + } + + ctx->ra_log_meta_otlp_timestamp = flb_ra_create("$otlp['timestamp']", FLB_FALSE); + if (ctx->ra_log_meta_otlp_timestamp == NULL) { + flb_plg_error(ins, "failed to create record accessor for otlp timestamp"); + } + + ctx->ra_log_meta_otlp_severity_number = flb_ra_create("$otlp['severity_number']", FLB_FALSE); + if (ctx->ra_log_meta_otlp_severity_number == NULL) { + flb_plg_error(ins, "failed to create record accessor for otlp severity number"); + } + + ctx->ra_log_meta_otlp_severity_text = flb_ra_create("$otlp['severity_text']", FLB_FALSE); + if (ctx->ra_log_meta_otlp_severity_text == NULL) { + flb_plg_error(ins, "failed to create record accessor for otlp severity text"); + } + + ctx->ra_log_meta_otlp_attr = flb_ra_create("$otlp['attributes']", FLB_FALSE); + if (ctx->ra_log_meta_otlp_attr == NULL) { + flb_plg_error(ins, "failed to create record accessor for otlp attributes"); + } + + ctx->ra_log_meta_otlp_trace_id = flb_ra_create("$otlp['trace_id']", FLB_FALSE); + if (ctx->ra_log_meta_otlp_trace_id == NULL) { + flb_plg_error(ins, "failed to create record accessor for otlp trace id"); + } + + ctx->ra_log_meta_otlp_span_id = flb_ra_create("$otlp['span_id']", FLB_FALSE); + if (ctx->ra_log_meta_otlp_span_id == NULL) { + flb_plg_error(ins, "failed to create record accessor for otlp span id"); + } + + ctx->ra_log_meta_otlp_trace_flags = flb_ra_create("$otlp['trace_flags']", FLB_FALSE); + if (ctx->ra_log_meta_otlp_trace_flags == NULL) { + flb_plg_error(ins, "failed to create record accessor for otlp trace flags"); + } + return ctx; } @@ -499,6 +584,63 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx) flb_ra_destroy(ctx->ra_severity_number_message); } + if (ctx->ra_meta_schema) { + flb_ra_destroy(ctx->ra_meta_schema); + } + if (ctx->ra_meta_resource_id) { + flb_ra_destroy(ctx->ra_meta_resource_id); + } + if (ctx->ra_meta_scope_id) { + flb_ra_destroy(ctx->ra_meta_scope_id); + } + if (ctx->ra_resource_attr) { + flb_ra_destroy(ctx->ra_resource_attr); + } + if (ctx->ra_resource_schema_url) { + flb_ra_destroy(ctx->ra_resource_schema_url); + } + if (ctx->ra_scope_name) { + flb_ra_destroy(ctx->ra_scope_name); + } + if (ctx->ra_scope_version) { + flb_ra_destroy(ctx->ra_scope_version); + } + if (ctx->ra_scope_attr) { + flb_ra_destroy(ctx->ra_scope_attr); + } + + if (ctx->ra_log_meta_otlp_observed_ts) { + flb_ra_destroy(ctx->ra_log_meta_otlp_observed_ts); + } + + if (ctx->ra_log_meta_otlp_timestamp) { + flb_ra_destroy(ctx->ra_log_meta_otlp_timestamp); + } + + if (ctx->ra_log_meta_otlp_severity_number) { + flb_ra_destroy(ctx->ra_log_meta_otlp_severity_number); + } + + if (ctx->ra_log_meta_otlp_severity_text) { + flb_ra_destroy(ctx->ra_log_meta_otlp_severity_text); + } + + if (ctx->ra_log_meta_otlp_attr) { + flb_ra_destroy(ctx->ra_log_meta_otlp_attr); + } + + if (ctx->ra_log_meta_otlp_trace_id) { + flb_ra_destroy(ctx->ra_log_meta_otlp_trace_id); + } + + if (ctx->ra_log_meta_otlp_span_id) { + flb_ra_destroy(ctx->ra_log_meta_otlp_span_id); + } + + if (ctx->ra_log_meta_otlp_trace_flags) { + flb_ra_destroy(ctx->ra_log_meta_otlp_trace_flags); + } + flb_free(ctx->proxy_host); flb_free(ctx); } diff --git a/plugins/out_opentelemetry/opentelemetry_logs.c b/plugins/out_opentelemetry/opentelemetry_logs.c new file mode 100644 index 00000000000..e7d0ff794ee --- /dev/null +++ b/plugins/out_opentelemetry/opentelemetry_logs.c @@ -0,0 +1,971 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +//#include +#include + +#include "opentelemetry.h" +#include "opentelemetry_conf.h" +#include "opentelemetry_utils.h" + +/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */ +static int is_valid_severity_text(const char *str, size_t str_len) +{ + if (str_len == 5) { + if (strncmp("TRACE", str, 5) == 0 || + strncmp("DEBUG", str, 5) == 0 || + strncmp("ERROR", str, 5) == 0 || + strncmp("FATAL", str, 5) == 0) { + return FLB_TRUE; + } + } + else if (str_len == 4) { + if (strncmp("INFO", str, 4) == 0|| + strncmp("WARN", str, 4) == 0) { + return FLB_TRUE; + } + } + return FLB_FALSE; +} + +/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */ +static int is_valid_severity_number(uint64_t val) +{ + if (val >= 1 && val <= 24) { + return FLB_TRUE; + } + return FLB_FALSE; +} + + +/* + * From a group record, extract it metadata and validate if it has a valid OTLP schema and check that + * resource_id is set. On success it returns the resource_id, otherwise it returns -1. + */ +static int get_otlp_group_metadata(struct opentelemetry_context *ctx, struct flb_log_event *event, + int64_t *resource_id, int64_t *scope_id) +{ + struct flb_ra_value *ra_val; + + /* + * $schema == 'otlp' + */ + ra_val = flb_ra_get_value_object(ctx->ra_meta_schema, *event->metadata); + if (ra_val == NULL) { + return -1; + } + + if (ra_val->o.type != MSGPACK_OBJECT_STR) { + flb_ra_key_value_destroy(ra_val); + return -1; + } + + if (ra_val->o.via.str.size != 4) { + flb_ra_key_value_destroy(ra_val); + return -1; + } + + if (strncmp(ra_val->o.via.str.ptr, "otlp", ra_val->o.via.str.size) != 0) { + flb_ra_key_value_destroy(ra_val); + return -1; + } + flb_ra_key_value_destroy(ra_val); + + + /* + * $resource_id + */ + ra_val = flb_ra_get_value_object(ctx->ra_meta_resource_id, *event->metadata); + if (ra_val == NULL) { + return -1; + } + + if (ra_val->o.type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + flb_ra_key_value_destroy(ra_val); + return -1; + } + *resource_id = ra_val->o.via.i64; + flb_ra_key_value_destroy(ra_val); + + /* + * $scope_id + */ + ra_val = flb_ra_get_value_object(ctx->ra_meta_scope_id, *event->metadata); + if (ra_val == NULL) { + return -1; + } + if (ra_val->o.type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + flb_ra_key_value_destroy(ra_val); + return -1; + } + *scope_id = ra_val->o.via.i64; + + flb_ra_key_value_destroy(ra_val); + return 0; +} + +static inline int log_record_set_body(struct opentelemetry_context *ctx, + Opentelemetry__Proto__Logs__V1__LogRecord *log_records, struct flb_log_event *event, + struct flb_record_accessor **out_ra_match) +{ + int ret; + struct mk_list *head; + struct opentelemetry_body_key *bk; + msgpack_object *s_key = NULL; + msgpack_object *o_key = NULL; + msgpack_object *o_val = NULL; + Opentelemetry__Proto__Common__V1__AnyValue *log_object = NULL; + + *out_ra_match = NULL; + mk_list_foreach(head, &ctx->log_body_key_list) { + bk = mk_list_entry(head, struct opentelemetry_body_key, _head); + + ret = flb_ra_get_kv_pair(bk->ra, *event->body, &s_key, &o_key, &o_val); + if (ret == 0) { + log_object = msgpack_object_to_otlp_any_value(o_val); + + /* Link the record accessor pattern that matched */ + *out_ra_match = bk->ra; + break; + } + + log_object = NULL; + } + + /* At this point the record accessor patterns found nothing, so we just package the whole record */ + if (!log_object) { + log_object = msgpack_object_to_otlp_any_value(event->body); + } + + if (!log_object) { + flb_plg_error(ctx->ins, "log event conversion failure"); + return -1; + } + + /* try to find the following keys: message or log, if found */ + log_records->body = log_object; + return 0; +} + +static int log_record_set_attributes(struct opentelemetry_context *ctx, + Opentelemetry__Proto__Logs__V1__LogRecord *log_record, struct flb_log_event *event, + struct flb_record_accessor *ra_match) +{ + int i; + int ret; + int attr_count = 0; + int unpacked = FLB_FALSE; + size_t array_size; + void *out_buf; + size_t offset = 0; + size_t out_size; + msgpack_object_kv *kv; + msgpack_object *metadata; + msgpack_unpacked result; + Opentelemetry__Proto__Common__V1__KeyValue **buf; + + /* Maximum array size is the total number of root keys in metadata and record keys */ + array_size = event->body->via.map.size; + + /* log metadata (metadata that comes from original Fluent Bit record) */ + metadata = event->metadata; + if (metadata) { + array_size += metadata->via.map.size; + } + + /* + * Remove the keys from the record that were added to the log body and create a new output + * buffer. If there are matches, meaning that a new output buffer was created, ret will + * be FLB_TRUE, if no matches exists it returns FLB_FALSE. + */ + if (ctx->logs_body_key_attributes == FLB_TRUE && ctx->mp_accessor && ra_match) { + /* + * if ra_match is not NULL, it means that the log body was populated with a key from the record + * and the variable holds a reference to the record accessor that matched the key. + * + * Since 'likely' the mp_accessor context can have multiple record accessor patterns, + * we need to make sure to remove 'only' the one that was used in the log body, + * the approach we take is to disable all the patterns, enable the single one that + * matched, process and then re-enable all of them. + */ + flb_mp_accessor_set_active(ctx->mp_accessor, FLB_FALSE); + + /* Only active the one that matched */ + flb_mp_accessor_set_active_by_pattern(ctx->mp_accessor, + ra_match->pattern, + FLB_TRUE); + + /* Remove the undesired key */ + ret = flb_mp_accessor_keys_remove(ctx->mp_accessor, event->body, &out_buf, &out_size); + if (ret) { + msgpack_unpacked_init(&result); + msgpack_unpack_next(&result, out_buf, out_size, &offset); + + array_size += result.data.via.map.size; + unpacked = FLB_TRUE; + } + /* Enable all the mp_accessors */ + flb_mp_accessor_set_active(ctx->mp_accessor, FLB_TRUE); + } + + /* allocate an array to hold the converted map entries */ + buf = flb_calloc(array_size, sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); + if (!buf) { + flb_errno(); + if (unpacked) { + msgpack_unpacked_destroy(&result); + flb_free(out_buf); + + } + return -1; + } + + /* pack log metadata */ + for (i = 0; i < metadata->via.map.size; i++) { + kv = &metadata->via.map.ptr[i]; + + if (kv->key.type != MSGPACK_OBJECT_STR) { + continue; + } + + /* skip internal otlp metadata */ + if (kv->key.via.str.size == 4 && strncmp(kv->key.via.str.ptr, "otlp", 4) == 0) { + continue; + } + + buf[attr_count] = msgpack_kv_to_otlp_any_value(kv); + attr_count++; + } + + /* remaining fields that were not added to log body */ + if (ctx->logs_body_key_attributes == FLB_TRUE && unpacked) { + /* iterate the map and reference each elemento as an OTLP value */ + for (i = 0; i < result.data.via.map.size; i++) { + kv = &result.data.via.map.ptr[i]; + buf[attr_count] = msgpack_kv_to_otlp_any_value(kv); + attr_count++; + } + msgpack_unpacked_destroy(&result); + flb_free(out_buf); + } + + log_record->attributes = buf; + log_record->n_attributes = attr_count; + return 0; +} + +static int append_v1_logs_metadata(struct opentelemetry_context *ctx, + struct flb_log_event *event, + Opentelemetry__Proto__Logs__V1__LogRecord *log_record) +{ + struct flb_ra_value *ra_val; + + if (ctx == NULL || event == NULL || log_record == NULL) { + return -1; + } + + /* ObservedTimestamp */ + ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_observed_ts, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + log_record->observed_time_unix_nano = ra_val->o.via.u64; + flb_ra_key_value_destroy(ra_val); + } + else if (ctx->ra_observed_timestamp_metadata) { + ra_val = flb_ra_get_value_object(ctx->ra_observed_timestamp_metadata, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + log_record->observed_time_unix_nano = ra_val->o.via.u64; + flb_ra_key_value_destroy(ra_val); + } + } + + /* Timestamp */ + ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_timestamp, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + log_record->time_unix_nano = ra_val->o.via.u64; + flb_ra_key_value_destroy(ra_val); + } + else if (ctx->ra_timestamp_metadata) { + ra_val = flb_ra_get_value_object(ctx->ra_timestamp_metadata, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + log_record->time_unix_nano = ra_val->o.via.u64; + flb_ra_key_value_destroy(ra_val); + } + else { + log_record->time_unix_nano = flb_time_to_nanosec(&event->timestamp); + } + } + + /* SeverityNumber */ + ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_severity_number, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && + is_valid_severity_number(ra_val->o.via.u64) == FLB_TRUE) { + log_record->severity_number = ra_val->o.via.u64; + flb_ra_key_value_destroy(ra_val); + } + else if (ctx->ra_severity_number_metadata) { + ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && + is_valid_severity_number(ra_val->o.via.u64) == FLB_TRUE) { + log_record->severity_number = ra_val->o.via.u64; + flb_ra_key_value_destroy(ra_val); + } + } + + /* SeverityText */ + ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_severity_text, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_STR && + is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size) == FLB_TRUE) { + log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size + 1); + if (log_record->severity_text) { + strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size); + } + flb_ra_key_value_destroy(ra_val); + + } + else if (ctx->ra_severity_text_metadata) { + ra_val = flb_ra_get_value_object(ctx->ra_severity_text_metadata, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_STR && + is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size) == FLB_TRUE) { + log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size+1); + if (log_record->severity_text) { + strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size); + } + flb_ra_key_value_destroy(ra_val); + } + } + else { + /* To prevent invalid free */ + log_record->severity_text = NULL; + } + + /* Attributes */ + ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_attr, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_MAP) { + if (log_record->attributes != NULL) { + otlp_kvarray_destroy(log_record->attributes, + log_record->n_attributes); + } + log_record->attributes = msgpack_map_to_otlp_kvarray(&ra_val->o, &log_record->n_attributes); + flb_ra_key_value_destroy(ra_val); + } + else if (ctx->ra_attributes_metadata) { + ra_val = flb_ra_get_value_object(ctx->ra_attributes_metadata, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_MAP) { + if (log_record->attributes != NULL) { + otlp_kvarray_destroy(log_record->attributes, + log_record->n_attributes); + } + log_record->attributes = msgpack_map_to_otlp_kvarray(&ra_val->o, &log_record->n_attributes); + flb_ra_key_value_destroy(ra_val); + } + } + + /* TraceId */ + ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_trace_id, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_BIN) { + log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size); + if (log_record->trace_id.data) { + memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); + log_record->trace_id.len = ra_val->o.via.bin.size; + } + flb_ra_key_value_destroy(ra_val); + } + else if (ctx->ra_trace_id_metadata) { + ra_val = flb_ra_get_value_object(ctx->ra_trace_id_metadata, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_BIN) { + log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size); + if (log_record->trace_id.data) { + memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); + log_record->trace_id.len = ra_val->o.via.bin.size; + } + flb_ra_key_value_destroy(ra_val); + } + } + + /* SpanId */ + ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_span_id, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_BIN) { + log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size); + if (log_record->span_id.data) { + memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); + log_record->span_id.len = ra_val->o.via.bin.size; + } + flb_ra_key_value_destroy(ra_val); + } + else if (ctx->ra_span_id_metadata) { + ra_val = flb_ra_get_value_object(ctx->ra_span_id_metadata, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_BIN) { + log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size); + if (log_record->span_id.data) { + memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); + log_record->span_id.len = ra_val->o.via.bin.size; + } + flb_ra_key_value_destroy(ra_val); + } + } + + /* TraceFlags */ + ra_val = flb_ra_get_value_object(ctx->ra_trace_flags_metadata, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + log_record->flags = (uint32_t)ra_val->o.via.u64; + flb_ra_key_value_destroy(ra_val); + } + else if (ctx->ra_trace_flags_metadata) { + ra_val = flb_ra_get_value_object(ctx->ra_trace_flags_metadata, *event->metadata); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + log_record->flags = (uint32_t)ra_val->o.via.u64; + flb_ra_key_value_destroy(ra_val); + } + } + + return 0; +} + +static void free_log_records(Opentelemetry__Proto__Logs__V1__LogRecord **logs, size_t log_count) +{ + size_t index; + Opentelemetry__Proto__Logs__V1__LogRecord *log; + + if (logs == NULL){ + return; + } + + for (index = 0 ; index < log_count ; index++) { + log = logs[index]; + + if (log->body != NULL) { + otlp_any_value_destroy(log->body); + log->body = NULL; + } + + if (log->attributes != NULL) { + otlp_kvarray_destroy(log->attributes, log->n_attributes); + log->attributes = NULL; + } + if (log->severity_text != NULL && log->severity_text != protobuf_c_empty_string) { + flb_free(log->severity_text); + } + if (log->span_id.data != NULL) { + flb_free(log->span_id.data); + } + if (log->trace_id.data != NULL) { + flb_free(log->trace_id.data); + } + + flb_free(log); + } +} + +static void free_resource_logs(Opentelemetry__Proto__Logs__V1__ResourceLogs **resource_logs, size_t resource_count) +{ + int i; + int scope_id; + Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_log; + Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_log; + + if (resource_logs == NULL) { + return; + } + + for (i = 0 ; i < resource_count ; i++) { + resource_log = resource_logs[i]; + + if (resource_log->schema_url != NULL && resource_log->schema_url != protobuf_c_empty_string) { + flb_sds_destroy(resource_log->schema_url); + } + + if (resource_log->resource->attributes != NULL) { + otlp_kvarray_destroy(resource_log->resource->attributes, resource_log->resource->n_attributes); + flb_free(resource_log->resource); + } + + /* iterate scoipe logs */ + if (resource_log->n_scope_logs > 0) { + for (scope_id = 0; scope_id < resource_log->n_scope_logs; scope_id++) { + scope_log = resource_log->scope_logs[scope_id]; + + if (scope_log->scope) { + if (scope_log->scope->name != NULL && scope_log->scope->name != protobuf_c_empty_string) { + flb_sds_destroy(scope_log->scope->name); + } + + if (scope_log->scope->version != NULL && scope_log->scope->version != protobuf_c_empty_string) { + flb_sds_destroy(scope_log->scope->version); + } + + if (scope_log->scope->attributes != NULL) { + otlp_kvarray_destroy(scope_log->scope->attributes, scope_log->scope->n_attributes); + } + + flb_free(scope_log->scope); + } + + if (scope_log->log_records != NULL) { + free_log_records(scope_log->log_records, scope_log->n_log_records); + } + + flb_free(scope_log->log_records); + flb_free(scope_log); + } + flb_free(resource_log->scope_logs); + } + + flb_free(resource_log); + } + + flb_free(resource_logs); +} + +static int logs_flush_to_otel(struct opentelemetry_context *ctx, struct flb_event_chunk *event_chunk, + Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *export_logs) +{ + int ret; + void *body; + unsigned len; + + len = opentelemetry__proto__collector__logs__v1__export_logs_service_request__get_packed_size(export_logs); + if (len == 0) { + return FLB_ERROR; + } + + body = flb_calloc(len, sizeof(char)); + if (!body) { + flb_errno(); + return FLB_ERROR; + } + + opentelemetry__proto__collector__logs__v1__export_logs_service_request__pack(export_logs, body); + + /* send post request to opentelemetry with content type application/x-protobuf */ + ret = opentelemetry_http_post(ctx, body, len, + event_chunk->tag, + flb_sds_len(event_chunk->tag), + ctx->logs_uri); + flb_free(body); + + return ret; +} + +static int set_resource_attributes(struct flb_record_accessor *ra, + msgpack_object *map, + Opentelemetry__Proto__Resource__V1__Resource *resource) +{ + struct flb_ra_value *ra_val; + + ra_val = flb_ra_get_value_object(ra, *map); + if (ra_val == NULL) { + return -1; + } + + if (ra_val->o.type != MSGPACK_OBJECT_MAP) { + flb_ra_key_value_destroy(ra_val); + return -1; + } + + resource->attributes = msgpack_map_to_otlp_kvarray(&ra_val->o, + &resource->n_attributes); + flb_ra_key_value_destroy(ra_val); + + if (!resource->attributes) { + return -1; + } + + return 0; +} + +static int set_resource_schema_url(struct flb_record_accessor *ra, + msgpack_object *map, + Opentelemetry__Proto__Logs__V1__ResourceLogs *resource) +{ + + struct flb_ra_value *ra_val; + + ra_val = flb_ra_get_value_object(ra, *map); + if (ra_val == NULL) { + return -1; + } + + if (ra_val->o.type != MSGPACK_OBJECT_STR) { + flb_ra_key_value_destroy(ra_val); + return -1; + } + + resource->schema_url = flb_sds_create_len(ra_val->o.via.str.ptr, + ra_val->o.via.str.size); + flb_ra_key_value_destroy(ra_val); + + if (!resource->schema_url) { + return -1; + } + + return 0; +} + +static int set_scope_name(struct flb_record_accessor *ra, + msgpack_object *map, + Opentelemetry__Proto__Common__V1__InstrumentationScope *scope) +{ + struct flb_ra_value *ra_val; + + ra_val = flb_ra_get_value_object(ra, *map); + if (ra_val == NULL) { + return -1; + } + + if (ra_val->o.type != MSGPACK_OBJECT_STR) { + flb_ra_key_value_destroy(ra_val); + return -1; + } + + scope->name = flb_sds_create_len(ra_val->o.via.str.ptr, ra_val->o.via.str.size); + flb_ra_key_value_destroy(ra_val); + if (!scope->name) { + return -1; + } + + return 0; +} + +static int set_scope_version(struct flb_record_accessor *ra, + msgpack_object *map, + Opentelemetry__Proto__Common__V1__InstrumentationScope *scope) +{ + struct flb_ra_value *ra_val; + + ra_val = flb_ra_get_value_object(ra, *map); + if (ra_val == NULL) { + return -1; + } + + if (ra_val->o.type != MSGPACK_OBJECT_STR) { + flb_ra_key_value_destroy(ra_val); + return -1; + } + + scope->version = flb_sds_create_len(ra_val->o.via.str.ptr, ra_val->o.via.str.size); + flb_ra_key_value_destroy(ra_val); + if (!scope->version) { + return -1; + } + + return 0; +} + +static int set_scope_attributes(struct flb_record_accessor *ra, + msgpack_object *map, + Opentelemetry__Proto__Common__V1__InstrumentationScope *scope) +{ + struct flb_ra_value *ra_val; + + ra_val = flb_ra_get_value_object(ra, *map); + if (ra_val == NULL) { + return -1; + } + + if (ra_val->o.type != MSGPACK_OBJECT_MAP) { + flb_ra_key_value_destroy(ra_val); + return -1; + } + + scope->attributes = msgpack_map_to_otlp_kvarray(&ra_val->o, + &scope->n_attributes); + flb_ra_key_value_destroy(ra_val); + + if (!scope->attributes) { + return -1; + } + + return 0; +} + +int otel_process_logs(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *ins, void *out_context, + struct flb_config *config) +{ + int ret; + int record_type; + int log_record_count; + int max_scopes; + int max_resources; + int64_t resource_id = -1; + int64_t scope_id = -1; + int64_t tmp_resource_id = -1; + int64_t tmp_scope_id = -1; + struct flb_log_event_decoder *decoder; + struct flb_log_event event; + struct opentelemetry_context *ctx; + struct flb_record_accessor *ra_match; + Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest export_logs; + Opentelemetry__Proto__Logs__V1__ResourceLogs **resource_logs = NULL; + Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_log = NULL; + Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs = NULL; + Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_log = NULL; + Opentelemetry__Proto__Logs__V1__LogRecord **log_records = NULL; + Opentelemetry__Proto__Logs__V1__LogRecord *log_record = NULL; + + ctx = (struct opentelemetry_context *) out_context; + + decoder = flb_log_event_decoder_create((char *) event_chunk->data, event_chunk->size); + if (decoder == NULL) { + flb_plg_error(ctx->ins, "could not initialize record decoder"); + return -1; + } + + log_record_count = 0; + opentelemetry__proto__collector__logs__v1__export_logs_service_request__init(&export_logs); + + /* local limits */ + max_resources = 100; /* maximim number of resources */ + max_scopes = 100; /* maximum number of scopes per resource */ + + /* allocate for 100 resource logs */ + resource_logs = flb_calloc(max_resources, sizeof(Opentelemetry__Proto__Logs__V1__ResourceLogs *)); + if (!resource_logs) { + flb_errno(); + flb_log_event_decoder_destroy(decoder); + return -1; + } + export_logs.resource_logs = resource_logs; + export_logs.n_resource_logs = 0; + + ret = FLB_OK; + while (flb_log_event_decoder_next(decoder, &event) == FLB_EVENT_DECODER_SUCCESS) { + /* Check if the record is special (group) or a normal one */ + ret = flb_log_event_decoder_get_record_type(&event, &record_type); + if (ret != 0) { + flb_plg_error(ctx->ins, "record has invalid event type"); + continue; + } + + /* + * Group start: handle resource an scope + * ------------------------------------- + */ + if (record_type == FLB_LOG_EVENT_GROUP_START) { + /* Look for OTLP info */ + tmp_resource_id = -1; + tmp_scope_id = -1; + + ret = get_otlp_group_metadata(ctx, &event, &tmp_resource_id, &tmp_scope_id); + if (ret == -1) { + /* skip unknown group info */ + continue; + } + + /* if we have a new resource_id, start a new resource context */ + if (resource_id != tmp_resource_id) { + + if (export_logs.n_resource_logs >= max_resources) { + flb_plg_error(ctx->ins, "max resources limit reached"); + ret = FLB_ERROR; + break; + } + +start_resource: + /* + * On every group start, check if we are following the previous resource_id or not, so we can pack scopes + * under the right resource. + */ + resource_log = flb_calloc(1, sizeof(Opentelemetry__Proto__Logs__V1__ResourceLogs)); + if (!resource_log) { + flb_errno(); + ret = FLB_RETRY; + break; + } + opentelemetry__proto__logs__v1__resource_logs__init(resource_log); + + /* add the resource log */ + resource_logs[export_logs.n_resource_logs] = resource_log; + export_logs.n_resource_logs++; + + resource_log->resource = flb_calloc(1, sizeof(Opentelemetry__Proto__Resource__V1__Resource)); + if (!resource_log->resource) { + flb_errno(); + flb_free(resource_log); + ret = FLB_RETRY; + break; + } + opentelemetry__proto__resource__v1__resource__init(resource_log->resource); + + /* group body: $resource['attributes'] */ + set_resource_attributes(ctx->ra_resource_attr, event.body, resource_log->resource); + + /* group body: $schema_url */ + set_resource_schema_url(ctx->ra_resource_schema_url, event.body, resource_log); + + /* prepare the scopes */ + scope_logs = flb_calloc(100, sizeof(Opentelemetry__Proto__Logs__V1__ScopeLogs *)); + if (!scope_logs) { + flb_errno(); + ret = FLB_RETRY; + break; + } + + resource_log->scope_logs = scope_logs; + resource_log->n_scope_logs = 0; + + /* update the current resource_id */ + resource_id = tmp_resource_id; + } + + if (scope_id != tmp_scope_id) { + /* check limits */ + if (resource_log->n_scope_logs >= max_scopes) { + flb_plg_error(ctx->ins, "max scopes limit reached"); + ret = FLB_ERROR; + break; + } + + /* process the scope */ + scope_log = flb_calloc(1, sizeof(Opentelemetry__Proto__Logs__V1__ScopeLogs)); + if (!scope_log) { + flb_errno(); + ret = FLB_RETRY; + break; + } + opentelemetry__proto__logs__v1__scope_logs__init(scope_log); + + scope_log->scope = flb_calloc(1, sizeof(Opentelemetry__Proto__Common__V1__InstrumentationScope)); + if (!scope_log->scope) { + flb_errno(); + flb_free(scope_log); + ret = FLB_RETRY; + break; + } + opentelemetry__proto__common__v1__instrumentation_scope__init(scope_log->scope); + scope_id = tmp_scope_id; + + log_records = flb_calloc(ctx->batch_size, sizeof(Opentelemetry__Proto__Logs__V1__LogRecord *)); + if (!log_records) { + flb_errno(); + return -2; + } + + scope_log->log_records = log_records; + scope_logs[resource_log->n_scope_logs] = scope_log; + resource_log->n_scope_logs++; + + + /* group body: $scope['name'] */ + set_scope_name(ctx->ra_scope_name, event.body, scope_log->scope); + + /* group body: $scope['version'] */ + set_scope_version(ctx->ra_scope_version, event.body, scope_log->scope); + + /* group body: $scope['attributes'] */ + set_scope_attributes(ctx->ra_scope_attr, event.body, scope_log->scope); + } + + ret = FLB_OK; + + if (record_type == FLB_LOG_EVENT_GROUP_START) { + continue; + } + } + else if (record_type == FLB_LOG_EVENT_GROUP_END) { + /* do nothing */ + ret = FLB_OK; + resource_id = -1; + scope_id = -1; + continue; + } + + /* if we have a real OTLP context package using log_records */ + if (resource_id >= 0 && scope_id >= 0) { + + } + else { + /* + * standalone packaging: the record is not part of an original OTLP structure, so there is no group + * information. We create a temporary resource for the incoming records unless a group is defined. + */ + tmp_resource_id = 0; + tmp_scope_id = 0; + goto start_resource; + } + + ra_match = NULL; + log_record = flb_calloc(1, sizeof(Opentelemetry__Proto__Logs__V1__LogRecord)); + if (!log_record) { + flb_errno(); + ret = FLB_RETRY; + break; + } + log_records[log_record_count] = log_record; + opentelemetry__proto__logs__v1__log_record__init(log_record); + + /* + * Set the record body by using the logic defined in the configuration by + * the 'logs_body_key' properties. + * + * Note that the reference set in `out_body_parent_key` is the parent/root key that holds the content + * that was discovered. We get that reference so we can easily filter it out when composing + * the final list of attributes. + */ + ret = log_record_set_body(ctx, log_record, &event, &ra_match); + if (ret == -1) { + /* the only possible fail path is a problem with a memory allocation, let's suggest a FLB_RETRY */ + ret = FLB_RETRY; + break; + } + + /* set attributes from metadata and remaining fields from the main record */ + ret = log_record_set_attributes(ctx, log_record, &event, ra_match); + if (ret == -1) { + /* as before, it can only fail on a memory allocation */ + ret = FLB_RETRY; + break; + } + + append_v1_logs_metadata(ctx, &event, log_record); + + ret = FLB_OK; + log_record_count++; + scope_log->n_log_records = log_record_count; + + if (log_record_count >= ctx->batch_size) { + ret = logs_flush_to_otel(ctx, event_chunk, &export_logs); + free_log_records(log_records, log_record_count); + log_record_count = 0; + scope_log->n_log_records = 0; + } + } + + flb_log_event_decoder_destroy(decoder); + + if (log_record_count > 0 && ret == FLB_OK) { + ret = logs_flush_to_otel(ctx, event_chunk, &export_logs); + } + + /* release all protobuf resources */ + free_resource_logs(export_logs.resource_logs, export_logs.n_resource_logs); + return ret; +} diff --git a/plugins/out_opentelemetry/opentelemetry_utils.c b/plugins/out_opentelemetry/opentelemetry_utils.c new file mode 100644 index 00000000000..8c09cfcb4f8 --- /dev/null +++ b/plugins/out_opentelemetry/opentelemetry_utils.c @@ -0,0 +1,507 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "opentelemetry_utils.h" + +Opentelemetry__Proto__Common__V1__ArrayValue *otlp_array_value_initialize(size_t entry_count) +{ + Opentelemetry__Proto__Common__V1__ArrayValue *value; + + value = flb_calloc(1, sizeof(Opentelemetry__Proto__Common__V1__ArrayValue)); + + if (value != NULL) { + opentelemetry__proto__common__v1__array_value__init(value); + + if (entry_count > 0) { + value->values = \ + flb_calloc(entry_count, + sizeof(Opentelemetry__Proto__Common__V1__AnyValue *)); + + if (value->values == NULL) { + flb_free(value); + + value = NULL; + } + else { + value->n_values = entry_count; + } + } + } + + return value; +} + +Opentelemetry__Proto__Common__V1__KeyValue *otlp_kvpair_value_initialize() +{ + Opentelemetry__Proto__Common__V1__KeyValue *value; + + value = flb_calloc(1, sizeof(Opentelemetry__Proto__Common__V1__KeyValue)); + + if (value != NULL) { + opentelemetry__proto__common__v1__key_value__init(value); + } + + return value; +} + +Opentelemetry__Proto__Common__V1__KeyValueList *otlp_kvlist_value_initialize(size_t entry_count) +{ + Opentelemetry__Proto__Common__V1__KeyValueList *value; + + value = flb_calloc(1, sizeof(Opentelemetry__Proto__Common__V1__KeyValueList)); + + if (value != NULL) { + opentelemetry__proto__common__v1__key_value_list__init(value); + + if (entry_count > 0) { + value->values = \ + flb_calloc(entry_count, + sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); + + if (value->values == NULL) { + flb_free(value); + + value = NULL; + } + else { + value->n_values = entry_count; + } + } + } + + return value; +} + +Opentelemetry__Proto__Common__V1__AnyValue *otlp_any_value_initialize(int data_type, size_t entry_count) +{ + Opentelemetry__Proto__Common__V1__AnyValue *value; + + value = flb_calloc(1, sizeof(Opentelemetry__Proto__Common__V1__AnyValue)); + + if (value == NULL) { + return NULL; + } + + opentelemetry__proto__common__v1__any_value__init(value); + + if (data_type == MSGPACK_OBJECT_STR) { + value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE; + } + else if (data_type == MSGPACK_OBJECT_NIL) { + value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE__NOT_SET; + } + else if (data_type == MSGPACK_OBJECT_BOOLEAN) { + value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BOOL_VALUE; + } + else if (data_type == MSGPACK_OBJECT_POSITIVE_INTEGER || data_type == MSGPACK_OBJECT_NEGATIVE_INTEGER) { + value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_INT_VALUE; + } + else if (data_type == MSGPACK_OBJECT_FLOAT32 || data_type == MSGPACK_OBJECT_FLOAT64) { + value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_DOUBLE_VALUE; + } + else if (data_type == MSGPACK_OBJECT_ARRAY) { + value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE; + value->array_value = otlp_array_value_initialize(entry_count); + + if (value->array_value == NULL) { + flb_free(value); + + value = NULL; + } + } + else if (data_type == MSGPACK_OBJECT_MAP) { + value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE; + + value->kvlist_value = otlp_kvlist_value_initialize(entry_count); + + if (value->kvlist_value == NULL) { + flb_free(value); + + value = NULL; + } + } + else if (data_type == MSGPACK_OBJECT_BIN) { + value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BYTES_VALUE; + } + else { + flb_free(value); + + value = NULL; + } + + return value; +} + +void otlp_kvarray_destroy(Opentelemetry__Proto__Common__V1__KeyValue **kvarray, size_t entry_count) +{ + size_t index; + + if (kvarray != NULL) { + for (index = 0 ; index < entry_count ; index++) { + if (kvarray[index] != NULL) { + otlp_kvpair_destroy(kvarray[index]); + kvarray[index] = NULL; + } + } + + flb_free(kvarray); + } +} + +void otlp_kvpair_destroy(Opentelemetry__Proto__Common__V1__KeyValue *kvpair) +{ + if (kvpair == NULL) { + return; + } + + if (kvpair->key != NULL) { + flb_free(kvpair->key); + } + + if (kvpair->value != NULL) { + otlp_any_value_destroy(kvpair->value); + } + + flb_free(kvpair); +} + +void otlp_kvlist_destroy(Opentelemetry__Proto__Common__V1__KeyValueList *kvlist) +{ + size_t index; + + if (kvlist != NULL) { + if (kvlist->values != NULL) { + for (index = 0 ; index < kvlist->n_values ; index++) { + otlp_kvpair_destroy(kvlist->values[index]); + } + + flb_free(kvlist->values); + } + + flb_free(kvlist); + } +} + +void otlp_array_destroy(Opentelemetry__Proto__Common__V1__ArrayValue *array) +{ + size_t index; + + if (array != NULL) { + if (array->values != NULL) { + for (index = 0 ; index < array->n_values ; index++) { + otlp_any_value_destroy(array->values[index]); + } + + flb_free(array->values); + } + + flb_free(array); + } +} + +void otlp_any_value_destroy(Opentelemetry__Proto__Common__V1__AnyValue *value) +{ + if (value != NULL) { + if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE) { + if (value->string_value != NULL) { + flb_free(value->string_value); + } + } + else if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE) { + if (value->array_value != NULL) { + otlp_array_destroy(value->array_value); + } + } + else if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE) { + if (value->kvlist_value != NULL) { + otlp_kvlist_destroy(value->kvlist_value); + } + } + else if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BYTES_VALUE) { + if (value->bytes_value.data != NULL) { + flb_free(value->bytes_value.data); + } + } + + value->string_value = NULL; + + flb_free(value); + } +} + +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_boolean_to_otlp_any_value(struct msgpack_object *o) +{ + Opentelemetry__Proto__Common__V1__AnyValue *result; + + result = otlp_any_value_initialize(MSGPACK_OBJECT_BOOLEAN, 0); + + if (result != NULL) { + result->bool_value = o->via.boolean; + } + + return result; +} + +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_integer_to_otlp_any_value(struct msgpack_object *o) +{ + Opentelemetry__Proto__Common__V1__AnyValue *result; + + result = otlp_any_value_initialize(o->type, 0); + + if (result != NULL) { + if (o->type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + result->int_value = (int64_t) o->via.u64; + } + else { + result->int_value = o->via.i64; + } + } + + return result; +} + +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_float_to_otlp_any_value(struct msgpack_object *o) +{ + Opentelemetry__Proto__Common__V1__AnyValue *result; + + result = otlp_any_value_initialize(o->type, 0); + + if (result != NULL) { + result->double_value = o->via.f64; + } + + return result; +} + +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_string_to_otlp_any_value(struct msgpack_object *o) +{ + Opentelemetry__Proto__Common__V1__AnyValue *result; + + result = otlp_any_value_initialize(MSGPACK_OBJECT_STR, 0); + + if (result != NULL) { + result->string_value = flb_strndup(o->via.str.ptr, o->via.str.size); + + if (result->string_value == NULL) { + otlp_any_value_destroy(result); + + result = NULL; + } + } + + return result; +} + +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_nil_to_otlp_any_value(struct msgpack_object *o) +{ + Opentelemetry__Proto__Common__V1__AnyValue *result; + + result = otlp_any_value_initialize(MSGPACK_OBJECT_NIL, 0); + + if (result != NULL) { + result->string_value = NULL; + } + + return result; +} + +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_bin_to_otlp_any_value(struct msgpack_object *o) +{ + Opentelemetry__Proto__Common__V1__AnyValue *result; + + result = otlp_any_value_initialize(MSGPACK_OBJECT_BIN, 0); + + if (result != NULL) { + result->bytes_value.len = o->via.bin.size; + result->bytes_value.data = flb_malloc(o->via.bin.size); + + if (result->bytes_value.data == NULL) { + otlp_any_value_destroy(result); + + result = NULL; + } + + memcpy(result->bytes_value.data, o->via.bin.ptr, o->via.bin.size); + } + + return result; +} + +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_array_to_otlp_any_value(struct msgpack_object *o) +{ + size_t entry_count; + Opentelemetry__Proto__Common__V1__AnyValue *entry_value; + Opentelemetry__Proto__Common__V1__AnyValue *result; + size_t index; + msgpack_object *p; + + entry_count = o->via.array.size; + result = otlp_any_value_initialize(MSGPACK_OBJECT_ARRAY, entry_count); + + p = o->via.array.ptr; + + if (result != NULL) { + index = 0; + + for (index = 0 ; index < entry_count ; index++) { + entry_value = msgpack_object_to_otlp_any_value(&p[index]); + + if (entry_value == NULL) { + otlp_any_value_destroy(result); + + result = NULL; + + break; + } + + result->array_value->values[index] = entry_value; + } + } + + return result; +} + +Opentelemetry__Proto__Common__V1__KeyValue *msgpack_kv_to_otlp_any_value(struct msgpack_object_kv *input_pair) +{ + Opentelemetry__Proto__Common__V1__KeyValue *kv; + + kv = otlp_kvpair_value_initialize(); + if (kv == NULL) { + flb_errno(); + + return NULL; + } + + kv->key = flb_strndup(input_pair->key.via.str.ptr, input_pair->key.via.str.size); + if (kv->key == NULL) { + flb_errno(); + flb_free(kv); + + return NULL; + } + + kv->value = msgpack_object_to_otlp_any_value(&input_pair->val); + if (kv->value == NULL) { + flb_free(kv->key); + flb_free(kv); + + return NULL; + } + + return kv; +} + +Opentelemetry__Proto__Common__V1__KeyValue **msgpack_map_to_otlp_kvarray(struct msgpack_object *o, size_t *entry_count) +{ + Opentelemetry__Proto__Common__V1__KeyValue **result; + size_t index; + msgpack_object_kv *kv; + + *entry_count = o->via.map.size; + result = flb_calloc(*entry_count, sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); + if (result != NULL) { + for (index = 0; index < *entry_count; index++) { + kv = &o->via.map.ptr[index]; + result[index] = msgpack_kv_to_otlp_any_value(kv); + } + } + else { + *entry_count = 0; + } + + return result; +} + +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_map_to_otlp_any_value(struct msgpack_object *o) +{ + size_t entry_count; + Opentelemetry__Proto__Common__V1__AnyValue *result; + Opentelemetry__Proto__Common__V1__KeyValue *keyvalue; + size_t index; + msgpack_object_kv *kv; + + entry_count = o->via.map.size; + result = otlp_any_value_initialize(MSGPACK_OBJECT_MAP, entry_count); + + if (result != NULL) { + + for (index = 0; index < entry_count; index++) { + kv = &o->via.map.ptr[index]; + keyvalue = msgpack_kv_to_otlp_any_value(kv); + result->kvlist_value->values[index] = keyvalue; + } + } + + return result; +} + +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_object_to_otlp_any_value(struct msgpack_object *o) +{ + Opentelemetry__Proto__Common__V1__AnyValue *result; + + result = NULL; + + switch (o->type) { + case MSGPACK_OBJECT_NIL: + result = msgpack_nil_to_otlp_any_value(o); + break; + + case MSGPACK_OBJECT_BOOLEAN: + result = msgpack_boolean_to_otlp_any_value(o); + break; + + case MSGPACK_OBJECT_POSITIVE_INTEGER: + case MSGPACK_OBJECT_NEGATIVE_INTEGER: + result = msgpack_integer_to_otlp_any_value(o); + break; + + case MSGPACK_OBJECT_FLOAT32: + case MSGPACK_OBJECT_FLOAT64: + result = msgpack_float_to_otlp_any_value(o); + break; + + case MSGPACK_OBJECT_STR: + result = msgpack_string_to_otlp_any_value(o); + break; + + case MSGPACK_OBJECT_MAP: + result = msgpack_map_to_otlp_any_value(o); + break; + + case MSGPACK_OBJECT_BIN: + result = msgpack_bin_to_otlp_any_value(o); + break; + + case MSGPACK_OBJECT_ARRAY: + result = msgpack_array_to_otlp_any_value(o); + break; + + default: + break; + } + + /* This function will fail if it receives an object with + * type MSGPACK_OBJECT_EXT + */ + + return result; +} diff --git a/plugins/out_opentelemetry/opentelemetry_utils.h b/plugins/out_opentelemetry/opentelemetry_utils.h new file mode 100644 index 00000000000..93f24ddb6dd --- /dev/null +++ b/plugins/out_opentelemetry/opentelemetry_utils.h @@ -0,0 +1,46 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +Opentelemetry__Proto__Common__V1__ArrayValue *otlp_array_value_initialize(size_t entry_count); +Opentelemetry__Proto__Common__V1__KeyValue *otlp_kvpair_value_initialize(); +Opentelemetry__Proto__Common__V1__KeyValueList *otlp_kvlist_value_initialize(size_t entry_count); +Opentelemetry__Proto__Common__V1__AnyValue *otlp_any_value_initialize(int data_type, size_t entry_count); + +void otlp_kvarray_destroy(Opentelemetry__Proto__Common__V1__KeyValue **kvarray, size_t entry_count); +void otlp_kvpair_destroy(Opentelemetry__Proto__Common__V1__KeyValue *kvpair); +void otlp_kvlist_destroy(Opentelemetry__Proto__Common__V1__KeyValueList *kvlist); +void otlp_array_destroy(Opentelemetry__Proto__Common__V1__ArrayValue *array); +void otlp_any_value_destroy(Opentelemetry__Proto__Common__V1__AnyValue *value); + +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_boolean_to_otlp_any_value(struct msgpack_object *o); +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_integer_to_otlp_any_value(struct msgpack_object *o); +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_float_to_otlp_any_value(struct msgpack_object *o); +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_string_to_otlp_any_value(struct msgpack_object *o); +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_nil_to_otlp_any_value(struct msgpack_object *o); +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_bin_to_otlp_any_value(struct msgpack_object *o); +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_array_to_otlp_any_value(struct msgpack_object *o); + +Opentelemetry__Proto__Common__V1__KeyValue *msgpack_kv_to_otlp_any_value(struct msgpack_object_kv *input_pair); +Opentelemetry__Proto__Common__V1__KeyValue **msgpack_map_to_otlp_kvarray(struct msgpack_object *o, size_t *entry_count); +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_map_to_otlp_any_value(struct msgpack_object *o); +Opentelemetry__Proto__Common__V1__AnyValue *msgpack_object_to_otlp_any_value(struct msgpack_object *o); +