diff --git a/.chloggen/traces-for-libhoneyreceiver.yaml b/.chloggen/traces-for-libhoneyreceiver.yaml new file mode 100644 index 000000000000..c593bb41d5fa --- /dev/null +++ b/.chloggen/traces-for-libhoneyreceiver.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: libhoneyreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implement trace signal for libhoney receiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36693] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/receiver/libhoneyreceiver/go.mod b/receiver/libhoneyreceiver/go.mod index e3e9a5634fc2..cf69f53950cb 100644 --- a/receiver/libhoneyreceiver/go.mod +++ b/receiver/libhoneyreceiver/go.mod @@ -18,6 +18,7 @@ require ( go.opentelemetry.io/collector/receiver v0.116.0 go.opentelemetry.io/collector/receiver/receivertest v0.116.0 go.opentelemetry.io/collector/semconv v0.116.0 + go.opentelemetry.io/otel/trace v1.32.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 @@ -68,7 +69,6 @@ require ( go.opentelemetry.io/otel/metric v1.32.0 // indirect go.opentelemetry.io/otel/sdk v1.32.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect - go.opentelemetry.io/otel/trace v1.32.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.32.0 // indirect golang.org/x/sys v0.28.0 // indirect diff --git a/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go index 5519e304f138..9b851e3018e5 100644 --- a/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go +++ b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go @@ -4,15 +4,21 @@ package libhoneyevent // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" import ( + "crypto/rand" + "encoding/binary" + "encoding/hex" "encoding/json" "errors" "fmt" + "hash/fnv" "slices" + "strings" "time" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" + trc "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime" @@ -87,8 +93,29 @@ func (l *LibhoneyEvent) DebugString() string { } // SignalType returns the type of signal this event represents. Only log is implemented for now. -func (l *LibhoneyEvent) SignalType() (string, error) { - return "log", nil +func (l *LibhoneyEvent) SignalType(logger zap.Logger) string { + if sig, ok := l.Data["meta.signal_type"]; ok { + switch sig { + case "trace": + if atype, ok := l.Data["meta.annotation_type"]; ok { + if atype == "span_event" { + return "span_event" + } else if atype == "link" { + return "span_link" + } + logger.Warn("invalid annotation type", zap.String("meta.annotation_type", atype.(string))) + return "span" + } + return "span" + case "log": + return "log" + default: + logger.Warn("invalid meta.signal_type", zap.String("meta.signal_type", sig.(string))) + return "log" + } + } + logger.Warn("missing meta.signal_type and meta.annotation_type") + return "log" } // GetService returns the service name from the event or the dataset name if no service name is found. @@ -126,6 +153,36 @@ func (l *LibhoneyEvent) GetScope(fields FieldMapConfig, seen *ScopeHistory, serv return "libhoney.receiver", errors.New("library name not found") } +func spanIDFrom(s string) trc.SpanID { + hash := fnv.New64a() + hash.Write([]byte(s)) + n := hash.Sum64() + sid := trc.SpanID{} + binary.LittleEndian.PutUint64(sid[:], n) + return sid +} + +func traceIDFrom(s string) trc.TraceID { + hash := fnv.New64a() + hash.Write([]byte(s)) + n1 := hash.Sum64() + hash.Write([]byte(s)) + n2 := hash.Sum64() + tid := trc.TraceID{} + binary.LittleEndian.PutUint64(tid[:], n1) + binary.LittleEndian.PutUint64(tid[8:], n2) + return tid +} + +func generateAnId(length int) []byte { + token := make([]byte, length) + _, err := rand.Read(token) + if err != nil { + return []byte{} + } + return token +} + // SimpleScope is a simple struct to hold the scope data type SimpleScope struct { ServiceName string @@ -198,3 +255,130 @@ func (l *LibhoneyEvent) ToPLogRecord(newLog *plog.LogRecord, alreadyUsedFields * } return nil } + +// GetParentID returns the parent id from the event or an error if it's not found +func (l *LibhoneyEvent) GetParentID(fieldName string) (trc.SpanID, error) { + if pid, ok := l.Data[fieldName]; ok { + pid := strings.ReplaceAll(pid.(string), "-", "") + pidByteArray, err := hex.DecodeString(pid) + if err == nil { + if len(pidByteArray) == 32 { + pidByteArray = pidByteArray[8:24] + } else if len(pidByteArray) >= 16 { + pidByteArray = pidByteArray[0:16] + } + return trc.SpanID(pidByteArray), nil + } + return trc.SpanID{}, errors.New("parent id is not a valid span id") + } + return trc.SpanID{}, errors.New("parent id not found") +} + +// ToPTraceSpan converts a LibhoneyEvent to a Pdata Span +func (l *LibhoneyEvent) ToPTraceSpan(newSpan *ptrace.Span, alreadyUsedFields *[]string, cfg FieldMapConfig, logger zap.Logger) error { + time_ns := l.MsgPackTimestamp.UnixNano() + logger.Debug("processing trace with", zap.Int64("timestamp", time_ns)) + + var parent_id trc.SpanID + if pid, ok := l.Data[cfg.Attributes.ParentID]; ok { + parent_id = spanIDFrom(pid.(string)) + newSpan.SetParentSpanID(pcommon.SpanID(parent_id)) + } + + duration_ms := 0.0 + for _, df := range cfg.Attributes.DurationFields { + if duration, okay := l.Data[df]; okay { + duration_ms = duration.(float64) + break + } + } + end_timestamp := time_ns + (int64(duration_ms) * 1000000) + + if tid, ok := l.Data[cfg.Attributes.TraceID]; ok { + tid := strings.ReplaceAll(tid.(string), "-", "") + tidByteArray, err := hex.DecodeString(tid) + if err == nil { + if len(tidByteArray) >= 32 { + tidByteArray = tidByteArray[0:32] + } + newSpan.SetTraceID(pcommon.TraceID(tidByteArray)) + } else { + newSpan.SetTraceID(pcommon.TraceID(traceIDFrom(tid))) + } + } else { + newSpan.SetTraceID(pcommon.TraceID(generateAnId(32))) + } + + if sid, ok := l.Data[cfg.Attributes.SpanID]; ok { + sid := strings.ReplaceAll(sid.(string), "-", "") + sidByteArray, err := hex.DecodeString(sid) + if err == nil { + if len(sidByteArray) == 32 { + sidByteArray = sidByteArray[8:24] + } else if len(sidByteArray) >= 16 { + sidByteArray = sidByteArray[0:16] + } + newSpan.SetSpanID(pcommon.SpanID(sidByteArray)) + } else { + newSpan.SetSpanID(pcommon.SpanID(spanIDFrom(sid))) + } + } else { + newSpan.SetSpanID(pcommon.SpanID(generateAnId(16))) + } + + newSpan.SetStartTimestamp(pcommon.Timestamp(time_ns)) + newSpan.SetEndTimestamp(pcommon.Timestamp(end_timestamp)) + + if spanName, ok := l.Data[cfg.Attributes.Name]; ok { + newSpan.SetName(spanName.(string)) + } + if spanStatusMessge, ok := l.Data["status_message"]; ok { + newSpan.Status().SetMessage(spanStatusMessge.(string)) + } + newSpan.Status().SetCode(ptrace.StatusCodeUnset) + + if _, ok := l.Data[cfg.Attributes.Error]; ok { + newSpan.Status().SetCode(ptrace.StatusCodeError) + } + + if spanKind, ok := l.Data[cfg.Attributes.SpanKind]; ok { + switch spanKind.(string) { + case "server": + newSpan.SetKind(ptrace.SpanKindServer) + case "client": + newSpan.SetKind(ptrace.SpanKindClient) + case "producer": + newSpan.SetKind(ptrace.SpanKindProducer) + case "consumer": + newSpan.SetKind(ptrace.SpanKindConsumer) + case "internal": + newSpan.SetKind(ptrace.SpanKindInternal) + default: + newSpan.SetKind(ptrace.SpanKindUnspecified) + } + } + + newSpan.Attributes().PutInt("SampleRate", int64(l.Samplerate)) + + for k, v := range l.Data { + if slices.Contains(*alreadyUsedFields, k) { + continue + } + switch v := v.(type) { + case string: + newSpan.Attributes().PutStr(k, v) + case int: + newSpan.Attributes().PutInt(k, int64(v)) + case int64, int16, int32: + intv := v.(int64) + newSpan.Attributes().PutInt(k, intv) + case float64: + newSpan.Attributes().PutDouble(k, v) + case bool: + newSpan.Attributes().PutBool(k, v) + default: + logger.Warn("Span data type issue", zap.String("trace.trace_id", newSpan.TraceID().String()), zap.String("trace.span_id", newSpan.SpanID().String()), zap.String("key", k)) + } + } + return nil +} diff --git a/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent_test.go b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent_test.go index 40348d2a640a..33d64aa6b340 100644 --- a/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent_test.go +++ b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" ) @@ -292,3 +293,109 @@ func TestLibhoneyEvent_GetScope(t *testing.T) { }) } } + +func TestToPTraceSpan(t *testing.T) { + now := time.Now() + tests := []struct { + name string + event LibhoneyEvent + want func(ptrace.Span) + wantErr bool + }{ + { + name: "basic span conversion", + event: LibhoneyEvent{ + Time: now.Format(time.RFC3339), + MsgPackTimestamp: &now, + Data: map[string]any{ + "name": "test-span", + "trace.span_id": "1234567890abcdef", + "trace.trace_id": "1234567890abcdef1234567890abcdef", + "duration_ms": 100.0, + "error": true, + "status_message": "error message", + "kind": "server", + "string_attr": "value", + "int_attr": 42, + "bool_attr": true, + }, + Samplerate: 1, + }, + want: func(s ptrace.Span) { + s.SetName("test-span") + s.SetSpanID([8]byte{0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef}) + s.SetTraceID([16]byte{0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef}) + s.SetStartTimestamp(pcommon.NewTimestampFromTime(now)) + s.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(100 * time.Millisecond))) + s.Status().SetCode(ptrace.StatusCodeError) + s.Status().SetMessage("error message") + s.SetKind(ptrace.SpanKindServer) + s.Attributes().PutStr("string_attr", "value") + s.Attributes().PutInt("int_attr", 42) + s.Attributes().PutBool("bool_attr", true) + }, + }, + } + + alreadyUsedFields := []string{"name", "trace.span_id", "trace.trace_id", "duration_ms", "status.code", "status.message", "kind"} + testCfg := FieldMapConfig{ + Attributes: AttributesConfig{ + Name: "name", + TraceID: "trace.trace_id", + SpanID: "trace.span_id", + ParentID: "trace.parent_id", + Error: "error", + SpanKind: "kind", + DurationFields: []string{"duration_ms"}, + }, + Resources: ResourcesConfig{ + ServiceName: "service.name", + }, + Scopes: ScopesConfig{ + LibraryName: "library.name", + LibraryVersion: "library.version", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + span := ptrace.NewSpan() + err := tt.event.ToPTraceSpan(&span, &alreadyUsedFields, testCfg, *zap.NewNop()) + + if tt.wantErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + if tt.want != nil { + want := ptrace.NewSpan() + tt.want(want) + + // Check basic fields + assert.Equal(t, want.Name(), span.Name()) + assert.Equal(t, want.SpanID(), span.SpanID()) + assert.Equal(t, want.TraceID(), span.TraceID()) + assert.Equal(t, want.StartTimestamp(), span.StartTimestamp()) + assert.Equal(t, want.EndTimestamp(), span.EndTimestamp()) + assert.Equal(t, want.Kind(), span.Kind()) + + // Check status + assert.Equal(t, want.Status().Code(), span.Status().Code()) + assert.Equal(t, want.Status().Message(), span.Status().Message()) + + // Check attributes + want.Attributes().Range(func(k string, v pcommon.Value) bool { + got, ok := span.Attributes().Get(k) + assert.True(t, ok, "missing attribute %s", k) + assert.Equal(t, v.Type(), got.Type(), "wrong type for attribute %s", k) + assert.Equal(t, v, got, "wrong value for attribute %s", k) + return true + }) + + // Verify no fewer attributes, extras are expected + assert.LessOrEqual(t, want.Attributes().Len(), span.Attributes().Len()) + } + }) + } +} diff --git a/receiver/libhoneyreceiver/internal/parser/parser.go b/receiver/libhoneyreceiver/internal/parser/parser.go index d2818dadd80a..c703d6bad5e7 100644 --- a/receiver/libhoneyreceiver/internal/parser/parser.go +++ b/receiver/libhoneyreceiver/internal/parser/parser.go @@ -4,12 +4,17 @@ package parser // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/parser" import ( + "encoding/hex" "fmt" "net/url" + "slices" + "time" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/collector/semconv/v1.16.0" + trc "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" @@ -28,12 +33,15 @@ func GetDatasetFromRequest(path string) (string, error) { } // ToPdata converts a list of LibhoneyEvents to a Pdata Logs object -func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyevent.FieldMapConfig, logger zap.Logger) plog.Logs { +func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyevent.FieldMapConfig, logger zap.Logger) (plog.Logs, ptrace.Traces) { foundServices := libhoneyevent.ServiceHistory{} foundServices.NameCount = make(map[string]int) foundScopes := libhoneyevent.ScopeHistory{} foundScopes.Scope = make(map[string]libhoneyevent.SimpleScope) + spanLinks := map[trc.SpanID][]libhoneyevent.LibhoneyEvent{} + spanEvents := map[trc.SpanID][]libhoneyevent.LibhoneyEvent{} + foundScopes.Scope = make(map[string]libhoneyevent.SimpleScope) // a list of already seen scopes foundScopes.Scope["libhoney.receiver"] = libhoneyevent.SimpleScope{ ServiceName: dataset, @@ -51,13 +59,21 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve alreadyUsedFields = append(alreadyUsedFields, cfg.Attributes.DurationFields...) for _, lhe := range lhes { - action, err := lhe.SignalType() + parent_id, err := lhe.GetParentID(cfg.Attributes.ParentID) if err != nil { - logger.Warn("signal type unclear") + logger.Warn("parent id not found") } + + action := lhe.SignalType(logger) switch action { case "span": - // not implemented + spanService, _ := lhe.GetService(cfg, &foundServices, dataset) + spanScopeKey, _ := lhe.GetScope(cfg, &foundScopes, spanService) // adds a new found scope if needed + newSpan := foundScopes.Scope[spanScopeKey].ScopeSpans.AppendEmpty() + err := lhe.ToPTraceSpan(&newSpan, &alreadyUsedFields, cfg, logger) + if err != nil { + logger.Warn("span could not be converted from libhoney to ptrace", zap.String("span.object", lhe.DebugString())) + } case "log": logService, _ := lhe.GetService(cfg, &foundServices, dataset) logScopeKey, _ := lhe.GetScope(cfg, &foundScopes, logService) // adds a new found scope if needed @@ -66,10 +82,32 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve if err != nil { logger.Warn("log could not be converted from libhoney to plog", zap.String("span.object", lhe.DebugString())) } + case "span_event": + spanEvents[parent_id] = append(spanEvents[parent_id], lhe) + case "span_link": + spanLinks[parent_id] = append(spanLinks[parent_id], lhe) + } + } + + start := time.Now() + for _, ss := range foundScopes.Scope { + for i := 0; i < ss.ScopeSpans.Len(); i++ { + sp := ss.ScopeSpans.At(i) + spId := trc.SpanID(sp.SpanID()) + + if speArr, ok := spanEvents[spId]; ok { + addSpanEventsToSpan(sp, speArr, alreadyUsedFields, &logger) + } + + if splArr, ok := spanLinks[spId]; ok { + addSpanLinksToSpan(sp, splArr, alreadyUsedFields, &logger) + } } } + logger.Debug("time to reattach span events and links", zap.Duration("duration", time.Since(start))) resultLogs := plog.NewLogs() + resultTraces := ptrace.NewTraces() for scopeName, ss := range foundScopes.Scope { if ss.ScopeLogs.Len() > 0 { @@ -82,7 +120,125 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve ls.Scope().SetVersion(ss.LibraryVersion) foundScopes.Scope[scopeName].ScopeLogs.MoveAndAppendTo(ls.LogRecords()) } + if ss.ScopeSpans.Len() > 0 { + tr := resultTraces.ResourceSpans().AppendEmpty() + tr.SetSchemaUrl(semconv.SchemaURL) + tr.Resource().Attributes().PutStr(semconv.AttributeServiceName, ss.ServiceName) + + ts := tr.ScopeSpans().AppendEmpty() + ts.Scope().SetName(ss.LibraryName) + ts.Scope().SetVersion(ss.LibraryVersion) + foundScopes.Scope[scopeName].ScopeSpans.MoveAndAppendTo(ts.Spans()) + } } - return resultLogs + return resultLogs, resultTraces +} + +func addSpanEventsToSpan(sp ptrace.Span, events []libhoneyevent.LibhoneyEvent, alreadyUsedFields []string, logger *zap.Logger) { + for _, spe := range events { + newEvent := sp.Events().AppendEmpty() + newEvent.SetTimestamp(pcommon.Timestamp(spe.MsgPackTimestamp.UnixNano())) + newEvent.SetName(spe.Data["name"].(string)) + for lkey, lval := range spe.Data { + if slices.Contains(alreadyUsedFields, lkey) { + continue + } + if lkey == "meta.annotation_type" || lkey == "meta.signal_type" { + continue + } + switch lval := lval.(type) { + case string: + newEvent.Attributes().PutStr(lkey, lval) + case int: + newEvent.Attributes().PutInt(lkey, int64(lval)) + case int64, int16, int32: + intv := lval.(int64) + newEvent.Attributes().PutInt(lkey, intv) + case float64: + newEvent.Attributes().PutDouble(lkey, lval) + case bool: + newEvent.Attributes().PutBool(lkey, lval) + default: + logger.Warn("SpanEvent data type issue", + zap.String("trace.trace_id", sp.TraceID().String()), + zap.String("trace.span_id", sp.SpanID().String()), + zap.String("key", lkey)) + } + } + } +} + +func addSpanLinksToSpan(sp ptrace.Span, links []libhoneyevent.LibhoneyEvent, alreadyUsedFields []string, logger *zap.Logger) { + for _, spl := range links { + newLink := sp.Links().AppendEmpty() + + if linkTraceStr, ok := spl.Data["trace.link.trace_id"]; ok { + tidByteArray, err := hex.DecodeString(linkTraceStr.(string)) + if err != nil { + logger.Warn("span link invalid", + zap.String("missing.attribute", "trace.link.trace_id"), + zap.String("span link contents", spl.DebugString())) + continue + } + if len(tidByteArray) >= 32 { + tidByteArray = tidByteArray[0:32] + } + newLink.SetTraceID(pcommon.TraceID(tidByteArray)) + } else { + logger.Warn("span link missing attributes", + zap.String("missing.attribute", "trace.link.trace_id"), + zap.String("span link contents", spl.DebugString())) + continue + } + + if linkSpanStr, ok := spl.Data["trace.link.span_id"]; ok { + sidByteArray, err := hex.DecodeString(linkSpanStr.(string)) + if err != nil { + logger.Warn("span link invalid", + zap.String("missing.attribute", "trace.link.span_id"), + zap.String("span link contents", spl.DebugString())) + continue + } + if len(sidByteArray) >= 16 { + sidByteArray = sidByteArray[0:16] + } + newLink.SetSpanID(pcommon.SpanID(sidByteArray)) + } else { + logger.Warn("span link missing attributes", + zap.String("missing.attribute", "trace.link.span_id"), + zap.String("span link contents", spl.DebugString())) + continue + } + + for lkey, lval := range spl.Data { + if len(lkey) > 10 && lkey[:11] == "trace.link." { + continue + } + if slices.Contains(alreadyUsedFields, lkey) { + continue + } + if lkey == "meta.annotation_type" || lkey == "meta.signal_type" { + continue + } + switch lval := lval.(type) { + case string: + newLink.Attributes().PutStr(lkey, lval) + case int: + newLink.Attributes().PutInt(lkey, int64(lval)) + case int64, int16, int32: + intv := lval.(int64) + newLink.Attributes().PutInt(lkey, intv) + case float64: + newLink.Attributes().PutDouble(lkey, lval) + case bool: + newLink.Attributes().PutBool(lkey, lval) + default: + logger.Warn("SpanLink data type issue", + zap.String("trace.trace_id", sp.TraceID().String()), + zap.String("trace.span_id", sp.SpanID().String()), + zap.String("key", lkey)) + } + } + } } diff --git a/receiver/libhoneyreceiver/internal/parser/parser_test.go b/receiver/libhoneyreceiver/internal/parser/parser_test.go new file mode 100644 index 000000000000..9b5b25f78283 --- /dev/null +++ b/receiver/libhoneyreceiver/internal/parser/parser_test.go @@ -0,0 +1,261 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package parser + +import ( + "encoding/hex" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" +) + +func TestGetDatasetFromRequest(t *testing.T) { + tests := []struct { + name string + path string + want string + wantErr bool + errContains string + }{ + { + name: "empty path", + path: "", + want: "", + wantErr: true, + errContains: "missing dataset name", + }, + { + name: "simple path", + path: "mydataset", + want: "mydataset", + }, + { + name: "encoded path", + path: "my%20dataset", + want: "my dataset", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := GetDatasetFromRequest(tt.path) + if tt.wantErr { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errContains) + return + } + require.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestToPdata(t *testing.T) { + logger := zap.NewNop() + now := time.Now() + + // Create test trace and span IDs + traceID := make([]byte, 16) + spanID := make([]byte, 8) + _, err := hex.Decode(traceID, []byte("1234567890abcdef1234567890abcdef")) + require.NoError(t, err) + _, err = hex.Decode(spanID, []byte("1234567890abcdef")) + require.NoError(t, err) + + testCfg := libhoneyevent.FieldMapConfig{ + Attributes: libhoneyevent.AttributesConfig{ + Name: "name", + TraceID: "trace.trace_id", + SpanID: "trace.span_id", + ParentID: "trace.parent_id", + Error: "error", + SpanKind: "kind", + DurationFields: []string{"duration_ms"}, + }, + Resources: libhoneyevent.ResourcesConfig{ + ServiceName: "service.name", + }, + Scopes: libhoneyevent.ScopesConfig{ + LibraryName: "library.name", + LibraryVersion: "library.version", + }, + } + + tests := []struct { + name string + dataset string + events []libhoneyevent.LibhoneyEvent + cfg libhoneyevent.FieldMapConfig + wantLogs int + wantSpans int + }{ + { + name: "empty events", + dataset: "test-dataset", + events: []libhoneyevent.LibhoneyEvent{}, + cfg: testCfg, + wantLogs: 0, + wantSpans: 0, + }, + { + name: "single span", + dataset: "test-dataset", + events: []libhoneyevent.LibhoneyEvent{ + { + Data: map[string]any{ + "meta.signal_type": "trace", + "trace.trace_id": hex.EncodeToString(traceID), + "trace.span_id": hex.EncodeToString(spanID), + "name": "test-span", + "duration_ms": 100.1, + "error": true, + "status_message": "error message", + "kind": "server", + }, + MsgPackTimestamp: &now, + Samplerate: 1, + }, + }, + cfg: testCfg, + wantLogs: 0, + wantSpans: 1, + }, + { + name: "single log", + dataset: "test-dataset", + events: []libhoneyevent.LibhoneyEvent{ + { + Data: map[string]any{ + "meta.signal_type": "log", + "message": "test log message", + }, + MsgPackTimestamp: &now, + }, + }, + cfg: testCfg, + wantLogs: 1, + wantSpans: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logs, traces := ToPdata(tt.dataset, tt.events, tt.cfg, *logger) + assert.Equal(t, tt.wantSpans, traces.SpanCount()) + assert.Equal(t, tt.wantLogs, logs.LogRecordCount()) + }) + } +} + +// Helper function to verify attributes +func verifyAttributes(t *testing.T, expected, actual pcommon.Map) { + expected.Range(func(k string, v pcommon.Value) bool { + got, ok := actual.Get(k) + assert.True(t, ok, "missing attribute %s", k) + assert.Equal(t, v.Type(), got.Type(), "wrong type for attribute %s", k) + assert.Equal(t, v, got, "wrong value for attribute %s", k) + return true + }) +} + +func TestAddSpanEventsToSpan(t *testing.T) { + logger := zap.NewNop() + now := time.Now() + s := ptrace.NewSpan() + s.SetName("test-span") + s.SetSpanID([8]byte{0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef}) + s.SetTraceID([16]byte{0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef}) + s.SetStartTimestamp(pcommon.NewTimestampFromTime(now)) + s.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(100 * time.Millisecond))) + s.Status().SetCode(ptrace.StatusCodeError) + s.Status().SetMessage("error message") + s.SetKind(ptrace.SpanKindServer) + s.Attributes().PutStr("string_attr", "value") + s.Attributes().PutInt("int_attr", 42) + s.Attributes().PutBool("bool_attr", true) + + events := []libhoneyevent.LibhoneyEvent{ + { + Data: map[string]any{ + "name": "event1", + "string": "value", + "int": 42, + "float": 3.14, + "bool": true, + }, + MsgPackTimestamp: &now, + }, + } + + addSpanEventsToSpan(s, events, []string{}, logger) + + assert.Equal(t, 1, s.Events().Len()) + event := s.Events().At(0) + assert.Equal(t, "event1", event.Name()) + assert.Equal(t, pcommon.Timestamp(now.UnixNano()), event.Timestamp()) + + attrs := event.Attributes() + verifyAttributes(t, attrs, event.Attributes()) +} + +func TestAddSpanLinksToSpan(t *testing.T) { + logger := zap.NewNop() + + now := time.Now() + s := ptrace.NewSpan() + s.SetName("test-span") + s.SetSpanID([8]byte{0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef}) + s.SetTraceID([16]byte{0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef}) + s.SetStartTimestamp(pcommon.NewTimestampFromTime(now)) + s.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(100 * time.Millisecond))) + s.Status().SetCode(ptrace.StatusCodeError) + s.Status().SetMessage("error message") + s.SetKind(ptrace.SpanKindServer) + s.Attributes().PutStr("string_attr", "value") + s.Attributes().PutInt("int_attr", 42) + s.Attributes().PutBool("bool_attr", true) + + // Create test trace and span IDs for the link + linkTraceIDHex := make([]byte, 16) + linkSpanIDHex := make([]byte, 8) + _, err := hex.Decode(linkTraceIDHex, []byte("abcdef1234567890abcdef1234567890")) + require.NoError(t, err) + _, err = hex.Decode(linkSpanIDHex, []byte("abcdef1234567890")) + require.NoError(t, err) + linkTraceID := pcommon.TraceID(linkTraceIDHex) + linkSpanID := pcommon.SpanID(linkSpanIDHex) + + links := []libhoneyevent.LibhoneyEvent{ + { + Data: map[string]any{ + "trace.link.trace_id": hex.EncodeToString(linkTraceIDHex), + "trace.link.span_id": hex.EncodeToString(linkSpanIDHex), + "trace.trace_id": "1234567890abcdef1234567890abcdef", + "trace.span_id": "1234567890abcdef", + "attribute1": "stringval", + "attribute2": 4, + }, + }, + } + + addSpanLinksToSpan(s, links, []string{}, logger) + + assert.Equal(t, 1, s.Links().Len()) + link := s.Links().At(0) + + // Verify trace and span IDs + assert.Equal(t, linkTraceID, link.TraceID()) + assert.Equal(t, linkSpanID, link.SpanID()) + + // Verify attributes + attrs := link.Attributes() + verifyAttributes(t, attrs, link.Attributes()) +} diff --git a/receiver/libhoneyreceiver/receiver.go b/receiver/libhoneyreceiver/receiver.go index 84ad68e4638a..8c1c27e33822 100644 --- a/receiver/libhoneyreceiver/receiver.go +++ b/receiver/libhoneyreceiver/receiver.go @@ -231,7 +231,7 @@ func (r *libhoneyReceiver) handleEvent(resp http.ResponseWriter, req *http.Reque } } - otlpLogs := parser.ToPdata(dataset, libhoneyevents, r.cfg.FieldMapConfig, *r.settings.Logger) + otlpLogs, otlpTraces := parser.ToPdata(dataset, libhoneyevents, r.cfg.FieldMapConfig, *r.settings.Logger) numLogs := otlpLogs.LogRecordCount() if numLogs > 0 { @@ -240,6 +240,13 @@ func (r *libhoneyReceiver) handleEvent(resp http.ResponseWriter, req *http.Reque r.obsreport.EndLogsOp(ctx, "protobuf", numLogs, err) } + numTraces := otlpTraces.SpanCount() + if numTraces > 0 { + ctx := r.obsreport.StartTracesOp(context.Background()) + err = r.nextTraces.ConsumeTraces(ctx, otlpTraces) + r.obsreport.EndTracesOp(ctx, "protobuf", numTraces, err) + } + if err != nil { errorutil.HTTPError(resp, err) return