diff --git a/.chloggen/refactor-callback-token.yaml b/.chloggen/refactor-callback-token.yaml new file mode 100644 index 000000000000..a7d36715a111 --- /dev/null +++ b/.chloggen/refactor-callback-token.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Changed signature of `emit.Callback` function in `pkg/stanza/fileconsumer/emit` package by introducing `emit.Token` struct that encapsulates the token's body and attributes. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36260] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index c470fa50e17c..20ecfc980f15 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" @@ -187,8 +188,8 @@ func BenchmarkFileInput(b *testing.B) { cfg.PollInterval = time.Microsecond doneChan := make(chan bool, len(files)) - callback := func(_ context.Context, token []byte, _ map[string]any) error { - if len(token) == 0 { + callback := func(_ context.Context, token emit.Token) error { + if len(token.Body) == 0 { doneChan <- true } return nil diff --git a/pkg/stanza/fileconsumer/emit/emit.go b/pkg/stanza/fileconsumer/emit/emit.go index 65859444229e..6c27a3c72a4a 100644 --- a/pkg/stanza/fileconsumer/emit/emit.go +++ b/pkg/stanza/fileconsumer/emit/emit.go @@ -7,4 +7,16 @@ import ( "context" ) -type Callback func(ctx context.Context, token []byte, attrs map[string]any) error +type Callback func(ctx context.Context, token Token) error + +type Token struct { + Body []byte + Attributes map[string]any +} + +func NewToken(body []byte, attrs map[string]any) Token { + return Token{ + Body: body, + Attributes: attrs, + } +} diff --git a/pkg/stanza/fileconsumer/internal/emittest/nop.go b/pkg/stanza/fileconsumer/internal/emittest/nop.go index 09c91b7949db..80e5c3e1a618 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/nop.go +++ b/pkg/stanza/fileconsumer/internal/emittest/nop.go @@ -5,8 +5,10 @@ package emittest // import "github.com/open-telemetry/opentelemetry-collector-co import ( "context" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" ) -func Nop(_ context.Context, _ []byte, _ map[string]any) error { +func Nop(_ context.Context, _ emit.Token) error { return nil } diff --git a/pkg/stanza/fileconsumer/internal/emittest/nop_test.go b/pkg/stanza/fileconsumer/internal/emittest/nop_test.go index 470327925fd4..40cef35312dc 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/nop_test.go +++ b/pkg/stanza/fileconsumer/internal/emittest/nop_test.go @@ -8,8 +8,10 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" ) func TestNop(t *testing.T) { - require.NoError(t, Nop(context.Background(), nil, nil)) + require.NoError(t, Nop(context.Background(), emit.Token{})) } diff --git a/pkg/stanza/fileconsumer/internal/emittest/sink.go b/pkg/stanza/fileconsumer/internal/emittest/sink.go index 44d265c9af5d..6c7d9954b3fc 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/sink.go +++ b/pkg/stanza/fileconsumer/internal/emittest/sink.go @@ -57,13 +57,13 @@ func NewSink(opts ...SinkOpt) *Sink { return &Sink{ emitChan: emitChan, timeout: cfg.timeout, - Callback: func(ctx context.Context, token []byte, attrs map[string]any) error { - copied := make([]byte, len(token)) - copy(copied, token) + Callback: func(ctx context.Context, token emit.Token) error { + copied := make([]byte, len(token.Body)) + copy(copied, token.Body) select { case <-ctx.Done(): return ctx.Err() - case emitChan <- &Call{copied, attrs}: + case emitChan <- &Call{copied, token.Attributes}: } return nil }, diff --git a/pkg/stanza/fileconsumer/internal/emittest/sink_test.go b/pkg/stanza/fileconsumer/internal/emittest/sink_test.go index 2cea011e7518..7d340b4a79fd 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/sink_test.go +++ b/pkg/stanza/fileconsumer/internal/emittest/sink_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" ) func TestNextToken(t *testing.T) { @@ -202,7 +204,7 @@ func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) { } go func() { for _, c := range testCalls { - assert.NoError(t, s.Callback(context.Background(), c.Token, c.Attrs)) + assert.NoError(t, s.Callback(context.Background(), emit.NewToken(c.Token, c.Attrs))) } }() return s, testCalls diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index 207e5b745dbd..3a591574fbc6 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -209,7 +209,7 @@ func (r *Reader) readContents(ctx context.Context) { r.FileAttributes[attrs.LogFileRecordNumber] = r.RecordNum } - err = r.emitFunc(ctx, token, r.FileAttributes) + err = r.emitFunc(ctx, emit.NewToken(token, r.FileAttributes)) if err != nil { r.set.Logger.Error("failed to process token", zap.Error(err)) } diff --git a/pkg/stanza/operator/input/file/input.go b/pkg/stanza/operator/input/file/input.go index fd845499aeae..39f721ad57b3 100644 --- a/pkg/stanza/operator/input/file/input.go +++ b/pkg/stanza/operator/input/file/input.go @@ -11,6 +11,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" ) @@ -36,17 +37,17 @@ func (i *Input) Stop() error { return i.fileConsumer.Stop() } -func (i *Input) emit(ctx context.Context, token []byte, attrs map[string]any) error { - if len(token) == 0 { +func (i *Input) emit(ctx context.Context, token emit.Token) error { + if len(token.Body) == 0 { return nil } - ent, err := i.NewEntry(i.toBody(token)) + ent, err := i.NewEntry(i.toBody(token.Body)) if err != nil { return fmt.Errorf("create entry: %w", err) } - for k, v := range attrs { + for k, v := range token.Attributes { if err := ent.Set(entry.NewAttributeField(k), v); err != nil { i.Logger().Error("set attribute", zap.Error(err)) } diff --git a/receiver/otlpjsonfilereceiver/file.go b/receiver/otlpjsonfilereceiver/file.go index aad1ddef6561..c5a049dd271a 100644 --- a/receiver/otlpjsonfilereceiver/file.go +++ b/receiver/otlpjsonfilereceiver/file.go @@ -19,6 +19,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver/internal/metadata" ) @@ -82,10 +83,10 @@ func createLogsReceiver(_ context.Context, settings receiver.Settings, configura if cfg.ReplayFile { opts = append(opts, fileconsumer.WithNoTracking()) } - input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token []byte, _ map[string]any) error { + input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error { ctx = obsrecv.StartLogsOp(ctx) var l plog.Logs - l, err = logsUnmarshaler.UnmarshalLogs(token) + l, err = logsUnmarshaler.UnmarshalLogs(token.Body) if err != nil { obsrecv.EndLogsOp(ctx, metadata.Type.String(), 0, err) } else { @@ -119,10 +120,10 @@ func createMetricsReceiver(_ context.Context, settings receiver.Settings, config if cfg.ReplayFile { opts = append(opts, fileconsumer.WithNoTracking()) } - input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token []byte, _ map[string]any) error { + input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error { ctx = obsrecv.StartMetricsOp(ctx) var m pmetric.Metrics - m, err = metricsUnmarshaler.UnmarshalMetrics(token) + m, err = metricsUnmarshaler.UnmarshalMetrics(token.Body) if err != nil { obsrecv.EndMetricsOp(ctx, metadata.Type.String(), 0, err) } else { @@ -155,10 +156,10 @@ func createTracesReceiver(_ context.Context, settings receiver.Settings, configu if cfg.ReplayFile { opts = append(opts, fileconsumer.WithNoTracking()) } - input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token []byte, _ map[string]any) error { + input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error { ctx = obsrecv.StartTracesOp(ctx) var t ptrace.Traces - t, err = tracesUnmarshaler.UnmarshalTraces(token) + t, err = tracesUnmarshaler.UnmarshalTraces(token.Body) if err != nil { obsrecv.EndTracesOp(ctx, metadata.Type.String(), 0, err) } else { @@ -183,8 +184,8 @@ func createProfilesReceiver(_ context.Context, settings receiver.Settings, confi if cfg.ReplayFile { opts = append(opts, fileconsumer.WithNoTracking()) } - input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token []byte, _ map[string]any) error { - p, _ := profilesUnmarshaler.UnmarshalProfiles(token) + input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error { + p, _ := profilesUnmarshaler.UnmarshalProfiles(token.Body) if p.ResourceProfiles().Len() != 0 { _ = profiles.ConsumeProfiles(ctx, p) }