Skip to content

Commit

Permalink
second half of trace implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mterhar committed Dec 19, 2024
1 parent 5d8a956 commit 933fa97
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 3 deletions.
2 changes: 1 addition & 1 deletion receiver/libhoneyreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
go.opentelemetry.io/collector/receiver v0.116.0
go.opentelemetry.io/collector/receiver/receivertest v0.116.0
go.opentelemetry.io/collector/semconv v0.116.0
go.opentelemetry.io/otel/trace v1.32.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53
Expand Down Expand Up @@ -68,7 +69,6 @@ require (
go.opentelemetry.io/otel/metric v1.32.0 // indirect
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect
go.opentelemetry.io/otel/trace v1.32.0
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/sys v0.28.0 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,25 @@ func (l *LibhoneyEvent) DebugString() string {

// SignalType returns the type of signal this event represents. Only log is implemented for now.
func (l *LibhoneyEvent) SignalType() (string, error) {
return "log", nil
if sig, ok := l.Data["meta.signal_type"]; ok {
switch sig {
case "trace":
if atype, ok := l.Data["meta.annotation_type"]; ok {
if atype == "span_event" {
return "span_event", nil
} else if atype == "link" {
return "span_link", nil
}
return "span", errors.New("invalid annotation type, but probably a span")
}
return "span", nil
case "log":
return "log", nil
default:
return "log", errors.New("invalid meta.signal_type")
}
}
return "log", errors.New("missing meta.signal_type and meta.annotation_type")
}

// GetService returns the service name from the event or the dataset name if no service name is found.
Expand Down Expand Up @@ -235,6 +253,24 @@ func (l *LibhoneyEvent) ToPLogRecord(newLog *plog.LogRecord, alreadyUsedFields *
return nil
}

// GetParentID returns the parent id from the event or an error if it's not found
func (l *LibhoneyEvent) GetParentID(fieldName string) (trc.SpanID, error) {
if pid, ok := l.Data[fieldName]; ok {
pid := strings.ReplaceAll(pid.(string), "-", "")
pidByteArray, err := hex.DecodeString(pid)
if err == nil {
if len(pidByteArray) == 32 {
pidByteArray = pidByteArray[8:24]
} else if len(pidByteArray) >= 16 {
pidByteArray = pidByteArray[0:16]
}
return trc.SpanID(pidByteArray), nil
}
return trc.SpanID{}, errors.New("parent id is not a valid span id")
}
return trc.SpanID{}, errors.New("parent id not found")
}

// ToPTraceSpan converts a LibhoneyEvent to a Pdata Span
func (l *LibhoneyEvent) ToPTraceSpan(newSpan *ptrace.Span, alreadyUsedFields *[]string, cfg FieldMapConfig, logger zap.Logger) error {
time_ns := l.MsgPackTimestamp.UnixNano()
Expand Down
126 changes: 125 additions & 1 deletion receiver/libhoneyreceiver/internal/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
package parser // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/parser"

import (
"encoding/hex"
"fmt"
"net/url"
"slices"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/collector/semconv/v1.16.0"
trc "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent"
Expand All @@ -34,6 +39,9 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
foundScopes := libhoneyevent.ScopeHistory{}
foundScopes.Scope = make(map[string]libhoneyevent.SimpleScope)

spanLinks := map[trc.SpanID][]libhoneyevent.LibhoneyEvent{}
spanEvents := map[trc.SpanID][]libhoneyevent.LibhoneyEvent{}

foundScopes.Scope = make(map[string]libhoneyevent.SimpleScope) // a list of already seen scopes
foundScopes.Scope["libhoney.receiver"] = libhoneyevent.SimpleScope{
ServiceName: dataset,
Expand All @@ -51,13 +59,24 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
alreadyUsedFields = append(alreadyUsedFields, cfg.Attributes.DurationFields...)

for _, lhe := range lhes {
parent_id, err := lhe.GetParentID(cfg.Attributes.ParentID)
if err != nil {
logger.Warn("parent id not found")
}

action, err := lhe.SignalType()
if err != nil {
logger.Warn("signal type unclear")
}
switch action {
case "span":
// not implemented
spanService, _ := lhe.GetService(cfg, &foundServices, dataset)
spanScopeKey, _ := lhe.GetScope(cfg, &foundScopes, spanService) // adds a new found scope if needed
newSpan := foundScopes.Scope[spanScopeKey].ScopeSpans.AppendEmpty()
err := lhe.ToPTraceSpan(&newSpan, &alreadyUsedFields, cfg, logger)
if err != nil {
logger.Warn("span could not be converted from libhoney to ptrace", zap.String("span.object", lhe.DebugString()))
}
case "log":
logService, _ := lhe.GetService(cfg, &foundServices, dataset)
logScopeKey, _ := lhe.GetScope(cfg, &foundScopes, logService) // adds a new found scope if needed
Expand All @@ -66,8 +85,113 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
if err != nil {
logger.Warn("log could not be converted from libhoney to plog", zap.String("span.object", lhe.DebugString()))
}
case "span_event":
spanEvents[parent_id] = append(spanEvents[parent_id], lhe)
case "span_link":
spanLinks[parent_id] = append(spanLinks[parent_id], lhe)
}
}

start := time.Now()
for _, ss := range foundScopes.Scope {
for i := 0; i < ss.ScopeSpans.Len(); i++ {
sp := ss.ScopeSpans.At(i)

spId := trc.SpanID(sp.SpanID())
if speArr, ok := spanEvents[spId]; ok {
// call to new function in libhoneyevent.
for _, spe := range speArr {
newEvent := sp.Events().AppendEmpty()
newEvent.SetTimestamp(pcommon.Timestamp(spe.MsgPackTimestamp.UnixNano()))
newEvent.SetName(spe.Data["name"].(string))
for lkey, lval := range spe.Data {
if slices.Contains(alreadyUsedFields, lkey) {
continue
}
if lkey == "meta.annotation_type" || lkey == "meta.signal_type" {
continue
}
switch lval := lval.(type) {
case string:
newEvent.Attributes().PutStr(lkey, lval)
case int:
newEvent.Attributes().PutInt(lkey, int64(lval))
case int64, int16, int32:
intv := lval.(int64)
newEvent.Attributes().PutInt(lkey, intv)
case float64:
newEvent.Attributes().PutDouble(lkey, lval)
case bool:
newEvent.Attributes().PutBool(lkey, lval)
default:
logger.Warn("SpanEvent data type issue", zap.String("trace.trace_id", sp.TraceID().String()), zap.String("trace.span_id", sp.SpanID().String()), zap.String("key", lkey))
}
}
}
}
if splArr, ok := spanLinks[spId]; ok {
for _, spl := range splArr {
newLink := sp.Links().AppendEmpty()

if linkTraceStr, ok := spl.Data["trace.link.trace_id"]; ok {
tidByteArray, err := hex.DecodeString(linkTraceStr.(string))
if err != nil {
logger.Warn("span link invalid", zap.String("missing.attribute", "trace.link.trace_id"), zap.String("span link contents", spl.DebugString()))
continue
}
if len(tidByteArray) >= 32 {
tidByteArray = tidByteArray[0:32]
}
newLink.SetTraceID(pcommon.TraceID(tidByteArray))
} else {
logger.Warn("span link missing attributes", zap.String("missing.attribute", "trace.link.trace_id"), zap.String("span link contents", spl.DebugString()))
continue
}
if linkSpanStr, ok := spl.Data["trace.link.span_id"]; ok {
sidByteArray, err := hex.DecodeString(linkSpanStr.(string))
if err != nil {
logger.Warn("span link invalid", zap.String("missing.attribute", "trace.link.span_id"), zap.String("span link contents", spl.DebugString()))
continue
}
if len(sidByteArray) >= 16 {
sidByteArray = sidByteArray[0:16]
}
newLink.SetSpanID(pcommon.SpanID(sidByteArray))
} else {
logger.Warn("span link missing attributes", zap.String("missing.attribute", "trace.link.span_id"), zap.String("span link contents", spl.DebugString()))
continue
}
for lkey, lval := range spl.Data {
if len(lkey) > 10 && lkey[:11] == "trace.link." {
continue
}
if slices.Contains(alreadyUsedFields, lkey) {
continue
}
if lkey == "meta.annotation_type" || lkey == "meta.signal_type" {
continue
}
switch lval := lval.(type) {
case string:
newLink.Attributes().PutStr(lkey, lval)
case int:
newLink.Attributes().PutInt(lkey, int64(lval))
case int64, int16, int32:
intv := lval.(int64)
newLink.Attributes().PutInt(lkey, intv)
case float64:
newLink.Attributes().PutDouble(lkey, lval)
case bool:
newLink.Attributes().PutBool(lkey, lval)
default:
logger.Warn("SpanLink data type issue", zap.String("trace.trace_id", sp.TraceID().String()), zap.String("trace.span_id", sp.SpanID().String()), zap.String("key", lkey))
}
}
}
}
}
}
logger.Debug("time to reattach span events and links", zap.Duration("duration", time.Since(start)))

resultLogs := plog.NewLogs()
resultTraces := ptrace.NewTraces()
Expand Down

0 comments on commit 933fa97

Please sign in to comment.