From f2975573d492a919ac66a2a883306141cf033fcf Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Fri, 8 Nov 2024 15:44:22 +0100 Subject: [PATCH] [chore][pkg/stanza] refactor: introduce emit.Token struct (#36260) This makes the code clearer by encapsulating the token's body and attributes in a single structure. It should make future change clearer when the emit callback will be changed to accept a collection of tokens as opposed to a single token. The Sink type could use some refactoring as well, but I'm not doing it here to keep the changes to the minimum for clarity and ease of code review. --- .chloggen/refactor-callback-token.yaml | 27 +++++++++++++++++++ pkg/stanza/fileconsumer/benchmark_test.go | 5 ++-- pkg/stanza/fileconsumer/emit/emit.go | 14 +++++++++- .../fileconsumer/internal/emittest/nop.go | 4 ++- .../internal/emittest/nop_test.go | 4 ++- .../fileconsumer/internal/emittest/sink.go | 8 +++--- .../internal/emittest/sink_test.go | 4 ++- .../fileconsumer/internal/reader/reader.go | 2 +- pkg/stanza/operator/input/file/input.go | 9 ++++--- receiver/otlpjsonfilereceiver/file.go | 17 ++++++------ 10 files changed, 71 insertions(+), 23 deletions(-) create mode 100644 .chloggen/refactor-callback-token.yaml diff --git a/.chloggen/refactor-callback-token.yaml b/.chloggen/refactor-callback-token.yaml new file mode 100644 index 000000000000..a7d36715a111 --- /dev/null +++ b/.chloggen/refactor-callback-token.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Changed signature of `emit.Callback` function in `pkg/stanza/fileconsumer/emit` package by introducing `emit.Token` struct that encapsulates the token's body and attributes. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36260] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index c470fa50e17c..20ecfc980f15 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" @@ -187,8 +188,8 @@ func BenchmarkFileInput(b *testing.B) { cfg.PollInterval = time.Microsecond doneChan := make(chan bool, len(files)) - callback := func(_ context.Context, token []byte, _ map[string]any) error { - if len(token) == 0 { + callback := func(_ context.Context, token emit.Token) error { + if len(token.Body) == 0 { doneChan <- true } return nil diff --git a/pkg/stanza/fileconsumer/emit/emit.go b/pkg/stanza/fileconsumer/emit/emit.go index 65859444229e..6c27a3c72a4a 100644 --- a/pkg/stanza/fileconsumer/emit/emit.go +++ b/pkg/stanza/fileconsumer/emit/emit.go @@ -7,4 +7,16 @@ import ( "context" ) -type Callback func(ctx context.Context, token []byte, attrs map[string]any) error +type Callback func(ctx context.Context, token Token) error + +type Token struct { + Body []byte + Attributes map[string]any +} + +func NewToken(body []byte, attrs map[string]any) Token { + return Token{ + Body: body, + Attributes: attrs, + } +} diff --git a/pkg/stanza/fileconsumer/internal/emittest/nop.go b/pkg/stanza/fileconsumer/internal/emittest/nop.go index 09c91b7949db..80e5c3e1a618 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/nop.go +++ b/pkg/stanza/fileconsumer/internal/emittest/nop.go @@ -5,8 +5,10 @@ package emittest // import "github.com/open-telemetry/opentelemetry-collector-co import ( "context" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" ) -func Nop(_ context.Context, _ []byte, _ map[string]any) error { +func Nop(_ context.Context, _ emit.Token) error { return nil } diff --git a/pkg/stanza/fileconsumer/internal/emittest/nop_test.go b/pkg/stanza/fileconsumer/internal/emittest/nop_test.go index 470327925fd4..40cef35312dc 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/nop_test.go +++ b/pkg/stanza/fileconsumer/internal/emittest/nop_test.go @@ -8,8 +8,10 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" ) func TestNop(t *testing.T) { - require.NoError(t, Nop(context.Background(), nil, nil)) + require.NoError(t, Nop(context.Background(), emit.Token{})) } diff --git a/pkg/stanza/fileconsumer/internal/emittest/sink.go b/pkg/stanza/fileconsumer/internal/emittest/sink.go index 44d265c9af5d..6c7d9954b3fc 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/sink.go +++ b/pkg/stanza/fileconsumer/internal/emittest/sink.go @@ -57,13 +57,13 @@ func NewSink(opts ...SinkOpt) *Sink { return &Sink{ emitChan: emitChan, timeout: cfg.timeout, - Callback: func(ctx context.Context, token []byte, attrs map[string]any) error { - copied := make([]byte, len(token)) - copy(copied, token) + Callback: func(ctx context.Context, token emit.Token) error { + copied := make([]byte, len(token.Body)) + copy(copied, token.Body) select { case <-ctx.Done(): return ctx.Err() - case emitChan <- &Call{copied, attrs}: + case emitChan <- &Call{copied, token.Attributes}: } return nil }, diff --git a/pkg/stanza/fileconsumer/internal/emittest/sink_test.go b/pkg/stanza/fileconsumer/internal/emittest/sink_test.go index 2cea011e7518..7d340b4a79fd 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/sink_test.go +++ b/pkg/stanza/fileconsumer/internal/emittest/sink_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" ) func TestNextToken(t *testing.T) { @@ -202,7 +204,7 @@ func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) { } go func() { for _, c := range testCalls { - assert.NoError(t, s.Callback(context.Background(), c.Token, c.Attrs)) + assert.NoError(t, s.Callback(context.Background(), emit.NewToken(c.Token, c.Attrs))) } }() return s, testCalls diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index 207e5b745dbd..3a591574fbc6 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -209,7 +209,7 @@ func (r *Reader) readContents(ctx context.Context) { r.FileAttributes[attrs.LogFileRecordNumber] = r.RecordNum } - err = r.emitFunc(ctx, token, r.FileAttributes) + err = r.emitFunc(ctx, emit.NewToken(token, r.FileAttributes)) if err != nil { r.set.Logger.Error("failed to process token", zap.Error(err)) } diff --git a/pkg/stanza/operator/input/file/input.go b/pkg/stanza/operator/input/file/input.go index fd845499aeae..39f721ad57b3 100644 --- a/pkg/stanza/operator/input/file/input.go +++ b/pkg/stanza/operator/input/file/input.go @@ -11,6 +11,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" ) @@ -36,17 +37,17 @@ func (i *Input) Stop() error { return i.fileConsumer.Stop() } -func (i *Input) emit(ctx context.Context, token []byte, attrs map[string]any) error { - if len(token) == 0 { +func (i *Input) emit(ctx context.Context, token emit.Token) error { + if len(token.Body) == 0 { return nil } - ent, err := i.NewEntry(i.toBody(token)) + ent, err := i.NewEntry(i.toBody(token.Body)) if err != nil { return fmt.Errorf("create entry: %w", err) } - for k, v := range attrs { + for k, v := range token.Attributes { if err := ent.Set(entry.NewAttributeField(k), v); err != nil { i.Logger().Error("set attribute", zap.Error(err)) } diff --git a/receiver/otlpjsonfilereceiver/file.go b/receiver/otlpjsonfilereceiver/file.go index aad1ddef6561..c5a049dd271a 100644 --- a/receiver/otlpjsonfilereceiver/file.go +++ b/receiver/otlpjsonfilereceiver/file.go @@ -19,6 +19,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver/internal/metadata" ) @@ -82,10 +83,10 @@ func createLogsReceiver(_ context.Context, settings receiver.Settings, configura if cfg.ReplayFile { opts = append(opts, fileconsumer.WithNoTracking()) } - input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token []byte, _ map[string]any) error { + input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error { ctx = obsrecv.StartLogsOp(ctx) var l plog.Logs - l, err = logsUnmarshaler.UnmarshalLogs(token) + l, err = logsUnmarshaler.UnmarshalLogs(token.Body) if err != nil { obsrecv.EndLogsOp(ctx, metadata.Type.String(), 0, err) } else { @@ -119,10 +120,10 @@ func createMetricsReceiver(_ context.Context, settings receiver.Settings, config if cfg.ReplayFile { opts = append(opts, fileconsumer.WithNoTracking()) } - input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token []byte, _ map[string]any) error { + input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error { ctx = obsrecv.StartMetricsOp(ctx) var m pmetric.Metrics - m, err = metricsUnmarshaler.UnmarshalMetrics(token) + m, err = metricsUnmarshaler.UnmarshalMetrics(token.Body) if err != nil { obsrecv.EndMetricsOp(ctx, metadata.Type.String(), 0, err) } else { @@ -155,10 +156,10 @@ func createTracesReceiver(_ context.Context, settings receiver.Settings, configu if cfg.ReplayFile { opts = append(opts, fileconsumer.WithNoTracking()) } - input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token []byte, _ map[string]any) error { + input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error { ctx = obsrecv.StartTracesOp(ctx) var t ptrace.Traces - t, err = tracesUnmarshaler.UnmarshalTraces(token) + t, err = tracesUnmarshaler.UnmarshalTraces(token.Body) if err != nil { obsrecv.EndTracesOp(ctx, metadata.Type.String(), 0, err) } else { @@ -183,8 +184,8 @@ func createProfilesReceiver(_ context.Context, settings receiver.Settings, confi if cfg.ReplayFile { opts = append(opts, fileconsumer.WithNoTracking()) } - input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token []byte, _ map[string]any) error { - p, _ := profilesUnmarshaler.UnmarshalProfiles(token) + input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error { + p, _ := profilesUnmarshaler.UnmarshalProfiles(token.Body) if p.ResourceProfiles().Len() != 0 { _ = profiles.ConsumeProfiles(ctx, p) }