Skip to content

Commit

Permalink
refactor: introduce emit.Token struct
Browse files Browse the repository at this point in the history
This makes the code clearer by encapsulating the token's body and attributes in a single structure.
It should make future change clearer when the emit callback will be changed to accept a collection of tokens as opposed to a single token.
The Sink type could use some refactoring as well,
but I'm not doing it here to keep the changes to the minimum
for clarity and ease of code review.
  • Loading branch information
andrzej-stencel committed Nov 7, 2024
1 parent c840e69 commit 880b9c7
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 15 deletions.
5 changes: 3 additions & 2 deletions pkg/stanza/fileconsumer/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
Expand Down Expand Up @@ -187,8 +188,8 @@ func BenchmarkFileInput(b *testing.B) {
cfg.PollInterval = time.Microsecond

doneChan := make(chan bool, len(files))
callback := func(_ context.Context, token []byte, _ map[string]any) error {
if len(token) == 0 {
callback := func(_ context.Context, token emit.Token) error {
if len(token.Body) == 0 {
doneChan <- true
}
return nil
Expand Down
14 changes: 13 additions & 1 deletion pkg/stanza/fileconsumer/emit/emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,16 @@ import (
"context"
)

type Callback func(ctx context.Context, token []byte, attrs map[string]any) error
type Callback func(ctx context.Context, token Token) error

type Token struct {
Body []byte
Attributes map[string]any
}

func NewToken(body []byte, attrs map[string]any) Token {
return Token{
Body: body,
Attributes: attrs,
}
}
4 changes: 3 additions & 1 deletion pkg/stanza/fileconsumer/internal/emittest/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package emittest // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"context"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
)

func Nop(_ context.Context, _ []byte, _ map[string]any) error {
func Nop(_ context.Context, _ emit.Token) error {
return nil
}
3 changes: 2 additions & 1 deletion pkg/stanza/fileconsumer/internal/emittest/nop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
"context"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/stretchr/testify/require"
)

func TestNop(t *testing.T) {
require.NoError(t, Nop(context.Background(), nil, nil))
require.NoError(t, Nop(context.Background(), emit.Token{}))
}
8 changes: 4 additions & 4 deletions pkg/stanza/fileconsumer/internal/emittest/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ func NewSink(opts ...SinkOpt) *Sink {
return &Sink{
emitChan: emitChan,
timeout: cfg.timeout,
Callback: func(ctx context.Context, token []byte, attrs map[string]any) error {
copied := make([]byte, len(token))
copy(copied, token)
Callback: func(ctx context.Context, token emit.Token) error {
copied := make([]byte, len(token.Body))
copy(copied, token.Body)
select {
case <-ctx.Done():
return ctx.Err()
case emitChan <- &Call{copied, attrs}:
case emitChan <- &Call{copied, token.Attributes}:
}
return nil
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/stanza/fileconsumer/internal/emittest/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -202,7 +203,7 @@ func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) {
}
go func() {
for _, c := range testCalls {
assert.NoError(t, s.Callback(context.Background(), c.Token, c.Attrs))
assert.NoError(t, s.Callback(context.Background(), emit.NewToken(c.Token, c.Attrs)))
}
}()
return s, testCalls
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (r *Reader) readContents(ctx context.Context) {
r.FileAttributes[attrs.LogFileRecordNumber] = r.RecordNum
}

err = r.emitFunc(ctx, token, r.FileAttributes)
err = r.emitFunc(ctx, emit.NewToken(token, r.FileAttributes))
if err != nil {
r.set.Logger.Error("failed to process token", zap.Error(err))
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/stanza/operator/input/file/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)
Expand All @@ -36,17 +37,17 @@ func (i *Input) Stop() error {
return i.fileConsumer.Stop()
}

func (i *Input) emit(ctx context.Context, token []byte, attrs map[string]any) error {
if len(token) == 0 {
func (i *Input) emit(ctx context.Context, token emit.Token) error {
if len(token.Body) == 0 {
return nil
}

ent, err := i.NewEntry(i.toBody(token))
ent, err := i.NewEntry(i.toBody(token.Body))
if err != nil {
return fmt.Errorf("create entry: %w", err)
}

for k, v := range attrs {
for k, v := range token.Attributes {
if err := ent.Set(entry.NewAttributeField(k), v); err != nil {
i.Logger().Error("set attribute", zap.Error(err))
}
Expand Down

0 comments on commit 880b9c7

Please sign in to comment.