diff --git a/client/client.go b/client/client.go index d421e490..5e67027f 100644 --- a/client/client.go +++ b/client/client.go @@ -84,8 +84,13 @@ type OpAMPClient interface { // SetCustomMessage sets the custom message that will be sent to the Server. May be // called anytime after Start(), including from OnMessage handler. // - // If the CustomMessage is nil or it specifies a capability that is not listed in the - // CustomCapabilities provided in the StartSettings for the client, it will return an - // error. - SetCustomMessage(message *protobufs.CustomMessage) error + // If the CustomMessage is nil, ErrCustomMessageMissing will be returned. If the message + // specifies a capability that is not listed in the CustomCapabilities provided in the + // StartSettings for the client, ErrCustomCapabilityNotSupported will be returned. + // + // Only one message can be set at a time. If a message has already been set, it will + // return ErrCustomMessagePending. To ensure that it is safe to set another + // CustomMessage, the caller should wait for the channel to be closed before attempting + // to set another custom message. + SetCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) } diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index 9ffbe5e4..bffee334 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -1519,7 +1519,7 @@ func TestReportCustomCapabilities(t *testing.T) { // Client ---> // Send a custom message to the server - _ = client.SetCustomMessage(clientEchoRequest) + _, _ = client.SetCustomMessage(clientEchoRequest) // ---> Server srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { @@ -1586,14 +1586,14 @@ func TestSetCustomMessage(t *testing.T) { { name: "nil message is error", message: nil, - expectedError: internal.ErrCustomMessageMissing, + expectedError: types.ErrCustomMessageMissing, }, { name: "unsupported message is error", message: &protobufs.CustomMessage{ Capability: "io.opentelemetry.not-supported", }, - expectedError: internal.ErrCustomCapabilityNotSupported, + expectedError: types.ErrCustomCapabilityNotSupported, }, { name: "supported capability is ok", @@ -1606,7 +1606,8 @@ func TestSetCustomMessage(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - assert.ErrorIs(t, client.SetCustomMessage(test.message), test.expectedError) + _, err := client.SetCustomMessage(test.message) + assert.ErrorIs(t, err, test.expectedError) }) } }) @@ -1641,7 +1642,8 @@ func TestCustomMessages(t *testing.T) { Type: "hello", Data: []byte("test message 1"), } - assert.NoError(t, client.SetCustomMessage(customMessage1)) + _, err := client.SetCustomMessage(customMessage1) + assert.NoError(t, err) // Verify message 1 delivered eventually( @@ -1661,7 +1663,8 @@ func TestCustomMessages(t *testing.T) { Type: "hello", Data: []byte("test message 2"), } - assert.NoError(t, client.SetCustomMessage(customMessage2)) + _, err = client.SetCustomMessage(customMessage2) + assert.NoError(t, err) // Verify message 2 delivered eventually( @@ -1679,7 +1682,80 @@ func TestCustomMessages(t *testing.T) { srv.Close() // Shutdown the client. - err := client.Stop(context.Background()) + err = client.Stop(context.Background()) assert.NoError(t, err) }) } + +func TestSetCustomMessageConflict(t *testing.T) { + testClients(t, func(t *testing.T, client OpAMPClient) { + // Start a Server. + srv := internal.StartMockServer(t) + var rcvCustomMessage atomic.Value + srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + if msg.CustomMessage != nil { + rcvCustomMessage.Store(msg.CustomMessage) + } + return nil + } + + // Start a client. + settings := types.StartSettings{ + OpAMPServerURL: "ws://" + srv.Endpoint, + CustomCapabilities: []string{"local.test.example"}, + } + prepareClient(t, &settings, client) + + assert.NoError(t, client.Start(context.Background(), settings)) + + customMessage1 := &protobufs.CustomMessage{ + Capability: "local.test.example", + Type: "hello", + Data: []byte("test message 1"), + } + customMessage2 := &protobufs.CustomMessage{ + Capability: "local.test.example", + Type: "hello", + Data: []byte("test message 2"), + } + + _, err := client.SetCustomMessage(customMessage1) + assert.NoError(t, err) + + // Sending another message immediately should fail with ErrCustomMessagePending. + sendingChan, err := client.SetCustomMessage(customMessage2) + assert.ErrorIs(t, err, types.ErrCustomMessagePending) + assert.NotNil(t, sendingChan) + + // Receive the first custom message + eventually( + t, + func() bool { + msg, ok := rcvCustomMessage.Load().(*protobufs.CustomMessage) + if !ok || msg == nil { + return false + } + return proto.Equal(customMessage1, msg) + }, + ) + + // Wait for the sending channel to be closed. + <-sendingChan + + // Now sending the second message should work. + _, err = client.SetCustomMessage(customMessage2) + assert.NoError(t, err) + + // Receive the second custom message + eventually( + t, + func() bool { + msg, ok := rcvCustomMessage.Load().(*protobufs.CustomMessage) + if !ok || msg == nil { + return false + } + return proto.Equal(customMessage2, msg) + }, + ) + }) +} diff --git a/client/httpclient.go b/client/httpclient.go index 4a1b31d4..541da9a2 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -102,7 +102,7 @@ func (c *httpClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) err } // SetCustomMessage implements OpAMPClient.SetCustomMessage. -func (c *httpClient) SetCustomMessage(message *protobufs.CustomMessage) error { +func (c *httpClient) SetCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { return c.common.SetCustomMessage(message) } diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index c26e0d6d..ea406870 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -20,8 +20,6 @@ var ( ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set") ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set") ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set") - ErrCustomMessageMissing = errors.New("CustomMessage is nil") - ErrCustomCapabilityNotSupported = errors.New("CustomCapability of CustomMessage is not supported") errAlreadyStarted = errors.New("already started") errCannotStopNotStarted = errors.New("cannot stop because not started") @@ -384,19 +382,30 @@ func (c *ClientCommon) SetCustomCapabilities(customCapabilities *protobufs.Custo } // SetCustomMessage sends the specified custom message to the server. -func (c *ClientCommon) SetCustomMessage(message *protobufs.CustomMessage) error { +func (c *ClientCommon) SetCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { if message == nil { - return ErrCustomMessageMissing + return nil, types.ErrCustomMessageMissing } if !c.ClientSyncedState.HasCustomCapability(message.Capability) { - return ErrCustomCapabilityNotSupported + return nil, types.ErrCustomCapabilityNotSupported } - c.sender.NextMessage().Update( + hasCustomMessage := false + sendingChan := c.sender.NextMessage().Update( func(msg *protobufs.AgentToServer) { - msg.CustomMessage = message + if msg.CustomMessage != nil { + hasCustomMessage = true + } else { + msg.CustomMessage = message + } }, ) + + if hasCustomMessage { + return sendingChan, types.ErrCustomMessagePending + } + c.sender.ScheduleSend() - return nil + + return sendingChan, nil } diff --git a/client/internal/nextmessage.go b/client/internal/nextmessage.go index e79c6780..f9a2099d 100644 --- a/client/internal/nextmessage.go +++ b/client/internal/nextmessage.go @@ -12,26 +12,31 @@ import ( type NextMessage struct { // The next message to send. nextMessage *protobufs.AgentToServer + // nextMessageSending is a channel that is closed when the message is sent. + nextMessageSending chan struct{} // Indicates that nextMessage is pending to be sent. messagePending bool - // Mutex to protect the above 2 fields. + // Mutex to protect the above 3 fields. messageMutex sync.Mutex } // NewNextMessage returns a new empty NextMessage. func NewNextMessage() NextMessage { return NextMessage{ - nextMessage: &protobufs.AgentToServer{}, + nextMessage: &protobufs.AgentToServer{}, + nextMessageSending: make(chan struct{}), } } // Update applies the specified modifier function to the next message that // will be sent and marks the message as pending to be sent. -func (s *NextMessage) Update(modifier func(msg *protobufs.AgentToServer)) { +func (s *NextMessage) Update(modifier func(msg *protobufs.AgentToServer)) (messageSendingChannel chan struct{}) { s.messageMutex.Lock() modifier(s.nextMessage) s.messagePending = true + sending := s.nextMessageSending s.messageMutex.Unlock() + return sending } // PopPending returns the next message to be sent, if it is pending or nil otherwise. @@ -54,7 +59,13 @@ func (s *NextMessage) PopPending() *protobufs.AgentToServer { Capabilities: s.nextMessage.Capabilities, } + sending := s.nextMessageSending + s.nextMessage = msg + s.nextMessageSending = make(chan struct{}) + + // Notify that the message is being sent and a new nextMessage has been created. + close(sending) } s.messageMutex.Unlock() return msgToSend diff --git a/client/internal/wssender.go b/client/internal/wssender.go index 3442edd7..4c3d23a8 100644 --- a/client/internal/wssender.go +++ b/client/internal/wssender.go @@ -67,7 +67,8 @@ func (s *WSSender) sendNextMessage() error { msgToSend := s.nextMessage.PopPending() if msgToSend != nil && !proto.Equal(msgToSend, &protobufs.AgentToServer{}) { // There is a pending message and the message has some fields populated. - return s.sendMessage(msgToSend) + err := s.sendMessage(msgToSend) + return err } return nil } diff --git a/client/types/errors.go b/client/types/errors.go new file mode 100644 index 00000000..1a92cb27 --- /dev/null +++ b/client/types/errors.go @@ -0,0 +1,16 @@ +package types + +import "errors" + +var ( + // ErrCustomMessageMissing is returned by SetCustomMessage when called with a nil message. + ErrCustomMessageMissing = errors.New("CustomMessage is nil") + + // ErrCustomCapabilityNotSupported is returned by SetCustomMessage when called with + // message that has a capability that is not specified as supported by the client. + ErrCustomCapabilityNotSupported = errors.New("CustomCapability of CustomMessage is not supported") + + // ErrCustomMessagePending is returned by SetCustomMessage when called before the previous + // message has been sent. + ErrCustomMessagePending = errors.New("custom message already set") +) diff --git a/client/wsclient.go b/client/wsclient.go index 8a3289c6..780115be 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -116,7 +116,7 @@ func (c *wsClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) error return c.common.SetPackageStatuses(statuses) } -func (c *wsClient) SetCustomMessage(message *protobufs.CustomMessage) error { +func (c *wsClient) SetCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { return c.common.SetCustomMessage(message) }