Skip to content

Commit

Permalink
fix race condition in otel receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed Feb 12, 2025
1 parent 4cf6bc5 commit 0dd571d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 2 deletions.
12 changes: 10 additions & 2 deletions internal/component/otelcol/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type Receiver struct {

args Arguments

// The mutex is needed because the live debugging service can trigger an update
// concurrently to add/remove the live debugging consumer.
updateMut sync.Mutex
}

Expand Down Expand Up @@ -128,8 +130,14 @@ func (r *Receiver) Run(ctx context.Context) error {
// the underlying OpenTelemetry Collector receiver.
func (r *Receiver) Update(args component.Arguments) error {
r.updateMut.Lock()
defer r.updateMut.Unlock()
r.args = args.(Arguments)
r.updateMut.Unlock()
return r.update()
}

func (r *Receiver) update() error {
r.updateMut.Lock()
defer r.updateMut.Unlock()

host := scheduler.NewHost(
r.opts.Logger,
Expand Down Expand Up @@ -241,5 +249,5 @@ func (r *Receiver) CurrentHealth() component.Health {
}

func (p *Receiver) LiveDebugging(_ int) {
p.Update(p.args)
p.update()
}
25 changes: 25 additions & 0 deletions internal/component/otelcol/receiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,31 @@ func TestReceiverUpdate(t *testing.T) {
require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded")
}

// This test will trigger a race error if the update function is not properly protected with a mutex
// because the live debugging service can call the update function concurrently.
func TestReceiverUpdateLiveDebugging(t *testing.T) {
te := newTestEnvironment(t, func(t otelconsumer.Traces) {})
te.Start(fakeReceiverArgs{
Output: &otelcol.ConsumerArguments{},
})

require.NoError(t, te.Controller.WaitRunning(50*time.Millisecond))

go func() {
for i := 0; i < 100; i++ {
te.Controller.Update(fakeReceiverArgs{
Output: &otelcol.ConsumerArguments{},
})
}
}()

for i := 0; i < 100; i++ {
cp, err := te.Controller.GetComponent()
require.NoError(t, err)
cp.(*receiver.Receiver).LiveDebugging(1)
}
}

type testEnvironment struct {
t *testing.T

Expand Down

0 comments on commit 0dd571d

Please sign in to comment.