From 48efbbbb21055b883c72729ecac5d2aa73dd92fa Mon Sep 17 00:00:00 2001 From: Tom Yu Date: Mon, 25 Nov 2024 11:58:37 +0800 Subject: [PATCH] Enable OTLP Trace flushing to SLS (#1906) * enable otlp trace to SlsPb * fix UT --- docs/cn/plugins/input/service-otlp.md | 2 +- .../opentelemetry/otlpDataToSLSProto.go | 3 +- .../opentelemetry/service_otlp_v1_test.go | 102 +++++++++++++++++- 3 files changed, 103 insertions(+), 4 deletions(-) diff --git a/docs/cn/plugins/input/service-otlp.md b/docs/cn/plugins/input/service-otlp.md index a47b7b6aad..8ff61e6d85 100644 --- a/docs/cn/plugins/input/service-otlp.md +++ b/docs/cn/plugins/input/service-otlp.md @@ -2,7 +2,7 @@ ## 简介 -`service_otlp` `input`插件实现了`ServiceInputV1`和`ServiceInputV2`接口,可以接受`Opentelemetry log/metric/trace protocol`的http/gRPC请求,并且转换输出SLSProto或PipelineGroupEvents。目前尚不支持otlp trace转换到SLSProto。 +`service_otlp` `input`插件实现了`ServiceInputV1`和`ServiceInputV2`接口,可以接受`Opentelemetry log/metric/trace protocol`的http/gRPC请求,并且转换输出SLSProto或PipelineGroupEvents。 ## 版本 diff --git a/pkg/protocol/decoder/opentelemetry/otlpDataToSLSProto.go b/pkg/protocol/decoder/opentelemetry/otlpDataToSLSProto.go index cde29c88ad..a5adc16357 100644 --- a/pkg/protocol/decoder/opentelemetry/otlpDataToSLSProto.go +++ b/pkg/protocol/decoder/opentelemetry/otlpDataToSLSProto.go @@ -427,5 +427,6 @@ func ConvertOtlpTraceRequestV1(otlpTraceReq ptraceotlp.ExportRequest) (logs []*p } func ConvertOtlpTraceV1(otlpTrace ptrace.Traces) (logs []*protocol.Log, err error) { - return logs, fmt.Errorf("does_not_support_otlptraces") + log, _ := ConvertTrace(otlpTrace) + return log, nil } diff --git a/plugins/input/opentelemetry/service_otlp_v1_test.go b/plugins/input/opentelemetry/service_otlp_v1_test.go index c531a8ce35..ac48671ff3 100644 --- a/plugins/input/opentelemetry/service_otlp_v1_test.go +++ b/plugins/input/opentelemetry/service_otlp_v1_test.go @@ -17,7 +17,9 @@ package opentelemetry import ( "fmt" "net/http" + "strconv" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -203,7 +205,55 @@ func TestOtlpGRPC_Trace_V1(t *testing.T) { for i := 0; i < queueSize; i++ { err = exportTraces(cc, GenerateTraces(i+1)) - assert.Error(t, err, "does_not_support_otlptraces") + assert.NoError(t, err) + } + + starttm := time.Date(2020, 2, 11, 20, 26, 12, 321, time.UTC) + endtm := time.Date(2020, 2, 11, 20, 26, 13, 789, time.UTC) + traceID := "0102030405060708090a0b0c0d0e0f10" + spanID := "1112131415161718" + count := 0 + pos := 0 + for count < queueSize { + count++ + for i := pos; i < pos+count; i++ { + log := collector.Logs[i] + assert.Equal(t, int64(log.Time), endtm.Unix()) + assert.Equal(t, "host", log.Contents[0].Key) + assert.Equal(t, "service", log.Contents[1].Key) + assert.Equal(t, "resource", log.Contents[2].Key) + assert.Equal(t, "{\"resource-attr\":\"resource-attr-val-1\"}", log.Contents[2].Value) + assert.Equal(t, "otlp.name", log.Contents[3].Key) + assert.Equal(t, "otlp.version", log.Contents[4].Key) + assert.Equal(t, "traceID", log.Contents[5].Key) + assert.Equal(t, "spanID", log.Contents[6].Key) + assert.Equal(t, "parentSpanID", log.Contents[7].Key) + assert.Equal(t, "kind", log.Contents[8].Key) + assert.Equal(t, "name", log.Contents[9].Key) + assert.Equal(t, "links", log.Contents[10].Key) + assert.Equal(t, "logs", log.Contents[11].Key) + if (i-pos)%2 == 0 { + assert.Equal(t, traceID, log.Contents[5].Value) + assert.Equal(t, spanID, log.Contents[6].Value) + assert.Equal(t, "operationA", log.Contents[9].Value) + assert.Equal(t, "[{\"attribute\":{\"span-event-attr\":\"span-event-attr-val\"},\"name\":\"event-with-attr\",\"time\":1581452773000000123},{\"attribute\":{},\"name\":\"event\",\"time\":1581452773000000123}]", log.Contents[11].Value) + } else { + assert.Equal(t, []byte{}, []byte(log.Contents[5].Value)) + assert.Equal(t, []byte{}, []byte(log.Contents[6].Value)) + assert.Equal(t, "operationB", log.Contents[9].Value) + assert.Equal(t, "[{\"attribute\":{\"span-link-attr\":\"span-link-attr-val\"},\"spanID\":\"\",\"traceID\":\"\"},{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"\"}]", log.Contents[10].Value) + } + assert.Equal(t, "traceState", log.Contents[12].Key) + assert.Equal(t, "start", log.Contents[13].Key) + assert.Equal(t, strconv.FormatInt(starttm.UnixMicro(), 10), log.Contents[13].Value) + assert.Equal(t, "end", log.Contents[14].Key) + assert.Equal(t, strconv.FormatInt(endtm.UnixMicro(), 10), log.Contents[14].Value) + assert.Equal(t, "duration", log.Contents[15].Key) + assert.Equal(t, "attribute", log.Contents[16].Key) + assert.Equal(t, "statusCode", log.Contents[17].Key) + assert.Equal(t, "statusMessage", log.Contents[18].Key) + } + pos += count } } @@ -312,6 +362,54 @@ func TestOtlpHTTP_Trace_V1(t *testing.T) { for i := 0; i < queueSize; i++ { req := ptraceotlp.NewExportRequestFromTraces(GenerateTraces(i + 1)) err = httpExport(client, req, url, i%2 == 0) - assert.Error(t, err, "does_not_support_otlptraces") + assert.NoError(t, err) + } + + starttm := time.Date(2020, 2, 11, 20, 26, 12, 321, time.UTC) + endtm := time.Date(2020, 2, 11, 20, 26, 13, 789, time.UTC) + traceID := "0102030405060708090a0b0c0d0e0f10" + spanID := "1112131415161718" + count := 0 + pos := 0 + for count < queueSize { + count++ + for i := pos; i < pos+count; i++ { + log := collector.Logs[i] + assert.Equal(t, int64(log.Time), endtm.Unix()) + assert.Equal(t, "host", log.Contents[0].Key) + assert.Equal(t, "service", log.Contents[1].Key) + assert.Equal(t, "resource", log.Contents[2].Key) + assert.Equal(t, "{\"resource-attr\":\"resource-attr-val-1\"}", log.Contents[2].Value) + assert.Equal(t, "otlp.name", log.Contents[3].Key) + assert.Equal(t, "otlp.version", log.Contents[4].Key) + assert.Equal(t, "traceID", log.Contents[5].Key) + assert.Equal(t, "spanID", log.Contents[6].Key) + assert.Equal(t, "parentSpanID", log.Contents[7].Key) + assert.Equal(t, "kind", log.Contents[8].Key) + assert.Equal(t, "name", log.Contents[9].Key) + assert.Equal(t, "links", log.Contents[10].Key) + assert.Equal(t, "logs", log.Contents[11].Key) + if (i-pos)%2 == 0 { + assert.Equal(t, traceID, log.Contents[5].Value) + assert.Equal(t, spanID, log.Contents[6].Value) + assert.Equal(t, "operationA", log.Contents[9].Value) + assert.Equal(t, "[{\"attribute\":{\"span-event-attr\":\"span-event-attr-val\"},\"name\":\"event-with-attr\",\"time\":1581452773000000123},{\"attribute\":{},\"name\":\"event\",\"time\":1581452773000000123}]", log.Contents[11].Value) + } else { + assert.Equal(t, []byte{}, []byte(log.Contents[5].Value)) + assert.Equal(t, []byte{}, []byte(log.Contents[6].Value)) + assert.Equal(t, "operationB", log.Contents[9].Value) + assert.Equal(t, "[{\"attribute\":{\"span-link-attr\":\"span-link-attr-val\"},\"spanID\":\"\",\"traceID\":\"\"},{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"\"}]", log.Contents[10].Value) + } + assert.Equal(t, "traceState", log.Contents[12].Key) + assert.Equal(t, "start", log.Contents[13].Key) + assert.Equal(t, strconv.FormatInt(starttm.UnixMicro(), 10), log.Contents[13].Value) + assert.Equal(t, "end", log.Contents[14].Key) + assert.Equal(t, strconv.FormatInt(endtm.UnixMicro(), 10), log.Contents[14].Value) + assert.Equal(t, "duration", log.Contents[15].Key) + assert.Equal(t, "attribute", log.Contents[16].Key) + assert.Equal(t, "statusCode", log.Contents[17].Key) + assert.Equal(t, "statusMessage", log.Contents[18].Key) + } + pos += count } }