Skip to content

Commit

Permalink
ReceiverLoop should stop on cancelling context (#240)
Browse files Browse the repository at this point in the history
Here is test run that shows it doesn't stop without fix https://github.com/open-telemetry/opamp-go/actions/runs/7512286887/job/20452817400

Fixes #239
  • Loading branch information
srikanthccv authored Jan 23, 2024
1 parent c8cc40b commit a669c09
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 11 deletions.
38 changes: 27 additions & 11 deletions client/internal/wsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,38 @@ func NewWSReceiver(

// ReceiverLoop runs the receiver loop. To stop the receiver cancel the context.
func (r *wsReceiver) ReceiverLoop(ctx context.Context) {
runContext, cancelFunc := context.WithCancel(ctx)
type receivedMessage struct {
message *protobufs.ServerToAgent
err error
}

out:
for {
var message protobufs.ServerToAgent
if err := r.receiveMessage(&message); err != nil {
if ctx.Err() == nil && !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
r.logger.Errorf(ctx, "Unexpected error while receiving: %v", err)
select {
case <-ctx.Done():
return
default:
result := make(chan receivedMessage, 1)

go func() {
var message protobufs.ServerToAgent
err := r.receiveMessage(&message)
result <- receivedMessage{&message, err}
}()

select {
case <-ctx.Done():
return
case res := <-result:
if res.err != nil {
if !websocket.IsCloseError(res.err, websocket.CloseNormalClosure) {
r.logger.Errorf(ctx, "Unexpected error while receiving: %v", res.err)
}
return
}
r.processor.ProcessReceivedMessage(ctx, res.message)
}
break out
} else {
r.processor.ProcessReceivedMessage(runContext, &message)
}
}

cancelFunc()
}

func (r *wsReceiver) receiveMessage(msg *protobufs.ServerToAgent) error {
Expand Down
36 changes: 36 additions & 0 deletions client/internal/wsreceiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package internal
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -181,3 +184,36 @@ func TestDecodeMessage(t *testing.T) {
}
}
}

func TestReceiverLoopStop(t *testing.T) {

srv := StartMockServer(t)

conn, _, err := websocket.DefaultDialer.DialContext(
context.Background(),
"ws://"+srv.Endpoint,
nil,
)
require.NoError(t, err)

var receiverLoopStopped atomic.Bool

callbacks := types.CallbacksStruct{}
clientSyncedState := ClientSyncedState{
remoteConfigStatus: &protobufs.RemoteConfigStatus{},
}
sender := WSSender{}
capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand
receiver := NewWSReceiver(TestLogger{t}, callbacks, conn, &sender, &clientSyncedState, nil, capabilities)
ctx, cancel := context.WithCancel(context.Background())

go func() {
receiver.ReceiverLoop(ctx)
receiverLoopStopped.Store(true)
}()
cancel()

assert.Eventually(t, func() bool {
return receiverLoopStopped.Load()
}, 2*time.Second, 100*time.Millisecond, "ReceiverLoop should stop when context is cancelled")
}

0 comments on commit a669c09

Please sign in to comment.