From 880b9c7404b874e1c52870908fb79e7f37ea3dff Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Tue, 5 Nov 2024 18:07:50 +0100 Subject: [PATCH] refactor: introduce emit.Token struct This makes the code clearer by encapsulating the token's body and attributes in a single structure. It should make future change clearer when the emit callback will be changed to accept a collection of tokens as opposed to a single token. The Sink type could use some refactoring as well, but I'm not doing it here to keep the changes to the minimum for clarity and ease of code review. --- pkg/stanza/fileconsumer/benchmark_test.go | 5 +++-- pkg/stanza/fileconsumer/emit/emit.go | 14 +++++++++++++- pkg/stanza/fileconsumer/internal/emittest/nop.go | 4 +++- .../fileconsumer/internal/emittest/nop_test.go | 3 ++- pkg/stanza/fileconsumer/internal/emittest/sink.go | 8 ++++---- .../fileconsumer/internal/emittest/sink_test.go | 3 ++- pkg/stanza/fileconsumer/internal/reader/reader.go | 2 +- pkg/stanza/operator/input/file/input.go | 9 +++++---- 8 files changed, 33 insertions(+), 15 deletions(-) diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index c470fa50e17c..20ecfc980f15 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" @@ -187,8 +188,8 @@ func BenchmarkFileInput(b *testing.B) { cfg.PollInterval = time.Microsecond doneChan := make(chan bool, len(files)) - callback := func(_ context.Context, token []byte, _ map[string]any) error { - if len(token) == 0 { + callback := func(_ context.Context, token emit.Token) error { + if len(token.Body) == 0 { doneChan <- true } return nil diff --git a/pkg/stanza/fileconsumer/emit/emit.go b/pkg/stanza/fileconsumer/emit/emit.go index 65859444229e..6c27a3c72a4a 100644 --- a/pkg/stanza/fileconsumer/emit/emit.go +++ b/pkg/stanza/fileconsumer/emit/emit.go @@ -7,4 +7,16 @@ import ( "context" ) -type Callback func(ctx context.Context, token []byte, attrs map[string]any) error +type Callback func(ctx context.Context, token Token) error + +type Token struct { + Body []byte + Attributes map[string]any +} + +func NewToken(body []byte, attrs map[string]any) Token { + return Token{ + Body: body, + Attributes: attrs, + } +} diff --git a/pkg/stanza/fileconsumer/internal/emittest/nop.go b/pkg/stanza/fileconsumer/internal/emittest/nop.go index 09c91b7949db..80e5c3e1a618 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/nop.go +++ b/pkg/stanza/fileconsumer/internal/emittest/nop.go @@ -5,8 +5,10 @@ package emittest // import "github.com/open-telemetry/opentelemetry-collector-co import ( "context" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" ) -func Nop(_ context.Context, _ []byte, _ map[string]any) error { +func Nop(_ context.Context, _ emit.Token) error { return nil } diff --git a/pkg/stanza/fileconsumer/internal/emittest/nop_test.go b/pkg/stanza/fileconsumer/internal/emittest/nop_test.go index 470327925fd4..51cf08e89257 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/nop_test.go +++ b/pkg/stanza/fileconsumer/internal/emittest/nop_test.go @@ -7,9 +7,10 @@ import ( "context" "testing" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/stretchr/testify/require" ) func TestNop(t *testing.T) { - require.NoError(t, Nop(context.Background(), nil, nil)) + require.NoError(t, Nop(context.Background(), emit.Token{})) } diff --git a/pkg/stanza/fileconsumer/internal/emittest/sink.go b/pkg/stanza/fileconsumer/internal/emittest/sink.go index 44d265c9af5d..6c7d9954b3fc 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/sink.go +++ b/pkg/stanza/fileconsumer/internal/emittest/sink.go @@ -57,13 +57,13 @@ func NewSink(opts ...SinkOpt) *Sink { return &Sink{ emitChan: emitChan, timeout: cfg.timeout, - Callback: func(ctx context.Context, token []byte, attrs map[string]any) error { - copied := make([]byte, len(token)) - copy(copied, token) + Callback: func(ctx context.Context, token emit.Token) error { + copied := make([]byte, len(token.Body)) + copy(copied, token.Body) select { case <-ctx.Done(): return ctx.Err() - case emitChan <- &Call{copied, attrs}: + case emitChan <- &Call{copied, token.Attributes}: } return nil }, diff --git a/pkg/stanza/fileconsumer/internal/emittest/sink_test.go b/pkg/stanza/fileconsumer/internal/emittest/sink_test.go index 2cea011e7518..9f753351ab3e 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/sink_test.go +++ b/pkg/stanza/fileconsumer/internal/emittest/sink_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -202,7 +203,7 @@ func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) { } go func() { for _, c := range testCalls { - assert.NoError(t, s.Callback(context.Background(), c.Token, c.Attrs)) + assert.NoError(t, s.Callback(context.Background(), emit.NewToken(c.Token, c.Attrs))) } }() return s, testCalls diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index 207e5b745dbd..3a591574fbc6 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -209,7 +209,7 @@ func (r *Reader) readContents(ctx context.Context) { r.FileAttributes[attrs.LogFileRecordNumber] = r.RecordNum } - err = r.emitFunc(ctx, token, r.FileAttributes) + err = r.emitFunc(ctx, emit.NewToken(token, r.FileAttributes)) if err != nil { r.set.Logger.Error("failed to process token", zap.Error(err)) } diff --git a/pkg/stanza/operator/input/file/input.go b/pkg/stanza/operator/input/file/input.go index fd845499aeae..39f721ad57b3 100644 --- a/pkg/stanza/operator/input/file/input.go +++ b/pkg/stanza/operator/input/file/input.go @@ -11,6 +11,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" ) @@ -36,17 +37,17 @@ func (i *Input) Stop() error { return i.fileConsumer.Stop() } -func (i *Input) emit(ctx context.Context, token []byte, attrs map[string]any) error { - if len(token) == 0 { +func (i *Input) emit(ctx context.Context, token emit.Token) error { + if len(token.Body) == 0 { return nil } - ent, err := i.NewEntry(i.toBody(token)) + ent, err := i.NewEntry(i.toBody(token.Body)) if err != nil { return fmt.Errorf("create entry: %w", err) } - for k, v := range attrs { + for k, v := range token.Attributes { if err := ent.Set(entry.NewAttributeField(k), v); err != nil { i.Logger().Error("set attribute", zap.Error(err)) }