diff --git a/internal/component/otelcol/receiver/receiver.go b/internal/component/otelcol/receiver/receiver.go index d27e8a584a..0abb2d049f 100644 --- a/internal/component/otelcol/receiver/receiver.go +++ b/internal/component/otelcol/receiver/receiver.go @@ -69,6 +69,8 @@ type Receiver struct { args Arguments + // The mutex is needed because the live debugging service can trigger an update + // concurrently to add/remove the live debugging consumer. updateMut sync.Mutex } @@ -128,8 +130,14 @@ func (r *Receiver) Run(ctx context.Context) error { // the underlying OpenTelemetry Collector receiver. func (r *Receiver) Update(args component.Arguments) error { r.updateMut.Lock() - defer r.updateMut.Unlock() r.args = args.(Arguments) + r.updateMut.Unlock() + return r.update() +} + +func (r *Receiver) update() error { + r.updateMut.Lock() + defer r.updateMut.Unlock() host := scheduler.NewHost( r.opts.Logger, @@ -241,5 +249,5 @@ func (r *Receiver) CurrentHealth() component.Health { } func (p *Receiver) LiveDebugging(_ int) { - p.Update(p.args) + p.update() } diff --git a/internal/component/otelcol/receiver/receiver_test.go b/internal/component/otelcol/receiver/receiver_test.go index 29d263c9a4..19ce33c53a 100644 --- a/internal/component/otelcol/receiver/receiver_test.go +++ b/internal/component/otelcol/receiver/receiver_test.go @@ -127,6 +127,31 @@ func TestReceiverUpdate(t *testing.T) { require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded") } +// This test will trigger a race error if the update function is not properly protected with a mutex +// because the live debugging service can call the update function concurrently. +func TestReceiverUpdateLiveDebugging(t *testing.T) { + te := newTestEnvironment(t, func(t otelconsumer.Traces) {}) + te.Start(fakeReceiverArgs{ + Output: &otelcol.ConsumerArguments{}, + }) + + require.NoError(t, te.Controller.WaitRunning(50*time.Millisecond)) + + go func() { + for i := 0; i < 100; i++ { + te.Controller.Update(fakeReceiverArgs{ + Output: &otelcol.ConsumerArguments{}, + }) + } + }() + + for i := 0; i < 100; i++ { + cp, err := te.Controller.GetComponent() + require.NoError(t, err) + cp.(*receiver.Receiver).LiveDebugging(1) + } +} + type testEnvironment struct { t *testing.T