Skip to content

Commit

Permalink
in_opentelemetry: add a function to store metadata
Browse files Browse the repository at this point in the history
This function is to store fields other than body as metadata.

The fields are defined at
https://opentelemetry.io/docs/specs/otel/logs/data-model/#definitions-used-in-this-document

Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 authored and edsiper committed Apr 30, 2024
1 parent 4effae3 commit 10568b6
Showing 1 changed file with 140 additions and 43 deletions.
183 changes: 140 additions & 43 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -389,17 +389,148 @@ static int otlp_pack_any_value(msgpack_packer *mp_pck,
return result;
}

/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition */
static int otel_pack_v1_metadata(msgpack_packer *mp_pck,
struct Opentelemetry__Proto__Logs__V1__LogRecord *log_record,
Opentelemetry__Proto__Resource__V1__Resource *resource,
Opentelemetry__Proto__Common__V1__InstrumentationScope *scope)
{
struct flb_mp_map_header mh;
struct flb_mp_map_header scope_mh;
int ret;
flb_mp_map_header_init(&mh, mp_pck);

flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 17);
msgpack_pack_str_body(mp_pck, "ObservedTimestamp", 17);
msgpack_pack_uint64(mp_pck, log_record->observed_time_unix_nano);

/* Value of 0 indicates unknown or missing timestamp. */
if (log_record->time_unix_nano != 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 9);
msgpack_pack_str_body(mp_pck, "Timestamp", 9);
msgpack_pack_uint64(mp_pck, log_record->time_unix_nano);
}

/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */
if (log_record->severity_number >= 1 && log_record->severity_number <= 24) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 14);
msgpack_pack_str_body(mp_pck, "SeverityNumber", 14);
msgpack_pack_uint64(mp_pck, log_record->severity_number);
}

if (log_record->severity_text != NULL && strlen(log_record->severity_text) > 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 12);
msgpack_pack_str_body(mp_pck, "SeverityText", 12);
msgpack_pack_str(mp_pck, strlen(log_record->severity_text));
msgpack_pack_str_body(mp_pck, log_record->severity_text, strlen(log_record->severity_text));
}

if (log_record->n_attributes > 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "Attributes", 10);
ret = otel_pack_kvarray(mp_pck,
log_record->attributes,
log_record->n_attributes);
if (ret != 0) {
return ret;
}
}

if (log_record->trace_id.len > 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 7);
msgpack_pack_str_body(mp_pck, "TraceId", 7);
ret = otel_pack_bytes(mp_pck, log_record->trace_id);
if (ret != 0) {
return ret;
}
}

if (log_record->span_id.len > 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 6);
msgpack_pack_str_body(mp_pck, "SpanId", 6);
ret = otel_pack_bytes(mp_pck, log_record->span_id);
if (ret != 0) {
return ret;
}
}

flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "TraceFlags", 10);
msgpack_pack_uint8(mp_pck, (uint8_t)log_record->flags & 0xff);



if (resource != NULL && resource->n_attributes > 0 && resource->attributes) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 8);
msgpack_pack_str_body(mp_pck, "Resource", 8);

ret = otel_pack_kvarray(mp_pck,
resource->attributes,
resource->n_attributes);
if (ret != 0) {
return ret;
}
}

if (scope != NULL && (scope->name || scope->version || scope->n_attributes > 0)) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 20);
msgpack_pack_str_body(mp_pck, "InstrumentationScope", 20);

flb_mp_map_header_init(&scope_mh, mp_pck);
if (scope->name != NULL && strlen(scope->name) > 0) {
flb_mp_map_header_append(&scope_mh);
msgpack_pack_str(mp_pck, 4);
msgpack_pack_str_body(mp_pck, "Name", 4);
msgpack_pack_str(mp_pck, strlen(scope->name));
msgpack_pack_str_body(mp_pck, scope->name, strlen(scope->name));
}
if (scope->version != NULL && strlen(scope->version) > 0) {
flb_mp_map_header_append(&scope_mh);
msgpack_pack_str(mp_pck, 7);
msgpack_pack_str_body(mp_pck, "Version", 7);
msgpack_pack_str(mp_pck, strlen(scope->version));
msgpack_pack_str_body(mp_pck, scope->version, strlen(scope->version));
}
if (scope->n_attributes > 0 && scope->attributes) {
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "Attributes", 10);
ret = otel_pack_kvarray(mp_pck,
scope->attributes,
scope->n_attributes);
if (ret != 0) {
return ret;
}
}

flb_mp_map_header_end(&scope_mh);
}

flb_mp_map_header_end(&mh);
return 0;
}

static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
uint8_t *in_buf,
size_t in_size)
{
int ret;
msgpack_packer packer;
msgpack_sbuffer buffer;
msgpack_packer meta_packer;
msgpack_sbuffer meta_buffer;
int resource_logs_index;
int scope_log_index;
int log_record_index;
struct flb_mp_map_header mh;

Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs;
Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs;
Expand All @@ -411,6 +542,8 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,

msgpack_sbuffer_init(&buffer);
msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write);
msgpack_sbuffer_init(&meta_buffer);
msgpack_packer_init(&meta_packer, &meta_buffer, msgpack_sbuffer_write);

input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf);
if (input_logs == NULL) {
Expand Down Expand Up @@ -455,55 +588,18 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
flb_mp_map_header_init(&mh, &packer);

/* pack resource */
flb_mp_map_header_append(&mh);
msgpack_pack_str(&packer, 8);
msgpack_pack_str_body(&packer, "resource", 8);
if (resource != NULL) {
msgpack_pack_map(&packer, 1);

msgpack_pack_str(&packer, 10);
msgpack_pack_str_body(&packer, "attributes", 10);
ret = otel_pack_kvarray(
&packer,
resource->attributes,
resource->n_attributes);
}
else {
msgpack_pack_map(&packer, 0);
}

if (ret != 0) {
flb_error("[otel] Failed to convert log resource attributes");
goto binary_payload_to_msgpack_end;
}

/* pack logRecords */
flb_mp_map_header_append(&mh);
msgpack_pack_str(&packer, 10);
msgpack_pack_str_body(&packer, "logRecords", 10);

msgpack_pack_map(&packer, 1);
msgpack_pack_str(&packer, 10);
msgpack_pack_str_body(&packer, "attributes", 10);
ret = otel_pack_kvarray(
&packer,
log_records[log_record_index]->attributes,
log_records[log_record_index]->n_attributes);

msgpack_sbuffer_clear(&meta_buffer);
ret = otel_pack_v1_metadata(&meta_packer, log_records[log_record_index], resource, scope_log->scope);
if (ret != 0) {
flb_error("[otel] Failed to convert log record attributes");
flb_error("[otel] Failed to convert log record");

ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
}
else {
flb_mp_map_header_end(&mh);
ret = flb_log_event_encoder_set_metadata_from_raw_msgpack(
encoder,
buffer.data,
buffer.size);
meta_buffer.data,
meta_buffer.size);
}

msgpack_sbuffer_clear(&buffer);
Expand Down Expand Up @@ -551,6 +647,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,

binary_payload_to_msgpack_end:
msgpack_sbuffer_destroy(&buffer);
msgpack_sbuffer_destroy(&meta_buffer);
if (input_logs) {
opentelemetry__proto__collector__logs__v1__export_logs_service_request__free_unpacked(
input_logs, NULL);
Expand Down

0 comments on commit 10568b6

Please sign in to comment.