Skip to content

Commit

Permalink
Enable OTLP Trace flushing to SLS (#1906)
Browse files Browse the repository at this point in the history
* enable otlp trace to SlsPb

* fix UT
  • Loading branch information
yyuuttaaoo committed Nov 25, 2024
1 parent ffbc94c commit 48efbbb
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docs/cn/plugins/input/service-otlp.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## 简介

`service_otlp` `input`插件实现了`ServiceInputV1``ServiceInputV2`接口,可以接受`Opentelemetry log/metric/trace protocol`的http/gRPC请求,并且转换输出SLSProto或PipelineGroupEvents。目前尚不支持otlp trace转换到SLSProto。
`service_otlp` `input`插件实现了`ServiceInputV1``ServiceInputV2`接口,可以接受`Opentelemetry log/metric/trace protocol`的http/gRPC请求,并且转换输出SLSProto或PipelineGroupEvents。

## 版本

Expand Down
3 changes: 2 additions & 1 deletion pkg/protocol/decoder/opentelemetry/otlpDataToSLSProto.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,5 +427,6 @@ func ConvertOtlpTraceRequestV1(otlpTraceReq ptraceotlp.ExportRequest) (logs []*p
}

func ConvertOtlpTraceV1(otlpTrace ptrace.Traces) (logs []*protocol.Log, err error) {
return logs, fmt.Errorf("does_not_support_otlptraces")
log, _ := ConvertTrace(otlpTrace)
return log, nil
}
102 changes: 100 additions & 2 deletions plugins/input/opentelemetry/service_otlp_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package opentelemetry
import (
"fmt"
"net/http"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -203,7 +205,55 @@ func TestOtlpGRPC_Trace_V1(t *testing.T) {

for i := 0; i < queueSize; i++ {
err = exportTraces(cc, GenerateTraces(i+1))
assert.Error(t, err, "does_not_support_otlptraces")
assert.NoError(t, err)
}

starttm := time.Date(2020, 2, 11, 20, 26, 12, 321, time.UTC)
endtm := time.Date(2020, 2, 11, 20, 26, 13, 789, time.UTC)
traceID := "0102030405060708090a0b0c0d0e0f10"
spanID := "1112131415161718"
count := 0
pos := 0
for count < queueSize {
count++
for i := pos; i < pos+count; i++ {
log := collector.Logs[i]
assert.Equal(t, int64(log.Time), endtm.Unix())
assert.Equal(t, "host", log.Contents[0].Key)
assert.Equal(t, "service", log.Contents[1].Key)
assert.Equal(t, "resource", log.Contents[2].Key)
assert.Equal(t, "{\"resource-attr\":\"resource-attr-val-1\"}", log.Contents[2].Value)
assert.Equal(t, "otlp.name", log.Contents[3].Key)
assert.Equal(t, "otlp.version", log.Contents[4].Key)
assert.Equal(t, "traceID", log.Contents[5].Key)
assert.Equal(t, "spanID", log.Contents[6].Key)
assert.Equal(t, "parentSpanID", log.Contents[7].Key)
assert.Equal(t, "kind", log.Contents[8].Key)
assert.Equal(t, "name", log.Contents[9].Key)
assert.Equal(t, "links", log.Contents[10].Key)
assert.Equal(t, "logs", log.Contents[11].Key)
if (i-pos)%2 == 0 {
assert.Equal(t, traceID, log.Contents[5].Value)
assert.Equal(t, spanID, log.Contents[6].Value)
assert.Equal(t, "operationA", log.Contents[9].Value)
assert.Equal(t, "[{\"attribute\":{\"span-event-attr\":\"span-event-attr-val\"},\"name\":\"event-with-attr\",\"time\":1581452773000000123},{\"attribute\":{},\"name\":\"event\",\"time\":1581452773000000123}]", log.Contents[11].Value)
} else {
assert.Equal(t, []byte{}, []byte(log.Contents[5].Value))
assert.Equal(t, []byte{}, []byte(log.Contents[6].Value))
assert.Equal(t, "operationB", log.Contents[9].Value)
assert.Equal(t, "[{\"attribute\":{\"span-link-attr\":\"span-link-attr-val\"},\"spanID\":\"\",\"traceID\":\"\"},{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"\"}]", log.Contents[10].Value)
}
assert.Equal(t, "traceState", log.Contents[12].Key)
assert.Equal(t, "start", log.Contents[13].Key)
assert.Equal(t, strconv.FormatInt(starttm.UnixMicro(), 10), log.Contents[13].Value)
assert.Equal(t, "end", log.Contents[14].Key)
assert.Equal(t, strconv.FormatInt(endtm.UnixMicro(), 10), log.Contents[14].Value)
assert.Equal(t, "duration", log.Contents[15].Key)
assert.Equal(t, "attribute", log.Contents[16].Key)
assert.Equal(t, "statusCode", log.Contents[17].Key)
assert.Equal(t, "statusMessage", log.Contents[18].Key)
}
pos += count
}
}

Expand Down Expand Up @@ -312,6 +362,54 @@ func TestOtlpHTTP_Trace_V1(t *testing.T) {
for i := 0; i < queueSize; i++ {
req := ptraceotlp.NewExportRequestFromTraces(GenerateTraces(i + 1))
err = httpExport(client, req, url, i%2 == 0)
assert.Error(t, err, "does_not_support_otlptraces")
assert.NoError(t, err)
}

starttm := time.Date(2020, 2, 11, 20, 26, 12, 321, time.UTC)
endtm := time.Date(2020, 2, 11, 20, 26, 13, 789, time.UTC)
traceID := "0102030405060708090a0b0c0d0e0f10"
spanID := "1112131415161718"
count := 0
pos := 0
for count < queueSize {
count++
for i := pos; i < pos+count; i++ {
log := collector.Logs[i]
assert.Equal(t, int64(log.Time), endtm.Unix())
assert.Equal(t, "host", log.Contents[0].Key)
assert.Equal(t, "service", log.Contents[1].Key)
assert.Equal(t, "resource", log.Contents[2].Key)
assert.Equal(t, "{\"resource-attr\":\"resource-attr-val-1\"}", log.Contents[2].Value)
assert.Equal(t, "otlp.name", log.Contents[3].Key)
assert.Equal(t, "otlp.version", log.Contents[4].Key)
assert.Equal(t, "traceID", log.Contents[5].Key)
assert.Equal(t, "spanID", log.Contents[6].Key)
assert.Equal(t, "parentSpanID", log.Contents[7].Key)
assert.Equal(t, "kind", log.Contents[8].Key)
assert.Equal(t, "name", log.Contents[9].Key)
assert.Equal(t, "links", log.Contents[10].Key)
assert.Equal(t, "logs", log.Contents[11].Key)
if (i-pos)%2 == 0 {
assert.Equal(t, traceID, log.Contents[5].Value)
assert.Equal(t, spanID, log.Contents[6].Value)
assert.Equal(t, "operationA", log.Contents[9].Value)
assert.Equal(t, "[{\"attribute\":{\"span-event-attr\":\"span-event-attr-val\"},\"name\":\"event-with-attr\",\"time\":1581452773000000123},{\"attribute\":{},\"name\":\"event\",\"time\":1581452773000000123}]", log.Contents[11].Value)
} else {
assert.Equal(t, []byte{}, []byte(log.Contents[5].Value))
assert.Equal(t, []byte{}, []byte(log.Contents[6].Value))
assert.Equal(t, "operationB", log.Contents[9].Value)
assert.Equal(t, "[{\"attribute\":{\"span-link-attr\":\"span-link-attr-val\"},\"spanID\":\"\",\"traceID\":\"\"},{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"\"}]", log.Contents[10].Value)
}
assert.Equal(t, "traceState", log.Contents[12].Key)
assert.Equal(t, "start", log.Contents[13].Key)
assert.Equal(t, strconv.FormatInt(starttm.UnixMicro(), 10), log.Contents[13].Value)
assert.Equal(t, "end", log.Contents[14].Key)
assert.Equal(t, strconv.FormatInt(endtm.UnixMicro(), 10), log.Contents[14].Value)
assert.Equal(t, "duration", log.Contents[15].Key)
assert.Equal(t, "attribute", log.Contents[16].Key)
assert.Equal(t, "statusCode", log.Contents[17].Key)
assert.Equal(t, "statusMessage", log.Contents[18].Key)
}
pos += count
}
}

0 comments on commit 48efbbb

Please sign in to comment.