diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index acd1e8c6949..a99ca12fb89 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -1225,22 +1225,25 @@ static int json_payload_append_converted_value( return result; } -static int process_json_payload_log_records_entry( - struct flb_opentelemetry *ctx, - struct flb_log_event_encoder *encoder, - msgpack_object *log_records_object) +static int process_json_payload_log_records_entry(struct flb_opentelemetry *ctx, + struct flb_log_event_encoder *encoder, + msgpack_object *log_records_object) { - msgpack_object_map *log_records_entry; + int result; + int body_type; char timestamp_str[32]; + msgpack_object_map *log_records_entry; msgpack_object *timestamp_object; uint64_t timestamp_uint64; msgpack_object *metadata_object; msgpack_object *body_object; - int body_type; - struct flb_time timestamp; - int result; + msgpack_object *observed_time_unix_nano = NULL; msgpack_object *severity_number = NULL; msgpack_object *severity_text = NULL; + msgpack_object *trace_id = NULL; + msgpack_object *span_id = NULL; + struct flb_time timestamp; + if (log_records_object->type != MSGPACK_OBJECT_MAP) { flb_plg_error(ctx->ins, "unexpected logRecords entry type"); @@ -1300,6 +1303,15 @@ static int process_json_payload_log_records_entry( flb_time_from_uint64(×tamp, timestamp_uint64); } + /* observedTimeUnixNano (yes, we do it again) */ + result = find_map_entry_by_key(log_records_entry, "observedTimeUnixNano", 0, FLB_TRUE); + if (result == -1) { + result = find_map_entry_by_key(log_records_entry, "observed_time_unix_nano", 0, FLB_TRUE); + } + else if (result >= 0) { + observed_time_unix_nano = &log_records_entry->ptr[result].val; + } + /* severityNumber */ result = find_map_entry_by_key(log_records_entry, "severityNumber", 0, FLB_TRUE); if (result == -1) { @@ -1334,6 +1346,24 @@ static int process_json_payload_log_records_entry( metadata_object = &log_records_entry->ptr[result].val; } + /* traceId */ + result = find_map_entry_by_key(log_records_entry, "traceId", 0, FLB_TRUE); + if (result == -1) { + result = find_map_entry_by_key(log_records_entry, "trace_id", 0, FLB_TRUE); + } + if (result >= 0) { + trace_id = &log_records_entry->ptr[result].val; + } + + /* spanId */ + result = find_map_entry_by_key(log_records_entry, "spanId", 0, FLB_TRUE); + if (result == -1) { + result = find_map_entry_by_key(log_records_entry, "span_id", 0, FLB_TRUE); + } + if (result >= 0) { + span_id = &log_records_entry->ptr[result].val; + } + result = find_map_entry_by_key(log_records_entry, "body", 0, FLB_TRUE); if (result == -1) { @@ -1363,6 +1393,27 @@ static int process_json_payload_log_records_entry( flb_log_event_encoder_append_string(encoder, FLB_LOG_EVENT_METADATA, ctx->logs_metadata_key, flb_sds_len(ctx->logs_metadata_key)); flb_log_event_encoder_begin_map(encoder, FLB_LOG_EVENT_METADATA); + if (observed_time_unix_nano != NULL && observed_time_unix_nano->type == MSGPACK_OBJECT_STR) { + memset(timestamp_str, 0, sizeof(timestamp_str)); + + if (timestamp_object->via.str.size < sizeof(timestamp_str)) { + strncpy(timestamp_str, + timestamp_object->via.str.ptr, + timestamp_object->via.str.size); + } + else { + strncpy(timestamp_str, + timestamp_object->via.str.ptr, + sizeof(timestamp_str) - 1); + } + + timestamp_uint64 = strtoul(timestamp_str, NULL, 10); + + flb_log_event_encoder_append_metadata_values(encoder, + FLB_LOG_EVENT_STRING_VALUE("observed_timestamp", 18), + FLB_LOG_EVENT_INT64_VALUE(timestamp_uint64)); + } + if (severity_number != NULL) { flb_log_event_encoder_append_metadata_values(encoder, FLB_LOG_EVENT_STRING_VALUE("severity_number", 15), @@ -1380,6 +1431,18 @@ static int process_json_payload_log_records_entry( result = json_payload_append_converted_kvlist(encoder, FLB_LOG_EVENT_METADATA, metadata_object); } + if (trace_id != NULL && (trace_id->type == MSGPACK_OBJECT_STR || trace_id->type == MSGPACK_OBJECT_BIN)) { + flb_log_event_encoder_append_metadata_values(encoder, + FLB_LOG_EVENT_STRING_VALUE("trace_id", 8), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(trace_id)); + } + + if (span_id != NULL && (span_id->type == MSGPACK_OBJECT_STR || span_id->type == MSGPACK_OBJECT_BIN)) { + flb_log_event_encoder_append_metadata_values(encoder, + FLB_LOG_EVENT_STRING_VALUE("span_id", 7), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(span_id)); + } + flb_log_event_encoder_commit_map(encoder, FLB_LOG_EVENT_METADATA); }