diff --git a/.chloggen/logs-for-libhoneyreceiver.yaml b/.chloggen/logs-for-libhoneyreceiver.yaml new file mode 100644 index 000000000000..dc90a4fbe50b --- /dev/null +++ b/.chloggen/logs-for-libhoneyreceiver.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: libhoneyreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implement log signal for libhoney receiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36693] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/pkg/ottl/context_inferrer.go b/pkg/ottl/context_inferrer.go index da4ade783278..714100ac4d57 100644 --- a/pkg/ottl/context_inferrer.go +++ b/pkg/ottl/context_inferrer.go @@ -3,12 +3,16 @@ package ottl // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" -import "math" +import ( + "cmp" + "math" + "slices" +) var defaultContextInferPriority = []string{ "log", - "metric", "datapoint", + "metric", "spanevent", "span", "resource", @@ -16,62 +20,217 @@ var defaultContextInferPriority = []string{ "instrumentation_scope", } -// contextInferrer is an interface used to infer the OTTL context from statements paths. +// contextInferrer is an interface used to infer the OTTL context from statements. type contextInferrer interface { - // infer returns the OTTL context inferred from the given statements paths. + // infer returns the OTTL context inferred from the given statements. infer(statements []string) (string, error) } type priorityContextInferrer struct { - contextPriority map[string]int + contextPriority map[string]int + contextCandidate map[string]*priorityContextInferrerCandidate +} + +type priorityContextInferrerCandidate struct { + hasEnumSymbol func(enum *EnumSymbol) bool + hasFunctionName func(name string) bool + getLowerContexts func(context string) []string +} + +type priorityContextInferrerOption func(*priorityContextInferrer) + +// newPriorityContextInferrer creates a new priority-based context inferrer. To infer the context, +// it uses a slice of priorities (withContextInferrerPriorities) and a set of hints extracted from +// the parsed statements. +// +// To be eligible, a context must support all functions and enums symbols present on the statements. +// If the path context with the highest priority does not meet this requirement, it falls back to its +// lower contexts, testing them with the same logic and choosing the first one that meets all requirements. +// +// If non-prioritized contexts are found on the statements, they get assigned the lowest possible priority, +// and are only selected if no other prioritized context is found. +func newPriorityContextInferrer(contextsCandidate map[string]*priorityContextInferrerCandidate, options ...priorityContextInferrerOption) contextInferrer { + c := &priorityContextInferrer{ + contextCandidate: contextsCandidate, + } + for _, opt := range options { + opt(c) + } + if len(c.contextPriority) == 0 { + withContextInferrerPriorities(defaultContextInferPriority)(c) + } + return c +} + +// withContextInferrerPriorities sets the contexts candidates priorities. The lower the +// context position is in the array, the more priority it will have over other items. +func withContextInferrerPriorities(priorities []string) priorityContextInferrerOption { + return func(c *priorityContextInferrer) { + contextPriority := map[string]int{} + for pri, context := range priorities { + contextPriority[context] = pri + } + c.contextPriority = contextPriority + } } func (s *priorityContextInferrer) infer(statements []string) (string, error) { + requiredFunctions := map[string]struct{}{} + requiredEnums := map[enumSymbol]struct{}{} + var inferredContext string var inferredContextPriority int - for _, statement := range statements { parsed, err := parseStatement(statement) if err != nil { - return inferredContext, err + return "", err } - for _, p := range getParsedStatementPaths(parsed) { - pathContextPriority, ok := s.contextPriority[p.Context] + statementPaths, statementFunctions, statementEnums := s.getParsedStatementHints(parsed) + for _, p := range statementPaths { + candidate := p.Context + candidatePriority, ok := s.contextPriority[candidate] if !ok { - // Lowest priority - pathContextPriority = math.MaxInt + candidatePriority = math.MaxInt } - - if inferredContext == "" || pathContextPriority < inferredContextPriority { - inferredContext = p.Context - inferredContextPriority = pathContextPriority + if inferredContext == "" || candidatePriority < inferredContextPriority { + inferredContext = candidate + inferredContextPriority = candidatePriority } } + for function := range statementFunctions { + requiredFunctions[function] = struct{}{} + } + for enum := range statementEnums { + requiredEnums[enum] = struct{}{} + } + } + // No inferred context or nothing left to verify. + if inferredContext == "" || (len(requiredFunctions) == 0 && len(requiredEnums) == 0) { + return inferredContext, nil + } + ok := s.validateContextCandidate(inferredContext, requiredFunctions, requiredEnums) + if ok { + return inferredContext, nil + } + return s.inferFromLowerContexts(inferredContext, requiredFunctions, requiredEnums), nil +} + +// validateContextCandidate checks if the given context candidate has all required functions names +// and enums symbols. The functions arity are not verified. +func (s *priorityContextInferrer) validateContextCandidate( + context string, + requiredFunctions map[string]struct{}, + requiredEnums map[enumSymbol]struct{}, +) bool { + candidate, ok := s.contextCandidate[context] + if !ok { + return false + } + if len(requiredFunctions) == 0 && len(requiredEnums) == 0 { + return true + } + for function := range requiredFunctions { + if !candidate.hasFunctionName(function) { + return false + } + } + for enum := range requiredEnums { + if !candidate.hasEnumSymbol((*EnumSymbol)(&enum)) { + return false + } } + return true +} + +// inferFromLowerContexts returns the first lower context that supports all required functions +// and enum symbols used on the statements. +// If no lower context meets the requirements, or if the context candidate is unknown, it +// returns an empty string. +func (s *priorityContextInferrer) inferFromLowerContexts( + context string, + requiredFunctions map[string]struct{}, + requiredEnums map[enumSymbol]struct{}, +) string { + inferredContextCandidate, ok := s.contextCandidate[context] + if !ok { + return "" + } + + lowerContextCandidates := inferredContextCandidate.getLowerContexts(context) + if len(lowerContextCandidates) == 0 { + return "" + } + + s.sortContextCandidates(lowerContextCandidates) + for _, lowerCandidate := range lowerContextCandidates { + ok = s.validateContextCandidate(lowerCandidate, requiredFunctions, requiredEnums) + if ok { + return lowerCandidate + } + } + return "" +} + +// sortContextCandidates sorts the slice candidates using the priorityContextInferrer.contextsPriority order. +func (s *priorityContextInferrer) sortContextCandidates(candidates []string) { + slices.SortFunc(candidates, func(l, r string) int { + lp, ok := s.contextPriority[l] + if !ok { + lp = math.MaxInt + } + rp, ok := s.contextPriority[r] + if !ok { + rp = math.MaxInt + } + return cmp.Compare(lp, rp) + }) +} - return inferredContext, nil +// getParsedStatementHints extracts all path, function names (editor and converter), and enumSymbol +// from the given parsed statements. These values are used by the context inferrer as hints to +// select a context in which the function/enum are supported. +func (s *priorityContextInferrer) getParsedStatementHints(parsed *parsedStatement) ([]path, map[string]struct{}, map[enumSymbol]struct{}) { + visitor := newGrammarContextInferrerVisitor() + parsed.Editor.accept(&visitor) + if parsed.WhereClause != nil { + parsed.WhereClause.accept(&visitor) + } + return visitor.paths, visitor.functions, visitor.enumsSymbols } -// defaultPriorityContextInferrer is like newPriorityContextInferrer, but using the default -// context priorities and ignoring unknown/non-prioritized contexts. -func defaultPriorityContextInferrer() contextInferrer { - return newPriorityContextInferrer(defaultContextInferPriority) +// priorityContextInferrerHintsVisitor is a grammarVisitor implementation that collects +// all path, function names (converter.Function and editor.Function), and enumSymbol. +type priorityContextInferrerHintsVisitor struct { + paths []path + functions map[string]struct{} + enumsSymbols map[enumSymbol]struct{} } -// newPriorityContextInferrer creates a new priority-based context inferrer. -// To infer the context, it compares all [ottl.Path.Context] values, prioritizing them based -// on the provide contextsPriority argument, the lower the context position is in the array, -// the more priority it will have over other items. -// If unknown/non-prioritized contexts are found on the statements, they can be either ignored -// or considered when no other prioritized context is found. To skip unknown contexts, the -// ignoreUnknownContext argument must be set to false. -func newPriorityContextInferrer(contextsPriority []string) contextInferrer { - contextPriority := make(map[string]int, len(contextsPriority)) - for i, ctx := range contextsPriority { - contextPriority[ctx] = i +func newGrammarContextInferrerVisitor() priorityContextInferrerHintsVisitor { + return priorityContextInferrerHintsVisitor{ + paths: []path{}, + functions: make(map[string]struct{}), + enumsSymbols: make(map[enumSymbol]struct{}), } - return &priorityContextInferrer{ - contextPriority: contextPriority, +} + +func (v *priorityContextInferrerHintsVisitor) visitMathExprLiteral(_ *mathExprLiteral) {} + +func (v *priorityContextInferrerHintsVisitor) visitEditor(e *editor) { + v.functions[e.Function] = struct{}{} +} + +func (v *priorityContextInferrerHintsVisitor) visitConverter(c *converter) { + v.functions[c.Function] = struct{}{} +} + +func (v *priorityContextInferrerHintsVisitor) visitValue(va *value) { + if va.Enum != nil { + v.enumsSymbols[*va.Enum] = struct{}{} } } + +func (v *priorityContextInferrerHintsVisitor) visitPath(value *path) { + v.paths = append(v.paths, *value) +} diff --git a/pkg/ottl/context_inferrer_test.go b/pkg/ottl/context_inferrer_test.go index 4d4455dd0dcf..9cec76503451 100644 --- a/pkg/ottl/context_inferrer_test.go +++ b/pkg/ottl/context_inferrer_test.go @@ -10,22 +10,55 @@ import ( "github.com/stretchr/testify/require" ) +var defaultDummyPriorityContextInferrerCandidate = &priorityContextInferrerCandidate{ + hasFunctionName: func(name string) bool { + return true + }, + hasEnumSymbol: func(enum *EnumSymbol) bool { + return true + }, + getLowerContexts: func(context string) []string { + return nil + }, +} + +func newDummyPriorityContextInferrerCandidate(hasFunctionName, hasEnumSymbol bool, lowerContexts []string) *priorityContextInferrerCandidate { + return &priorityContextInferrerCandidate{ + hasFunctionName: func(_ string) bool { + return hasFunctionName + }, + hasEnumSymbol: func(_ *EnumSymbol) bool { + return hasEnumSymbol + }, + getLowerContexts: func(_ string) []string { + return lowerContexts + }, + } +} + func Test_NewPriorityContextInferrer_Infer(t *testing.T) { tests := []struct { name string priority []string + candidates map[string]*priorityContextInferrerCandidate statements []string expected string }{ { - name: "with priority and contexts", - priority: []string{"spanevent", "span", "resource"}, + name: "with priority and statement context", + priority: []string{"spanevent", "span", "resource"}, + candidates: map[string]*priorityContextInferrerCandidate{ + "spanevent": defaultDummyPriorityContextInferrerCandidate, + }, statements: []string{"set(span.foo, resource.value) where spanevent.bar == true"}, expected: "spanevent", }, { - name: "with multiple statements", + name: "with multiple statements and contexts", priority: []string{"spanevent", "span", "resource"}, + candidates: map[string]*priorityContextInferrerCandidate{ + "spanevent": defaultDummyPriorityContextInferrerCandidate, + }, statements: []string{ "set(resource.foo, resource.value) where span.bar == true", "set(resource.foo, resource.value) where spanevent.bar == true", @@ -33,27 +66,84 @@ func Test_NewPriorityContextInferrer_Infer(t *testing.T) { expected: "spanevent", }, { - name: "with no context", + name: "with no statements context", priority: []string{"log", "resource"}, + candidates: map[string]*priorityContextInferrerCandidate{}, statements: []string{"set(foo, true) where bar == true"}, expected: "", }, { - name: "with empty priority", + name: "with empty priority list", + candidates: map[string]*priorityContextInferrerCandidate{ + "foo": defaultDummyPriorityContextInferrerCandidate, + }, statements: []string{"set(foo.name, true) where bar.name == true"}, expected: "foo", }, { - name: "with unknown context", - priority: []string{"foo", "bar"}, + name: "with non-prioritized statement context", + priority: []string{"foo", "bar"}, + candidates: map[string]*priorityContextInferrerCandidate{ + "span": defaultDummyPriorityContextInferrerCandidate, + }, statements: []string{"set(span.foo, true) where span.bar == true"}, expected: "span", }, + { + name: "inferred path context with missing function", + priority: []string{"foo", "datapoint", "metric"}, + statements: []string{`set(metric.is_foo, true) where metric.name == "foo"`}, + candidates: map[string]*priorityContextInferrerCandidate{ + "metric": newDummyPriorityContextInferrerCandidate(false, true, []string{"foo", "datapoint"}), + "foo": newDummyPriorityContextInferrerCandidate(false, true, []string{}), + "datapoint": newDummyPriorityContextInferrerCandidate(true, true, []string{}), + }, + expected: "datapoint", + }, + { + name: "inferred path context with missing function and no qualified lower context", + priority: []string{"datapoint", "metric"}, + statements: []string{`set(metric.is_foo, true) where metric.name == "foo"`}, + candidates: map[string]*priorityContextInferrerCandidate{ + "metric": newDummyPriorityContextInferrerCandidate(false, false, []string{"datapoint"}), + "datapoint": newDummyPriorityContextInferrerCandidate(false, false, []string{}), + }, + expected: "", + }, + { + name: "inferred path context with missing function and no lower context", + priority: []string{"datapoint", "metric"}, + statements: []string{`set(metric.is_foo, true) where metric.name == "foo"`}, + candidates: map[string]*priorityContextInferrerCandidate{ + "metric": newDummyPriorityContextInferrerCandidate(false, true, []string{}), + }, + expected: "", + }, + { + name: "inferred path context with missing enum", + priority: []string{"foo", "bar"}, + statements: []string{`set(foo.name, FOO) where IsFoo() == true`}, + candidates: map[string]*priorityContextInferrerCandidate{ + "foo": newDummyPriorityContextInferrerCandidate(true, false, []string{"foo", "bar"}), + "bar": newDummyPriorityContextInferrerCandidate(true, true, []string{}), + }, + expected: "bar", + }, + { + name: "unknown context candidate inferred from paths", + priority: []string{"unknown"}, + statements: []string{`set(unknown.count, 0)`}, + candidates: map[string]*priorityContextInferrerCandidate{}, + expected: "", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - inferrer := newPriorityContextInferrer(tt.priority) + inferrer := newPriorityContextInferrer( + tt.candidates, + withContextInferrerPriorities(tt.priority), + ) inferredContext, err := inferrer.infer(tt.statements) require.NoError(t, err) assert.Equal(t, tt.expected, inferredContext) @@ -62,7 +152,7 @@ func Test_NewPriorityContextInferrer_Infer(t *testing.T) { } func Test_NewPriorityContextInferrer_InvalidStatement(t *testing.T) { - inferrer := newPriorityContextInferrer([]string{"foo"}) + inferrer := newPriorityContextInferrer(map[string]*priorityContextInferrerCandidate{}) statements := []string{"set(foo.field,"} _, err := inferrer.infer(statements) require.ErrorContains(t, err, "unexpected token") @@ -71,8 +161,8 @@ func Test_NewPriorityContextInferrer_InvalidStatement(t *testing.T) { func Test_DefaultPriorityContextInferrer(t *testing.T) { expectedPriority := []string{ "log", - "metric", "datapoint", + "metric", "spanevent", "span", "resource", @@ -80,7 +170,7 @@ func Test_DefaultPriorityContextInferrer(t *testing.T) { "instrumentation_scope", } - inferrer := defaultPriorityContextInferrer().(*priorityContextInferrer) + inferrer := newPriorityContextInferrer(map[string]*priorityContextInferrerCandidate{}).(*priorityContextInferrer) require.NotNil(t, inferrer) for pri, ctx := range expectedPriority { diff --git a/pkg/ottl/grammar.go b/pkg/ottl/grammar.go index a1e5eb53a81d..b7d6743f4afa 100644 --- a/pkg/ottl/grammar.go +++ b/pkg/ottl/grammar.go @@ -213,6 +213,7 @@ type converter struct { } func (c *converter) accept(v grammarVisitor) { + v.visitConverter(c) if c.Arguments != nil { for _, a := range c.Arguments { a.accept(v) @@ -525,6 +526,7 @@ func (e *grammarCustomError) Unwrap() []error { type grammarVisitor interface { visitPath(v *path) visitEditor(v *editor) + visitConverter(v *converter) visitValue(v *value) visitMathExprLiteral(v *mathExprLiteral) } @@ -549,6 +551,8 @@ func (g *grammarCustomErrorsVisitor) visitPath(_ *path) {} func (g *grammarCustomErrorsVisitor) visitValue(_ *value) {} +func (g *grammarCustomErrorsVisitor) visitConverter(_ *converter) {} + func (g *grammarCustomErrorsVisitor) visitEditor(v *editor) { if v.Keys != nil { g.add(fmt.Errorf("only paths and converters may be indexed, not editors, but got %s%s", v.Function, buildOriginalKeysText(v.Keys))) diff --git a/pkg/ottl/parser_collection.go b/pkg/ottl/parser_collection.go index 72d0d6abb3f0..c257e8eb198c 100644 --- a/pkg/ottl/parser_collection.go +++ b/pkg/ottl/parser_collection.go @@ -131,11 +131,13 @@ type parserCollectionParser struct { // // Experimental: *NOTE* this API is subject to change or removal in the future. type ParserCollection[R any] struct { - contextParsers map[string]*parserCollectionParser - contextInferrer contextInferrer - modifiedStatementLogging bool - Settings component.TelemetrySettings - ErrorMode ErrorMode + contextParsers map[string]*parserCollectionParser + contextInferrer contextInferrer + contextInferrerCandidates map[string]*priorityContextInferrerCandidate + candidatesLowerContexts map[string][]string + modifiedStatementLogging bool + Settings component.TelemetrySettings + ErrorMode ErrorMode } // ParserCollectionOption is a configurable ParserCollection option. @@ -150,10 +152,13 @@ func NewParserCollection[R any]( settings component.TelemetrySettings, options ...ParserCollectionOption[R], ) (*ParserCollection[R], error) { + contextInferrerCandidates := map[string]*priorityContextInferrerCandidate{} pc := &ParserCollection[R]{ - Settings: settings, - contextParsers: map[string]*parserCollectionParser{}, - contextInferrer: defaultPriorityContextInferrer(), + Settings: settings, + contextParsers: map[string]*parserCollectionParser{}, + contextInferrer: newPriorityContextInferrer(contextInferrerCandidates), + contextInferrerCandidates: contextInferrerCandidates, + candidatesLowerContexts: map[string][]string{}, } for _, op := range options { @@ -211,10 +216,32 @@ func WithParserCollectionContext[K any, R any]( ottlParser: newParserWrapper[K](parser), statementsConverter: newStatementsConverterWrapper(converter), } + + for lowerContext := range parser.pathContextNames { + if lowerContext != context { + mp.candidatesLowerContexts[lowerContext] = append(mp.candidatesLowerContexts[lowerContext], context) + } + } + + mp.contextInferrerCandidates[context] = &priorityContextInferrerCandidate{ + hasEnumSymbol: func(enum *EnumSymbol) bool { + _, err := parser.enumParser(enum) + return err == nil + }, + hasFunctionName: func(name string) bool { + _, ok := parser.functions[name] + return ok + }, + getLowerContexts: mp.getLowerContexts, + } return nil } } +func (pc *ParserCollection[R]) getLowerContexts(context string) []string { + return pc.candidatesLowerContexts[context] +} + // WithParserCollectionErrorMode has no effect on the ParserCollection, but might be used // by the ParsedStatementConverter functions to handle/create StatementSequence. // @@ -255,7 +282,12 @@ func (pc *ParserCollection[R]) ParseStatements(statements StatementsGetter) (R, } if inferredContext == "" { - return *new(R), fmt.Errorf("unable to infer context from statements [%v], path's first segment must be a valid context name", statementsValues) + return *new(R), fmt.Errorf("unable to infer context from statements %+q, path's first segment must be a valid context name: %+q", statementsValues, pc.supportedContextNames()) + } + + _, ok := pc.contextParsers[inferredContext] + if !ok { + return *new(R), fmt.Errorf(`context "%s" inferred from the statements %+q is not a supported context: %+q`, inferredContext, statementsValues, pc.supportedContextNames()) } return pc.ParseStatementsWithContext(inferredContext, statements, false) @@ -332,3 +364,11 @@ func (pc *ParserCollection[R]) logModifiedStatements(originalStatements, modifie pc.Settings.Logger.Info("one or more statements were modified to include their paths context, please rewrite them accordingly", zap.Dict("statements", fields...)) } } + +func (pc *ParserCollection[R]) supportedContextNames() []string { + contextsNames := make([]string, 0, len(pc.contextParsers)) + for k := range pc.contextParsers { + contextsNames = append(contextsNames, k) + } + return contextsNames +} diff --git a/pkg/ottl/parser_collection_test.go b/pkg/ottl/parser_collection_test.go index 841f3a5fab60..df33f4ba05c3 100644 --- a/pkg/ottl/parser_collection_test.go +++ b/pkg/ottl/parser_collection_test.go @@ -93,6 +93,35 @@ func Test_WithParserCollectionContext_UnsupportedContext(t *testing.T) { require.ErrorContains(t, err, `context "bar" must be a valid "*ottl.Parser[interface {}]" path context name`) } +func Test_WithParserCollectionContext_contextInferrerCandidates(t *testing.T) { + pc, err := NewParserCollection[any](component.TelemetrySettings{}, + WithParserCollectionContext("foo", mockParser(t, WithPathContextNames[any]([]string{"foo", "bar"})), newNopParsedStatementConverter[any]()), + WithParserCollectionContext("bar", mockParser(t, WithPathContextNames[any]([]string{"bar"})), newNopParsedStatementConverter[any]()), + ) + require.NoError(t, err) + require.NotNil(t, pc.contextInferrer) + require.Contains(t, pc.contextInferrerCandidates, "foo") + + validEnumSymbol := EnumSymbol("TEST_ENUM") + invalidEnumSymbol := EnumSymbol("DUMMY") + + fooCandidate := pc.contextInferrerCandidates["foo"] + assert.NotNil(t, fooCandidate) + assert.True(t, fooCandidate.hasFunctionName("set")) + assert.False(t, fooCandidate.hasFunctionName("dummy")) + assert.True(t, fooCandidate.hasEnumSymbol(&validEnumSymbol)) + assert.False(t, fooCandidate.hasEnumSymbol(&invalidEnumSymbol)) + assert.Nil(t, fooCandidate.getLowerContexts("foo")) + + barCandidate := pc.contextInferrerCandidates["bar"] + assert.NotNil(t, barCandidate) + assert.True(t, barCandidate.hasFunctionName("set")) + assert.False(t, barCandidate.hasFunctionName("dummy")) + assert.True(t, barCandidate.hasEnumSymbol(&validEnumSymbol)) + assert.False(t, barCandidate.hasEnumSymbol(&invalidEnumSymbol)) + assert.Equal(t, []string{"foo"}, barCandidate.getLowerContexts("bar")) +} + func Test_WithParserCollectionErrorMode(t *testing.T) { pc, err := NewParserCollection[any]( componenttest.NewNopTelemetrySettings(), @@ -253,14 +282,18 @@ func Test_ParseStatements_ContextInferenceError(t *testing.T) { } func Test_ParseStatements_UnknownContextError(t *testing.T) { - pc, err := NewParserCollection[any](component.TelemetrySettings{}) + pc, err := NewParserCollection[any](component.TelemetrySettings{}, + WithParserCollectionContext("bar", mockParser(t, WithPathContextNames[any]([]string{"bar"})), newNopParsedStatementConverter[any]()), + WithParserCollectionContext("te", mockParser(t, WithPathContextNames[any]([]string{"te"})), newNopParsedStatementConverter[any]()), + ) require.NoError(t, err) pc.contextInferrer = &mockStaticContextInferrer{"foo"} statements := mockStatementsGetter{values: []string{`set(foo.attributes["bar"], "foo")`}} _, err = pc.ParseStatements(statements) - assert.ErrorContains(t, err, `unknown context "foo"`) + assert.ErrorContains(t, err, `context "foo" inferred from the statements`) + assert.ErrorContains(t, err, "is not a supported context") } func Test_ParseStatements_ParseStatementsError(t *testing.T) { diff --git a/pkg/ottl/paths.go b/pkg/ottl/paths.go index dbb66ee7c994..8f6c2c15a72d 100644 --- a/pkg/ottl/paths.go +++ b/pkg/ottl/paths.go @@ -9,6 +9,7 @@ type grammarPathVisitor struct { } func (v *grammarPathVisitor) visitEditor(_ *editor) {} +func (v *grammarPathVisitor) visitConverter(_ *converter) {} func (v *grammarPathVisitor) visitValue(_ *value) {} func (v *grammarPathVisitor) visitMathExprLiteral(_ *mathExprLiteral) {} diff --git a/receiver/libhoneyreceiver/README.md b/receiver/libhoneyreceiver/README.md index a87c8735d5d0..a765c45383f4 100644 --- a/receiver/libhoneyreceiver/README.md +++ b/receiver/libhoneyreceiver/README.md @@ -45,20 +45,21 @@ The following setting is required for refinery traffic since: - "/1/batch" include_metadata: true auth_api: https://api.honeycomb.io - resources: - service_name: service_name - scopes: - library_name: library.name - library_version: library.version - attributes: - trace_id: trace_id - parent_id: parent_id - span_id: span_id - name: name - error: error - spankind: span.kind - durationFields: - - duration_ms + fields: + resources: + service_name: service_name + scopes: + library_name: library.name + library_version: library.version + attributes: + trace_id: trace_id + parent_id: parent_id + span_id: span_id + name: name + error: error + spankind: span.kind + durationFields: + - duration_ms ``` ### Telemetry data types supported diff --git a/receiver/libhoneyreceiver/config.go b/receiver/libhoneyreceiver/config.go index abfd6476dbd1..49602fcfa9d3 100644 --- a/receiver/libhoneyreceiver/config.go +++ b/receiver/libhoneyreceiver/config.go @@ -11,18 +11,19 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/confmap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" ) // Config represents the receiver config settings within the collector's config.yaml type Config struct { - HTTP *HTTPConfig `mapstructure:"http"` - AuthAPI string `mapstructure:"auth_api"` - Wrapper string `mapstructure:"wrapper"` - Resources ResourcesConfig `mapstructure:"resources"` - Scopes ScopesConfig `mapstructure:"scopes"` - Attributes AttributesConfig `mapstructure:"attributes"` + HTTP *HTTPConfig `mapstructure:"http"` + AuthAPI string `mapstructure:"auth_api"` + Wrapper string `mapstructure:"wrapper"` + FieldMapConfig libhoneyevent.FieldMapConfig `mapstructure:"fields"` } +// HTTPConfig defines the configuration for the HTTP server receiving traces. type HTTPConfig struct { *confighttp.ServerConfig `mapstructure:",squash"` @@ -30,25 +31,7 @@ type HTTPConfig struct { TracesURLPaths []string `mapstructure:"traces_url_paths,omitempty"` } -type ResourcesConfig struct { - ServiceName string `mapstructure:"service_name"` -} - -type ScopesConfig struct { - LibraryName string `mapstructure:"library_name"` - LibraryVersion string `mapstructure:"library_version"` -} - -type AttributesConfig struct { - TraceID string `mapstructure:"trace_id"` - ParentID string `mapstructure:"parent_id"` - SpanID string `mapstructure:"span_id"` - Name string `mapstructure:"name"` - Error string `mapstructure:"error"` - SpanKind string `mapstructure:"spankind"` - DurationFields []string `mapstructure:"durationFields"` -} - +// Validate ensures the HTTP configuration is set. func (cfg *Config) Validate() error { if cfg.HTTP == nil { return errors.New("must specify at least one protocol when using the arbitrary JSON receiver") @@ -56,6 +39,7 @@ func (cfg *Config) Validate() error { return nil } +// Unmarshal unmarshals the configuration from the given configuration and then checks for errors. func (cfg *Config) Unmarshal(conf *confmap.Conf) error { // first load the config normally err := conf.Unmarshal(cfg) diff --git a/receiver/libhoneyreceiver/config_test.go b/receiver/libhoneyreceiver/config_test.go new file mode 100644 index 000000000000..1d3f6b55dc71 --- /dev/null +++ b/receiver/libhoneyreceiver/config_test.go @@ -0,0 +1,29 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" +) + +func TestCreateDefaultConfig(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, componenttest.CheckConfigStruct(cfg)) + + libhoneyCfg, ok := cfg.(*Config) + require.True(t, ok, "invalid Config type") + + assert.Equal(t, "localhost:8080", libhoneyCfg.HTTP.Endpoint) + assert.Equal(t, []string{"/events", "/event", "/batch"}, libhoneyCfg.HTTP.TracesURLPaths) + assert.Equal(t, "", libhoneyCfg.AuthAPI) + assert.Equal(t, "service.name", libhoneyCfg.FieldMapConfig.Resources.ServiceName) + assert.Equal(t, "library.name", libhoneyCfg.FieldMapConfig.Scopes.LibraryName) + assert.Equal(t, []string{"duration_ms"}, libhoneyCfg.FieldMapConfig.Attributes.DurationFields) +} diff --git a/receiver/libhoneyreceiver/encoder/encoder.go b/receiver/libhoneyreceiver/encoder/encoder.go new file mode 100644 index 000000000000..b0a998ef310c --- /dev/null +++ b/receiver/libhoneyreceiver/encoder/encoder.go @@ -0,0 +1,125 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package encoder // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/encoder" + +import ( + "bytes" + + "github.com/gogo/protobuf/jsonpb" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + spb "google.golang.org/genproto/googleapis/rpc/status" +) + +const ( + PbContentType = "application/x-protobuf" + JsonContentType = "application/json" + MsgpackContentType = "application/x-msgpack" +) + +var ( + JsEncoder = &JsonEncoder{} + JsonPbMarshaler = &jsonpb.Marshaler{} + MpEncoder = &msgpackEncoder{} +) + +type Encoder interface { + UnmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) + UnmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) + UnmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) + + MarshalTracesResponse(ptraceotlp.ExportResponse) ([]byte, error) + MarshalMetricsResponse(pmetricotlp.ExportResponse) ([]byte, error) + MarshalLogsResponse(plogotlp.ExportResponse) ([]byte, error) + + MarshalStatus(rsp *spb.Status) ([]byte, error) + + ContentType() string +} + +type JsonEncoder struct{} + +func (JsonEncoder) UnmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) { + req := ptraceotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (JsonEncoder) UnmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (JsonEncoder) UnmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) { + req := plogotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (JsonEncoder) MarshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (JsonEncoder) MarshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (JsonEncoder) MarshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (JsonEncoder) MarshalStatus(resp *spb.Status) ([]byte, error) { + buf := new(bytes.Buffer) + err := JsonPbMarshaler.Marshal(buf, resp) + return buf.Bytes(), err +} + +func (JsonEncoder) ContentType() string { + return JsonContentType +} + +// messagepack responses seem to work in JSON so leaving this alone for now. +type msgpackEncoder struct{} + +func (msgpackEncoder) UnmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) { + req := ptraceotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (msgpackEncoder) UnmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (msgpackEncoder) UnmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) { + req := plogotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (msgpackEncoder) MarshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (msgpackEncoder) MarshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (msgpackEncoder) MarshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (msgpackEncoder) MarshalStatus(resp *spb.Status) ([]byte, error) { + buf := new(bytes.Buffer) + err := JsonPbMarshaler.Marshal(buf, resp) + return buf.Bytes(), err +} + +func (msgpackEncoder) ContentType() string { + return MsgpackContentType +} diff --git a/receiver/libhoneyreceiver/factory.go b/receiver/libhoneyreceiver/factory.go index 4d0d0fa25cfa..02ab9dcf1855 100644 --- a/receiver/libhoneyreceiver/factory.go +++ b/receiver/libhoneyreceiver/factory.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/receiver" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/metadata" ) @@ -44,21 +45,23 @@ func createDefaultConfig() component.Config { TracesURLPaths: defaultTracesURLPaths, }, AuthAPI: "", - Resources: ResourcesConfig{ - ServiceName: "service.name", - }, - Scopes: ScopesConfig{ - LibraryName: "library.name", - LibraryVersion: "library.version", - }, - Attributes: AttributesConfig{ - TraceID: "trace.trace_id", - SpanID: "trace.span_id", - ParentID: "trace.parent_id", - Name: "name", - Error: "error", - SpanKind: "span.kind", - DurationFields: durationFieldsArr, + FieldMapConfig: libhoneyevent.FieldMapConfig{ + Resources: libhoneyevent.ResourcesConfig{ + ServiceName: "service.name", + }, + Scopes: libhoneyevent.ScopesConfig{ + LibraryName: "library.name", + LibraryVersion: "library.version", + }, + Attributes: libhoneyevent.AttributesConfig{ + TraceID: "trace.trace_id", + SpanID: "trace.span_id", + ParentID: "trace.parent_id", + Name: "name", + Error: "error", + SpanKind: "span.kind", + DurationFields: durationFieldsArr, + }, }, } } diff --git a/receiver/libhoneyreceiver/factory_test.go b/receiver/libhoneyreceiver/factory_test.go new file mode 100644 index 000000000000..9e369d4fd17e --- /dev/null +++ b/receiver/libhoneyreceiver/factory_test.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyreceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/metadata" +) + +func TestCreateTracesReceiver(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + set := receivertest.NewNopSettings() + tReceiver, err := factory.CreateTraces(context.Background(), set, cfg, consumertest.NewNop()) + + assert.NoError(t, err, "receiver creation failed") + assert.NotNil(t, tReceiver, "receiver creation failed") + + assert.NoError(t, tReceiver.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, tReceiver.Shutdown(context.Background())) +} + +func TestCreateLogsReceiver(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + set := receivertest.NewNopSettings() + lReceiver, err := factory.CreateLogs(context.Background(), set, cfg, consumertest.NewNop()) + + assert.NoError(t, err, "receiver creation failed") + assert.NotNil(t, lReceiver, "receiver creation failed") + + assert.NoError(t, lReceiver.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, lReceiver.Shutdown(context.Background())) +} + +func TestType(t *testing.T) { + factory := NewFactory() + assert.Equal(t, metadata.Type, factory.Type()) +} diff --git a/receiver/libhoneyreceiver/go.mod b/receiver/libhoneyreceiver/go.mod index 8a845b28a26c..e3e9a5634fc2 100644 --- a/receiver/libhoneyreceiver/go.mod +++ b/receiver/libhoneyreceiver/go.mod @@ -3,17 +3,28 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhon go 1.22.0 require ( + github.com/gogo/protobuf v1.3.2 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.115.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.116.0 github.com/stretchr/testify v1.10.0 + github.com/vmihailenco/msgpack/v5 v5.4.1 + go.opentelemetry.io/collector/component v0.116.0 go.opentelemetry.io/collector/component/componenttest v0.116.0 go.opentelemetry.io/collector/config/confighttp v0.116.0 go.opentelemetry.io/collector/confmap v1.22.0 go.opentelemetry.io/collector/consumer v1.22.0 go.opentelemetry.io/collector/consumer/consumertest v0.116.0 + go.opentelemetry.io/collector/pdata v1.22.0 + go.opentelemetry.io/collector/receiver v0.116.0 go.opentelemetry.io/collector/receiver/receivertest v0.116.0 + go.opentelemetry.io/collector/semconv v0.116.0 go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.27.0 + google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 ) require ( + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.116.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.116.0 // indirect ) @@ -25,7 +36,6 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect - github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -37,12 +47,10 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.116.0 github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.11.1 // indirect go.opentelemetry.io/collector/client v1.22.0 // indirect - go.opentelemetry.io/collector/component v0.116.0 go.opentelemetry.io/collector/component/componentstatus v0.116.0 go.opentelemetry.io/collector/config/configauth v0.116.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.22.0 // indirect @@ -53,10 +61,8 @@ require ( go.opentelemetry.io/collector/consumer/consumererror v0.116.0 // indirect go.opentelemetry.io/collector/extension v0.116.0 // indirect go.opentelemetry.io/collector/extension/auth v0.116.0 // indirect - go.opentelemetry.io/collector/pdata v1.22.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.116.0 // indirect go.opentelemetry.io/collector/pipeline v0.116.0 // indirect - go.opentelemetry.io/collector/receiver v0.116.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect go.opentelemetry.io/otel v1.32.0 // indirect go.opentelemetry.io/otel/metric v1.32.0 // indirect @@ -64,11 +70,9 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect go.opentelemetry.io/otel/trace v1.32.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 golang.org/x/net v0.32.0 // indirect golang.org/x/sys v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect google.golang.org/grpc v1.69.0 // indirect google.golang.org/protobuf v1.35.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect @@ -77,3 +81,11 @@ require ( replace google.golang.org/genproto => google.golang.org/genproto v0.0.0-20240701130421-f6361c86f094 replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/receiver/libhoneyreceiver/go.sum b/receiver/libhoneyreceiver/go.sum index d20a4d1fa883..32bb83e1775b 100644 --- a/receiver/libhoneyreceiver/go.sum +++ b/receiver/libhoneyreceiver/go.sum @@ -60,6 +60,10 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/collector/client v1.22.0 h1:AAUzHuqYQqxoNqacw1WXgGF/MxtBTwNZuhBvJIorgA0= @@ -114,6 +118,8 @@ go.opentelemetry.io/collector/receiver/receivertest v0.116.0 h1:ZF4QVcots0OUiutb go.opentelemetry.io/collector/receiver/receivertest v0.116.0/go.mod h1:7GGvtHhW3o6457/wGtSWXJtCtlW6VGFUZSlf6wboNTw= go.opentelemetry.io/collector/receiver/xreceiver v0.116.0 h1:Kc+ixqgMjU2sHhzNrFn5TttVNiJlJwTLL3sQrM9uH6s= go.opentelemetry.io/collector/receiver/xreceiver v0.116.0/go.mod h1:H2YGSNFoMbWMIDvB8tzkReHSVqvogihjtet+ppHfYv8= +go.opentelemetry.io/collector/semconv v0.116.0 h1:63xCZomsKJAWmKGWD3lnORiE3WKW6AO4LjnzcHzGx3Y= +go.opentelemetry.io/collector/semconv v0.116.0/go.mod h1:N6XE8Q0JKgBN2fAhkUQtqK9LT7rEGR6+Wu/Rtbal1iI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= diff --git a/receiver/libhoneyreceiver/internal/eventtime/eventtime.go b/receiver/libhoneyreceiver/internal/eventtime/eventtime.go new file mode 100644 index 000000000000..b7317b9f06f8 --- /dev/null +++ b/receiver/libhoneyreceiver/internal/eventtime/eventtime.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package eventtime // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime" + +import ( + "math" + "strconv" + "time" +) + +func GetNowTime() time.Time { + return time.Now() +} + +func GetEventTime(etHeader string) time.Time { + var eventTime time.Time + if etHeader != "" { + // Great, they sent us a time header. let's try and parse it. + // RFC3339Nano is the default that we send from all our SDKs + eventTime, _ = time.Parse(time.RFC3339Nano, etHeader) + if eventTime.IsZero() { + // the default didn't catch it, let's try a few other things + // is it all numeric? then try unix epoch times + epochInt, err := strconv.ParseInt(etHeader, 0, 64) + if err == nil { + // it might be seconds or it might be milliseconds! Who can know! + // 10-digit numbers are seconds, 13-digit milliseconds, 16 microseconds + if len(etHeader) == 10 { + eventTime = time.Unix(epochInt, 0) + } else if len(etHeader) > 10 { + // turn it into seconds and fractional seconds + fractionalTime := etHeader[:10] + "." + etHeader[10:] + // then chop it into the int part and the fractional part + if epochFloat, err := strconv.ParseFloat(fractionalTime, 64); err == nil { + sec, dec := math.Modf(epochFloat) + eventTime = time.Unix(int64(sec), int64(dec*(1e9))) + } + } + } else { + epochFloat, err := strconv.ParseFloat(etHeader, 64) + if err == nil { + sec, dec := math.Modf(epochFloat) + eventTime = time.Unix(int64(sec), int64(dec*(1e9))) + } + } + } + } + return eventTime.UTC() +} + +func GetEventTimeSec(etHeader string) int64 { + eventTime := GetEventTime(etHeader) + return eventTime.Unix() +} + +func GetEventTimeNano(etHeader string) int64 { + eventTime := GetEventTime(etHeader) + return eventTime.UnixNano() +} + +func GetEventTimeDefaultString() string { + return time.Now().Format(time.RFC3339Nano) +} diff --git a/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go new file mode 100644 index 000000000000..5519e304f138 --- /dev/null +++ b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go @@ -0,0 +1,200 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyevent // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" + +import ( + "encoding/json" + "errors" + "fmt" + "slices" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime" +) + +// FieldMapConfig is used to map the fields from the LibhoneyEvent to PData formats +type FieldMapConfig struct { + Resources ResourcesConfig `mapstructure:"resources"` + Scopes ScopesConfig `mapstructure:"scopes"` + Attributes AttributesConfig `mapstructure:"attributes"` +} + +// ResourcesConfig is used to map the fields from the LibhoneyEvent to PData formats +type ResourcesConfig struct { + ServiceName string `mapstructure:"service_name"` +} + +// ScopesConfig is used to map the fields from the LibhoneyEvent to PData formats +type ScopesConfig struct { + LibraryName string `mapstructure:"library_name"` + LibraryVersion string `mapstructure:"library_version"` +} + +// AttributesConfig is used to map the fields from the LibhoneyEvent to PData formats +type AttributesConfig struct { + TraceID string `mapstructure:"trace_id"` + ParentID string `mapstructure:"parent_id"` + SpanID string `mapstructure:"span_id"` + Name string `mapstructure:"name"` + Error string `mapstructure:"error"` + SpanKind string `mapstructure:"spankind"` + DurationFields []string `mapstructure:"durationFields"` +} + +// LibhoneyEvent is the event structure from libhoney +type LibhoneyEvent struct { + Samplerate int `json:"samplerate" msgpack:"samplerate"` + MsgPackTimestamp *time.Time `msgpack:"time"` + Time string `json:"time"` // should not be trusted. use MsgPackTimestamp + Data map[string]any `json:"data" msgpack:"data"` +} + +// UnmarshalJSON overrides the unmarshall to make sure the MsgPackTimestamp is set +func (l *LibhoneyEvent) UnmarshalJSON(j []byte) error { + type _libhoneyEvent LibhoneyEvent + tstr := eventtime.GetEventTimeDefaultString() + tzero := time.Time{} + tmp := _libhoneyEvent{Time: "none", MsgPackTimestamp: &tzero, Samplerate: 1} + + err := json.Unmarshal(j, &tmp) + if err != nil { + return err + } + if tmp.MsgPackTimestamp.IsZero() && tmp.Time == "none" { + // neither timestamp was set. give it right now. + tmp.Time = tstr + tnow := time.Now() + tmp.MsgPackTimestamp = &tnow + } + if tmp.MsgPackTimestamp.IsZero() { + propertime := eventtime.GetEventTime(tmp.Time) + tmp.MsgPackTimestamp = &propertime + } + + *l = LibhoneyEvent(tmp) + return nil +} + +// DebugString returns a string representation of the LibhoneyEvent +func (l *LibhoneyEvent) DebugString() string { + return fmt.Sprintf("%#v", l) +} + +// SignalType returns the type of signal this event represents. Only log is implemented for now. +func (l *LibhoneyEvent) SignalType() (string, error) { + return "log", nil +} + +// GetService returns the service name from the event or the dataset name if no service name is found. +func (l *LibhoneyEvent) GetService(fields FieldMapConfig, seen *ServiceHistory, dataset string) (string, error) { + if serviceName, ok := l.Data[fields.Resources.ServiceName]; ok { + seen.NameCount[serviceName.(string)]++ + return serviceName.(string), nil + } + return dataset, errors.New("no service.name found in event") +} + +// GetScope returns the scope key for the event. If the scope has not been seen before, it creates a new one. +func (l *LibhoneyEvent) GetScope(fields FieldMapConfig, seen *ScopeHistory, serviceName string) (string, error) { + if scopeLibraryName, ok := l.Data[fields.Scopes.LibraryName]; ok { + scopeKey := serviceName + scopeLibraryName.(string) + if _, ok := seen.Scope[scopeKey]; ok { + // if we've seen it, we don't expect it to be different right away so we'll just return it. + return scopeKey, nil + } + // otherwise, we need to make a new found scope + scopeLibraryVersion := "unset" + if scopeLibVer, ok := l.Data[fields.Scopes.LibraryVersion]; ok { + scopeLibraryVersion = scopeLibVer.(string) + } + newScope := SimpleScope{ + ServiceName: serviceName, // we only set the service name once. If the same library comes from multiple services in the same batch, we're in trouble. + LibraryName: scopeLibraryName.(string), + LibraryVersion: scopeLibraryVersion, + ScopeSpans: ptrace.NewSpanSlice(), + ScopeLogs: plog.NewLogRecordSlice(), + } + seen.Scope[scopeKey] = newScope + return scopeKey, nil + } + return "libhoney.receiver", errors.New("library name not found") +} + +// SimpleScope is a simple struct to hold the scope data +type SimpleScope struct { + ServiceName string + LibraryName string + LibraryVersion string + ScopeSpans ptrace.SpanSlice + ScopeLogs plog.LogRecordSlice +} + +// ScopeHistory is a map of scope keys to the SimpleScope object +type ScopeHistory struct { + Scope map[string]SimpleScope // key here is service.name+library.name +} + +// ServiceHistory is a map of service names to the number of times they've been seen +type ServiceHistory struct { + NameCount map[string]int +} + +// ToPLogRecord converts a LibhoneyEvent to a Pdata LogRecord +func (l *LibhoneyEvent) ToPLogRecord(newLog *plog.LogRecord, alreadyUsedFields *[]string, logger zap.Logger) error { + timeNs := l.MsgPackTimestamp.UnixNano() + logger.Debug("processing log with", zap.Int64("timestamp", timeNs)) + newLog.SetTimestamp(pcommon.Timestamp(timeNs)) + + if logSevCode, ok := l.Data["severity_code"]; ok { + logSevInt := int32(logSevCode.(int64)) + newLog.SetSeverityNumber(plog.SeverityNumber(logSevInt)) + } + + if logSevText, ok := l.Data["severity_text"]; ok { + newLog.SetSeverityText(logSevText.(string)) + } + + if logFlags, ok := l.Data["flags"]; ok { + logFlagsUint := uint32(logFlags.(uint64)) + newLog.SetFlags(plog.LogRecordFlags(logFlagsUint)) + } + + // undoing this is gonna be complicated: https://github.com/honeycombio/husky/blob/91c0498333cd9f5eed1fdb8544ca486db7dea565/otlp/logs.go#L61 + if logBody, ok := l.Data["body"]; ok { + newLog.Body().SetStr(logBody.(string)) + } + + newLog.Attributes().PutInt("SampleRate", int64(l.Samplerate)) + + logFieldsAlready := []string{"severity_text", "severity_code", "flags", "body"} + for k, v := range l.Data { + if slices.Contains(*alreadyUsedFields, k) { + continue + } + if slices.Contains(logFieldsAlready, k) { + continue + } + switch v := v.(type) { + case string: + newLog.Attributes().PutStr(k, v) + case int: + newLog.Attributes().PutInt(k, int64(v)) + case int64, int16, int32: + intv := v.(int64) + newLog.Attributes().PutInt(k, intv) + case float64: + newLog.Attributes().PutDouble(k, v) + case bool: + newLog.Attributes().PutBool(k, v) + default: + logger.Warn("Span data type issue", zap.Int64("timestamp", timeNs), zap.String("key", k)) + } + } + return nil +} diff --git a/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent_test.go b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent_test.go new file mode 100644 index 000000000000..40348d2a640a --- /dev/null +++ b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent_test.go @@ -0,0 +1,294 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyevent + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" +) + +func TestLibhoneyEvent_UnmarshalJSON(t *testing.T) { + tests := []struct { + name string + json string + want LibhoneyEvent + wantErr bool + }{ + { + name: "basic event", + json: `{ + "time": "2024-01-01T00:00:00Z", + "data": {"key": "value"}, + "samplerate": 1 + }`, + want: LibhoneyEvent{ + Time: "2024-01-01T00:00:00Z", + Data: map[string]any{"key": "value"}, + Samplerate: 1, + }, + }, + { + name: "invalid json", + json: `{invalid`, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var got LibhoneyEvent + err := json.Unmarshal([]byte(tt.json), &got) + + if tt.wantErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + assert.Equal(t, tt.want.Time, got.Time) + assert.Equal(t, tt.want.Data, got.Data) + assert.Equal(t, tt.want.Samplerate, got.Samplerate) + assert.NotNil(t, got.MsgPackTimestamp) + }) + } + + test := struct { + name string + json string + want LibhoneyEvent + wantErr bool + }{ + name: "missing time uses current", + json: `{ + "data": {"key": "value"}, + "samplerate": 2 + }`, + want: LibhoneyEvent{ + Time: "", + Data: map[string]any{"key": "value"}, + Samplerate: 2, + }, + } + t.Run(test.name, func(t *testing.T) { + var got LibhoneyEvent + err := json.Unmarshal([]byte(test.json), &got) + + require.NoError(t, err) + assert.Equal(t, test.want.Data, got.Data) + gotTime, timeErr := time.Parse(time.RFC3339Nano, got.Time) + assert.NoError(t, timeErr) + assert.WithinDuration(t, time.Now(), gotTime, time.Second) + assert.Equal(t, test.want.Samplerate, got.Samplerate) + assert.NotNil(t, got.MsgPackTimestamp) + }) +} + +func TestLibHoneyEvent_ToPLogRecord(t *testing.T) { + logger := zap.NewNop() + now := time.Now() + tests := []struct { + name string + event LibhoneyEvent + alreadyUsedFields []string + want func(plog.LogRecord) + wantErr bool + }{ + { + name: "basic conversion", + event: LibhoneyEvent{ + Samplerate: 1, + MsgPackTimestamp: &now, + Data: map[string]any{ + "severity_text": "ERROR", + "severity_code": int64(2), + "body": "test message", + "string_attr": "value", + "int_attr": 42, + "float_attr": 3.14, + "bool_attr": true, + }, + }, + want: func(lr plog.LogRecord) { + lr.SetSeverityText("ERROR") + lr.SetSeverityNumber(plog.SeverityNumber(2)) + lr.Body().SetStr("test message") + lr.Attributes().PutStr("string_attr", "value") + lr.Attributes().PutInt("int_attr", 42) + lr.Attributes().PutDouble("float_attr", 3.14) + lr.Attributes().PutBool("bool_attr", true) + lr.Attributes().PutInt("SampleRate", 1) + }, + }, + { + name: "skip already used fields", + event: LibhoneyEvent{ + MsgPackTimestamp: &now, + Data: map[string]any{ + "skip_me": "value", + "keep_me": "value", + }, + }, + alreadyUsedFields: []string{"skip_me"}, + want: func(lr plog.LogRecord) { + lr.Attributes().PutStr("keep_me", "value") + lr.Attributes().PutInt("SampleRate", 0) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + newLog := plog.NewLogRecord() + err := tt.event.ToPLogRecord(&newLog, &tt.alreadyUsedFields, *logger) + + if tt.wantErr { + assert.Error(t, err) + return + } + require.NoError(t, err) + if tt.want != nil { + want := plog.NewLogRecord() + tt.want(want) + + // Check severity + assert.Equal(t, want.SeverityText(), newLog.SeverityText()) + assert.Equal(t, want.SeverityNumber(), newLog.SeverityNumber()) + + // Check body + assert.Equal(t, want.Body().AsString(), newLog.Body().AsString()) + + // Check each attribute has correct type and value + want.Attributes().Range(func(k string, v pcommon.Value) bool { + got, ok := newLog.Attributes().Get(k) + assert.True(t, ok, "missing attribute %s", k) + assert.Equal(t, v.Type(), got.Type(), "wrong type for attribute %s", k) + assert.Equal(t, v, got, "wrong value for attribute %s", k) + + return true + }) + + // Verify no extra attributes + assert.Equal(t, want.Attributes().Len(), newLog.Attributes().Len()) + } + }) + } +} + +func TestLibHoneyEvent_GetService(t *testing.T) { + tests := []struct { + name string + event LibhoneyEvent + fields FieldMapConfig + dataset string + want string + wantErr bool + }{ + { + name: "service name found", + event: LibhoneyEvent{ + Data: map[string]any{ + "service.name": "test-service", + }, + }, + fields: FieldMapConfig{ + Resources: ResourcesConfig{ + ServiceName: "service.name", + }, + }, + want: "test-service", + }, + { + name: "service name not found", + event: LibhoneyEvent{ + Data: map[string]any{}, + }, + fields: FieldMapConfig{ + Resources: ResourcesConfig{ + ServiceName: "service.name", + }, + }, + dataset: "default-dataset", + want: "default-dataset", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + seen := &ServiceHistory{NameCount: make(map[string]int)} + got, err := tt.event.GetService(tt.fields, seen, tt.dataset) + + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.want, got) + }) + } +} + +func TestLibhoneyEvent_GetScope(t *testing.T) { + tests := []struct { + name string + event LibhoneyEvent + fields FieldMapConfig + serviceName string + want string + wantErr bool + }{ + { + name: "scope found", + event: LibhoneyEvent{ + Data: map[string]any{ + "library.name": "test-lib", + "library.version": "1.0.0", + }, + }, + fields: FieldMapConfig{ + Scopes: ScopesConfig{ + LibraryName: "library.name", + LibraryVersion: "library.version", + }, + }, + serviceName: "test-service", + want: "test-servicetest-lib", + }, + { + name: "scope not found", + event: LibhoneyEvent{ + Data: map[string]any{}, + }, + fields: FieldMapConfig{ + Scopes: ScopesConfig{ + LibraryName: "library.name", + }, + }, + serviceName: "test-service", + want: "libhoney.receiver", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + seen := &ScopeHistory{Scope: make(map[string]SimpleScope)} + got, err := tt.event.GetScope(tt.fields, seen, tt.serviceName) + + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/receiver/libhoneyreceiver/internal/parser/parser.go b/receiver/libhoneyreceiver/internal/parser/parser.go new file mode 100644 index 000000000000..d2818dadd80a --- /dev/null +++ b/receiver/libhoneyreceiver/internal/parser/parser.go @@ -0,0 +1,88 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package parser // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/parser" + +import ( + "fmt" + "net/url" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" + semconv "go.opentelemetry.io/collector/semconv/v1.16.0" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" +) + +// GetDatasetFromRequest extracts the dataset name from the request path +func GetDatasetFromRequest(path string) (string, error) { + if path == "" { + return "", fmt.Errorf("missing dataset name") + } + dataset, err := url.PathUnescape(path) + if err != nil { + return "", err + } + return dataset, nil +} + +// ToPdata converts a list of LibhoneyEvents to a Pdata Logs object +func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyevent.FieldMapConfig, logger zap.Logger) plog.Logs { + foundServices := libhoneyevent.ServiceHistory{} + foundServices.NameCount = make(map[string]int) + foundScopes := libhoneyevent.ScopeHistory{} + foundScopes.Scope = make(map[string]libhoneyevent.SimpleScope) + + foundScopes.Scope = make(map[string]libhoneyevent.SimpleScope) // a list of already seen scopes + foundScopes.Scope["libhoney.receiver"] = libhoneyevent.SimpleScope{ + ServiceName: dataset, + LibraryName: "libhoney.receiver", + LibraryVersion: "1.0.0", + ScopeSpans: ptrace.NewSpanSlice(), + ScopeLogs: plog.NewLogRecordSlice(), + } // seed a default + + alreadyUsedFields := []string{cfg.Resources.ServiceName, cfg.Scopes.LibraryName, cfg.Scopes.LibraryVersion} + alreadyUsedFields = append(alreadyUsedFields, cfg.Attributes.Name, + cfg.Attributes.TraceID, cfg.Attributes.ParentID, cfg.Attributes.SpanID, + cfg.Attributes.Error, cfg.Attributes.SpanKind, + ) + alreadyUsedFields = append(alreadyUsedFields, cfg.Attributes.DurationFields...) + + for _, lhe := range lhes { + action, err := lhe.SignalType() + if err != nil { + logger.Warn("signal type unclear") + } + switch action { + case "span": + // not implemented + case "log": + logService, _ := lhe.GetService(cfg, &foundServices, dataset) + logScopeKey, _ := lhe.GetScope(cfg, &foundScopes, logService) // adds a new found scope if needed + newLog := foundScopes.Scope[logScopeKey].ScopeLogs.AppendEmpty() + err := lhe.ToPLogRecord(&newLog, &alreadyUsedFields, logger) + if err != nil { + logger.Warn("log could not be converted from libhoney to plog", zap.String("span.object", lhe.DebugString())) + } + } + } + + resultLogs := plog.NewLogs() + + for scopeName, ss := range foundScopes.Scope { + if ss.ScopeLogs.Len() > 0 { + lr := resultLogs.ResourceLogs().AppendEmpty() + lr.SetSchemaUrl(semconv.SchemaURL) + lr.Resource().Attributes().PutStr(semconv.AttributeServiceName, ss.ServiceName) + + ls := lr.ScopeLogs().AppendEmpty() + ls.Scope().SetName(ss.LibraryName) + ls.Scope().SetVersion(ss.LibraryVersion) + foundScopes.Scope[scopeName].ScopeLogs.MoveAndAppendTo(ls.LogRecords()) + } + } + + return resultLogs +} diff --git a/receiver/libhoneyreceiver/libhoney.go b/receiver/libhoneyreceiver/libhoney.go deleted file mode 100644 index 4ad1faab8fbb..000000000000 --- a/receiver/libhoneyreceiver/libhoney.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" - -import ( - "context" - "errors" - "net" - "net/http" - "sync" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componentstatus" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/receiverhelper" - "go.uber.org/zap" -) - -type libhoneyReceiver struct { - cfg *Config - serverHTTP *http.Server - - nextTraces consumer.Traces - nextLogs consumer.Logs - shutdownWG sync.WaitGroup - - obsrepHTTP *receiverhelper.ObsReport - - settings *receiver.Settings -} - -type TeamInfo struct { - Slug string `json:"slug"` -} - -type EnvironmentInfo struct { - Slug string `json:"slug"` - Name string `json:"name"` -} - -type AuthInfo struct { - APIKeyAccess map[string]bool `json:"api_key_access"` - Team TeamInfo `json:"team"` - Environment EnvironmentInfo `json:"environment"` -} - -func newLibhoneyReceiver(cfg *Config, set *receiver.Settings) (*libhoneyReceiver, error) { - r := &libhoneyReceiver{ - cfg: cfg, - nextTraces: nil, - settings: set, - } - - var err error - r.obsrepHTTP, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: set.ID, - Transport: "http", - ReceiverCreateSettings: *set, - }) - if err != nil { - return nil, err - } - - return r, nil -} - -func (r *libhoneyReceiver) startHTTPServer(ctx context.Context, host component.Host) error { - // If HTTP is not enabled, nothing to start. - if r.cfg.HTTP == nil { - return nil - } - - if r.nextTraces != nil { - // initialize routes - r.settings.Logger.Debug("r.nextTraces found and ready to go") - } else { - r.settings.Logger.Debug("r.nextTraces is nil for some reason") - } - - // start server - var err error - r.settings.Logger.Info("Starting HTTP server", zap.String("endpoint", r.cfg.HTTP.ServerConfig.Endpoint)) - var hln net.Listener - if hln, err = r.cfg.HTTP.ServerConfig.ToListener(ctx); err != nil { - return err - } - - r.shutdownWG.Add(1) - go func() { - defer r.shutdownWG.Done() - - if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil && !errors.Is(errHTTP, http.ErrServerClosed) { - componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(errHTTP)) - } - }() - return nil -} - -func (r *libhoneyReceiver) Start(ctx context.Context, host component.Host) error { - if err := r.startHTTPServer(ctx, host); err != nil { - return errors.Join(err, r.Shutdown(ctx)) - } - - return nil -} - -// Shutdown is a method to turn off receiving. -func (r *libhoneyReceiver) Shutdown(ctx context.Context) error { - var err error - - if r.serverHTTP != nil { - err = r.serverHTTP.Shutdown(ctx) - } - - r.shutdownWG.Wait() - return err -} - -func (r *libhoneyReceiver) registerTraceConsumer(tc consumer.Traces) { - r.nextTraces = tc -} - -func (r *libhoneyReceiver) registerLogConsumer(tc consumer.Logs) { - r.nextLogs = tc -} diff --git a/receiver/libhoneyreceiver/receiver.go b/receiver/libhoneyreceiver/receiver.go new file mode 100644 index 000000000000..84ad68e4638a --- /dev/null +++ b/receiver/libhoneyreceiver/receiver.go @@ -0,0 +1,291 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "mime" + "net" + "net/http" + "strings" + "sync" + + "github.com/vmihailenco/msgpack/v5" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/encoder" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/parser" +) + +type libhoneyReceiver struct { + cfg *Config + server *http.Server + nextTraces consumer.Traces + nextLogs consumer.Logs + shutdownWG sync.WaitGroup + obsreport *receiverhelper.ObsReport + settings *receiver.Settings +} + +// TeamInfo is part of the AuthInfo struct that stores the team slug +type TeamInfo struct { + Slug string `json:"slug"` +} + +// EnvironmentInfo is part of the AuthInfo struct that stores the environment slug and name +type EnvironmentInfo struct { + Slug string `json:"slug"` + Name string `json:"name"` +} + +// AuthInfo is used by Libhoney to validate team and environment information against Honeycomb's Auth API +type AuthInfo struct { + APIKeyAccess map[string]bool `json:"api_key_access"` + Team TeamInfo `json:"team"` + Environment EnvironmentInfo `json:"environment"` +} + +func newLibhoneyReceiver(cfg *Config, set *receiver.Settings) (*libhoneyReceiver, error) { + r := &libhoneyReceiver{ + cfg: cfg, + nextTraces: nil, + settings: set, + } + + var err error + r.obsreport, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: set.ID, + Transport: "http", + ReceiverCreateSettings: *set, + }) + if err != nil { + return nil, err + } + + return r, nil +} + +func (r *libhoneyReceiver) startHTTPServer(ctx context.Context, host component.Host) error { + // If HTTP is not enabled, nothing to start. + if r.cfg.HTTP == nil { + return nil + } + + httpMux := http.NewServeMux() + + r.settings.Logger.Info("r.nextTraces is not null so httpTracesReciever was added", zap.Int("paths", len(r.cfg.HTTP.TracesURLPaths))) + for _, path := range r.cfg.HTTP.TracesURLPaths { + httpMux.HandleFunc(path, func(resp http.ResponseWriter, req *http.Request) { + r.handleEvent(resp, req) + }) + r.settings.Logger.Debug("Added path to HTTP server", zap.String("path", path)) + } + + if r.cfg.AuthAPI != "" { + httpMux.HandleFunc("/1/auth", func(resp http.ResponseWriter, req *http.Request) { + r.handleAuth(resp, req) + }) + } + + var err error + if r.server, err = r.cfg.HTTP.ToServer(ctx, host, r.settings.TelemetrySettings, httpMux); err != nil { + return err + } + + r.settings.Logger.Info("Starting HTTP server", zap.String("endpoint", r.cfg.HTTP.ServerConfig.Endpoint)) + var hln net.Listener + if hln, err = r.cfg.HTTP.ServerConfig.ToListener(ctx); err != nil { + return err + } + + r.shutdownWG.Add(1) + go func() { + defer r.shutdownWG.Done() + + if err := r.server.Serve(hln); err != nil && !errors.Is(err, http.ErrServerClosed) { + componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) + } + }() + return nil +} + +func (r *libhoneyReceiver) Start(ctx context.Context, host component.Host) error { + if err := r.startHTTPServer(ctx, host); err != nil { + return errors.Join(err, r.Shutdown(ctx)) + } + + return nil +} + +// Shutdown is a method to turn off receiving. +func (r *libhoneyReceiver) Shutdown(ctx context.Context) error { + var err error + + if r.server != nil { + err = r.server.Shutdown(ctx) + } + + r.shutdownWG.Wait() + return err +} + +func (r *libhoneyReceiver) registerTraceConsumer(tc consumer.Traces) { + r.nextTraces = tc +} + +func (r *libhoneyReceiver) registerLogConsumer(tc consumer.Logs) { + r.nextLogs = tc +} + +func (r *libhoneyReceiver) handleAuth(resp http.ResponseWriter, req *http.Request) { + authURL := fmt.Sprintf("%s/1/auth", r.cfg.AuthAPI) + authReq, err := http.NewRequest(http.MethodGet, authURL, nil) + if err != nil { + errJSON, _ := json.Marshal(`{"error": "failed to create AuthInfo request"}`) + writeResponse(resp, "json", http.StatusBadRequest, errJSON) + return + } + authReq.Header.Set("x-honeycomb-team", req.Header.Get("x-honeycomb-team")) + var authClient http.Client + authResp, err := authClient.Do(authReq) + if err != nil { + errJSON, _ := json.Marshal(fmt.Sprintf(`"error": "failed to send request to auth api endpoint", "message", "%s"}`, err.Error())) + writeResponse(resp, "json", http.StatusBadRequest, errJSON) + return + } + defer authResp.Body.Close() + + switch { + case authResp.StatusCode == http.StatusUnauthorized: + errJSON, _ := json.Marshal(`"error": "received 401 response for AuthInfo request from Honeycomb API - check your API key"}`) + writeResponse(resp, "json", http.StatusBadRequest, errJSON) + return + case authResp.StatusCode > 299: + errJSON, _ := json.Marshal(fmt.Sprintf(`"error": "bad response code from API", "status_code", %d}`, authResp.StatusCode)) + writeResponse(resp, "json", http.StatusBadRequest, errJSON) + return + } + authRawBody, _ := io.ReadAll(authResp.Body) + _, err = resp.Write(authRawBody) + if err != nil { + r.settings.Logger.Info("couldn't write http response") + } +} + +func (r *libhoneyReceiver) handleEvent(resp http.ResponseWriter, req *http.Request) { + enc, ok := readContentType(resp, req) + if !ok { + return + } + + dataset, err := parser.GetDatasetFromRequest(req.RequestURI) + if err != nil { + r.settings.Logger.Info("No dataset found in URL", zap.String("req.RequstURI", req.RequestURI)) + } + + for _, p := range r.cfg.HTTP.TracesURLPaths { + dataset = strings.Replace(dataset, p, "", 1) + r.settings.Logger.Debug("dataset parsed", zap.String("dataset.parsed", dataset)) + } + + body, err := io.ReadAll(req.Body) + if err != nil { + errorutil.HTTPError(resp, err) + } + if err = req.Body.Close(); err != nil { + errorutil.HTTPError(resp, err) + } + libhoneyevents := make([]libhoneyevent.LibhoneyEvent, 0) + switch req.Header.Get("Content-Type") { + case "application/x-msgpack", "application/msgpack": + decoder := msgpack.NewDecoder(bytes.NewReader(body)) + decoder.UseLooseInterfaceDecoding(true) + err = decoder.Decode(&libhoneyevents) + if err != nil { + r.settings.Logger.Info("messagepack decoding failed") + } + if len(libhoneyevents) > 0 { + r.settings.Logger.Debug("Decoding with msgpack worked", zap.Time("timestamp.first.msgpacktimestamp", *libhoneyevents[0].MsgPackTimestamp), zap.String("timestamp.first.time", libhoneyevents[0].Time)) + r.settings.Logger.Debug("event zero", zap.String("event.data", libhoneyevents[0].DebugString())) + } + case encoder.JsonContentType: + err = json.Unmarshal(body, &libhoneyevents) + if err != nil { + errorutil.HTTPError(resp, err) + } + if len(libhoneyevents) > 0 { + r.settings.Logger.Debug("Decoding with json worked", zap.Time("timestamp.first.msgpacktimestamp", *libhoneyevents[0].MsgPackTimestamp), zap.String("timestamp.first.time", libhoneyevents[0].Time)) + } + } + + otlpLogs := parser.ToPdata(dataset, libhoneyevents, r.cfg.FieldMapConfig, *r.settings.Logger) + + numLogs := otlpLogs.LogRecordCount() + if numLogs > 0 { + ctx := r.obsreport.StartLogsOp(context.Background()) + err = r.nextLogs.ConsumeLogs(ctx, otlpLogs) + r.obsreport.EndLogsOp(ctx, "protobuf", numLogs, err) + } + + if err != nil { + errorutil.HTTPError(resp, err) + return + } + + noErrors := []byte(`{"errors":[]}`) + writeResponse(resp, enc.ContentType(), http.StatusAccepted, noErrors) +} + +func readContentType(resp http.ResponseWriter, req *http.Request) (encoder.Encoder, bool) { + if req.Method != http.MethodPost { + handleUnmatchedMethod(resp) + return nil, false + } + + switch getMimeTypeFromContentType(req.Header.Get("Content-Type")) { + case encoder.JsonContentType: + return encoder.JsEncoder, true + case "application/x-msgpack", "application/msgpack": + return encoder.MpEncoder, true + default: + handleUnmatchedContentType(resp) + return nil, false + } +} + +func writeResponse(w http.ResponseWriter, contentType string, statusCode int, msg []byte) { + w.Header().Set("Content-Type", contentType) + w.WriteHeader(statusCode) + _, _ = w.Write(msg) +} + +func getMimeTypeFromContentType(contentType string) string { + mediatype, _, err := mime.ParseMediaType(contentType) + if err != nil { + return "" + } + return mediatype +} + +func handleUnmatchedMethod(resp http.ResponseWriter) { + status := http.StatusMethodNotAllowed + writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v method not allowed, supported: [POST]", status))) +} + +func handleUnmatchedContentType(resp http.ResponseWriter) { + status := http.StatusUnsupportedMediaType + writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v unsupported media type, supported: [%s, %s]", status, encoder.JsonContentType, encoder.PbContentType))) +} diff --git a/receiver/libhoneyreceiver/receiver_test.go b/receiver/libhoneyreceiver/receiver_test.go new file mode 100644 index 000000000000..31674c515b11 --- /dev/null +++ b/receiver/libhoneyreceiver/receiver_test.go @@ -0,0 +1,276 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyreceiver + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/vmihailenco/msgpack/v5" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" +) + +func TestNewLibhoneyReceiver(t *testing.T) { + defaultCfg := createDefaultConfig() + httpCfg := defaultCfg.(*Config).HTTP + tests := []struct { + name string + config *Config + wantError bool + }{ + { + name: "valid_config", + config: &Config{ + HTTP: httpCfg, + }, + }, + { + name: "nil_config", + config: nil, + wantError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + set := receivertest.NewNopSettings() + r, err := newLibhoneyReceiver(tt.config, &set) + if tt.wantError { + assert.Error(t, err) + assert.Nil(t, r) + return + } + assert.NoError(t, err) + assert.NotNil(t, r) + }) + } +} + +func TestLibhoneyReceiver_Start(t *testing.T) { + cfg := createDefaultConfig() + + set := receivertest.NewNopSettings() + r, err := newLibhoneyReceiver(cfg.(*Config), &set) + require.NoError(t, err) + + r.registerTraceConsumer(consumertest.NewNop()) + r.registerLogConsumer(consumertest.NewNop()) + + err = r.Start(context.Background(), componenttest.NewNopHost()) + assert.NoError(t, err) + + err = r.Shutdown(context.Background()) + assert.NoError(t, err) +} + +func TestLibhoneyReceiver_HandleEvent(t *testing.T) { + now := time.Now() + tests := []struct { + name string + events []libhoneyevent.LibhoneyEvent + contentType string + expectedStatus int + wantError bool + }{ + { + name: "valid_json_event", + events: []libhoneyevent.LibhoneyEvent{ + { + Time: now.Format(time.RFC3339), + MsgPackTimestamp: &now, + Data: map[string]any{ + "message": "test event", + }, + Samplerate: 1, + }, + }, + contentType: "application/json", + expectedStatus: http.StatusAccepted, + }, + { + name: "valid_msgpack_event", + events: []libhoneyevent.LibhoneyEvent{ + { + Time: now.Format(time.RFC3339), + MsgPackTimestamp: &now, + Data: map[string]any{ + "message": "test event", + }, + Samplerate: 1, + }, + }, + contentType: "application/msgpack", + expectedStatus: http.StatusAccepted, + }, + { + name: "invalid_content_type", + events: []libhoneyevent.LibhoneyEvent{}, + contentType: "text/plain", + expectedStatus: http.StatusUnsupportedMediaType, + wantError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := createDefaultConfig() + set := receivertest.NewNopSettings() + r, err := newLibhoneyReceiver(cfg.(*Config), &set) + require.NoError(t, err) + + sink := &consumertest.LogsSink{} + r.registerLogConsumer(sink) + + var body []byte + switch tt.contentType { + case "application/json": + body, err = json.Marshal(tt.events) + case "application/msgpack": + body, err = msgpack.Marshal(tt.events) + default: + body = []byte("invalid content") + } + require.NoError(t, err) + + req := httptest.NewRequest(http.MethodPost, "/1/events/test_dataset", bytes.NewReader(body)) + req.Header.Set("Content-Type", tt.contentType) + w := httptest.NewRecorder() + + r.handleEvent(w, req) + + resp := w.Result() + assert.Equal(t, tt.expectedStatus, resp.StatusCode) + + if !tt.wantError { + assert.Eventually(t, func() bool { + return sink.LogRecordCount() > 0 + }, time.Second, 10*time.Millisecond) + } + }) + } +} + +func TestLibhoneyReceiver_AuthEndpoint(t *testing.T) { + tests := []struct { + name string + authAPI string + apiKey string + mockResponse *http.Response + expectedStatus int + }{ + { + name: "valid_auth", + authAPI: "http://mock-auth-api", + apiKey: "test-key", + mockResponse: &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewBufferString(`{ + "team": {"slug": "test-team"}, + "environment": {"slug": "test-env", "name": "Test Env"} + }`)), + }, + expectedStatus: http.StatusOK, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.AuthAPI = tt.authAPI + set := receivertest.NewNopSettings() + r, err := newLibhoneyReceiver(cfg, &set) + require.NoError(t, err) + + // Create test server to mock auth API + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, tt.apiKey, r.Header.Get("x-honeycomb-team")) + w.WriteHeader(tt.mockResponse.StatusCode) + _, err := io.Copy(w, tt.mockResponse.Body) + assert.NoError(t, err, "failed to copy response body") + })) + defer ts.Close() + + req := httptest.NewRequest(http.MethodGet, "/1/auth", nil) + req.Header.Set("x-honeycomb-team", tt.apiKey) + w := httptest.NewRecorder() + + r.server = &http.Server{ + Handler: http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { + r.handleAuth(resp, req) + }), + ReadHeaderTimeout: 3 * time.Second, + } + + resp := w.Result() + assert.Equal(t, tt.expectedStatus, resp.StatusCode) + }) + } +} + +func TestReadContentType(t *testing.T) { + tests := []struct { + name string + method string + contentType string + expectedStatus int + wantEncoder bool + }{ + { + name: "valid_json", + method: http.MethodPost, + contentType: "application/json", + expectedStatus: http.StatusOK, + wantEncoder: true, + }, + { + name: "valid_msgpack", + method: http.MethodPost, + contentType: "application/msgpack", + expectedStatus: http.StatusOK, + wantEncoder: true, + }, + { + name: "invalid_method", + method: http.MethodGet, + contentType: "application/json", + expectedStatus: http.StatusMethodNotAllowed, + wantEncoder: false, + }, + { + name: "invalid_content_type", + method: http.MethodPost, + contentType: "text/plain", + expectedStatus: http.StatusUnsupportedMediaType, + wantEncoder: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest(tt.method, "/test", nil) + req.Header.Set("Content-Type", tt.contentType) + w := httptest.NewRecorder() + + enc, ok := readContentType(w, req) + assert.Equal(t, tt.wantEncoder, ok) + if tt.wantEncoder { + assert.NotNil(t, enc) + } else { + assert.Equal(t, tt.expectedStatus, w.Code) + } + }) + } +}