From be86a6ff35e9cd9b2ec76b936c5e935059e49f4d Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Wed, 22 Jan 2025 09:42:28 -0500 Subject: [PATCH] Revert "Remove SetAvailableComponents method from interface" This reverts commit 9e2acf3bce37f30f3ac12eeeeb105ff0be5dc98f. --- client/client.go | 14 ++++ client/clientimpl_test.go | 103 +++++++++++++++++++++++++++ client/httpclient.go | 5 ++ client/internal/clientcommon.go | 40 +++++++++++ client/internal/receivedprocessor.go | 4 +- client/types/callbacks.go | 3 + client/types/errors.go | 9 +++ client/wsclient.go | 5 ++ 8 files changed, 182 insertions(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index afc08315..c3d2ca3f 100644 --- a/client/client.go +++ b/client/client.go @@ -134,4 +134,18 @@ type OpAMPClient interface { // If no error is returned, the channel returned will be closed after the specified // message is sent. SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) + + // SetAvailableComponents modifies the set of components that are available for configuration + // on the agent. + // May be called any time after Start(), including from the OnMessage handler. + // The new components will be sent with the next message to the server. + // + // If components is nil, types.ErrAvailableComponentsMissing will be returned. + // If components.Hash is nil or an empty []byte, types.ErrNoAvailableComponentHash will be returned. + // If the ReportsAvailableComponents capability is not set in StartSettings.Capabilities during Start(), + // types.ErrReportsAvailableComponentsNotSet will be returned. + // + // This method is subject to agent status compression - if components is not + // different from the cached agent state, this method is a no-op. + SetAvailableComponents(components *protobufs.AvailableComponents) error } diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index 2bab9b5b..bf2ce7b7 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -2306,6 +2306,109 @@ func TestSetFlagsBeforeStart(t *testing.T) { }) } +func TestSetAvailableComponents(t *testing.T) { + testCases := []struct { + desc string + capabilities protobufs.AgentCapabilities + testFunc func(t *testing.T, client OpAMPClient, srv *internal.MockServer) + }{ + { + desc: "apply nil AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.ErrorIs(t, client.SetAvailableComponents(nil), types.ErrAvailableComponentsMissing) + }, + }, + { + desc: "apply AvailableComponents with empty hash", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.ErrorIs(t, client.SetAvailableComponents(&protobufs.AvailableComponents{}), types.ErrNoAvailableComponentHash) + }, + }, + { + desc: "apply AvailableComponents without required capability", + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.ErrorIs(t, client.SetAvailableComponents(generateTestAvailableComponents()), types.ErrReportsAvailableComponentsNotSet) + }, + }, + { + desc: "apply AvailableComponents with cached AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.NoError(t, client.SetAvailableComponents(generateTestAvailableComponents())) + }, + }, + { + desc: "apply AvailableComponents with new AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, srv *internal.MockServer) { + availableComponents := generateTestAvailableComponents() + availableComponents.Hash = []byte("different") + require.NoError(t, client.SetAvailableComponents(availableComponents)) + srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + assert.EqualValues(t, 1, msg.SequenceNum) + msgAvailableComponents := msg.GetAvailableComponents() + require.NotNil(t, msgAvailableComponents) + require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash()) + require.Nil(t, msgAvailableComponents.GetComponents()) + return nil + }) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + testClients(t, func(t *testing.T, client OpAMPClient) { + + // Start a Server. + srv := internal.StartMockServer(t) + srv.EnableExpectMode() + + availableComponents := generateTestAvailableComponents() + + // Start a client. + settings := types.StartSettings{ + OpAMPServerURL: "ws://" + srv.Endpoint, + Callbacks: types.Callbacks{ + OnMessage: func(ctx context.Context, msg *types.MessageData) {}, + }, + Capabilities: tc.capabilities, + AvailableComponents: availableComponents, + } + prepareClient(t, &settings, client) + + // Client ---> + assert.NoError(t, client.Start(context.Background(), settings)) + + // ---> Server + srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + assert.EqualValues(t, 0, msg.SequenceNum) + msgAvailableComponents := msg.GetAvailableComponents() + if tc.capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 { + require.NotNil(t, msgAvailableComponents) + require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash()) + require.Nil(t, msgAvailableComponents.GetComponents()) + } else { + require.Nil(t, msgAvailableComponents) + } + return nil + }) + + tc.testFunc(t, client, srv) + + // Shutdown the Server. + srv.Close() + + // Shutdown the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) + }) + } +} + func generateTestAvailableComponents() *protobufs.AvailableComponents { return &protobufs.AvailableComponents{ Hash: []byte("fake-hash"), diff --git a/client/httpclient.go b/client/httpclient.go index 92259d59..8d32797d 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -120,6 +120,11 @@ func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messag return c.common.SendCustomMessage(message) } +// SetAvailableComponents implements OpAMPClient.SetAvailableComponents +func (c *httpClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { + return c.common.SetAvailableComponents(components) +} + func (c *httpClient) runUntilStopped(ctx context.Context) { // Start the HTTP sender. This will make request/responses with retries for // failures and will wait with configured polling interval if there is nothing diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 1fd2b969..2a2e9140 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -454,3 +454,43 @@ func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (mess return sendingChan, nil } + +// SetAvailableComponents sends a message to the server with the available components for the agent +func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error { + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 { + return types.ErrReportsAvailableComponentsNotSet + } + + if components == nil { + return types.ErrAvailableComponentsMissing + } + + if len(components.Hash) == 0 { + return types.ErrNoAvailableComponentHash + } + + // implement agent status compression, don't send the message if it hasn't changed from the previous message + availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components) + + if availableComponentsChanged { + if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil { + return err + } + + // initially, do not send the full component state - just send the hash. + // full state is available on request from the server using the corresponding ServerToAgent flag + availableComponents := &protobufs.AvailableComponents{ + Hash: c.ClientSyncedState.AvailableComponents().GetHash(), + } + + c.sender.NextMessage().Update( + func(msg *protobufs.AgentToServer) { + msg.AvailableComponents = availableComponents + }, + ) + + c.sender.ScheduleSend() + } + + return nil +} diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index 55f2ee55..7a5474c7 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -75,7 +75,9 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro r.logger.Errorf(ctx, "cannot processed received flags:%v", err) } - msgData := &types.MessageData{} + msgData := &types.MessageData{ + Flags: protobufs.ServerToAgentFlags(msg.Flags), + } if msg.RemoteConfig != nil { if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig) { diff --git a/client/types/callbacks.go b/client/types/callbacks.go index 48d5f832..0b1a9a23 100644 --- a/client/types/callbacks.go +++ b/client/types/callbacks.go @@ -45,6 +45,9 @@ type MessageData struct { // CustomMessage contains a custom message sent by the server. CustomMessage *protobufs.CustomMessage + + // Flags contains any flags sent by the server. + Flags protobufs.ServerToAgentFlags } // Callbacks contains functions that are executed when the client encounters diff --git a/client/types/errors.go b/client/types/errors.go index aab66924..a1d1389b 100644 --- a/client/types/errors.go +++ b/client/types/errors.go @@ -13,4 +13,13 @@ var ( // ErrCustomMessagePending is returned by SendCustomMessage when called before the previous // message has been sent. ErrCustomMessagePending = errors.New("custom message already set") + + // ErrReportsAvailableComponentsNotSet is returned by SetAvailableComponents without the ReportsAvailableComponents capability set + ErrReportsAvailableComponentsNotSet = errors.New("ReportsAvailableComponents capability is not set") + + // ErrAvailableComponentsMissing is returned by SetAvailableComponents when called with a nil message + ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil") + + // ErrNoAvailableComponentHash is returned by SetAvailableComponents when called with a message with an empty hash + ErrNoAvailableComponentHash = errors.New("AvailableComponents.Hash is empty") ) diff --git a/client/wsclient.go b/client/wsclient.go index f19d8ab4..fdacdd4b 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -157,6 +157,11 @@ func (c *wsClient) SendCustomMessage(message *protobufs.CustomMessage) (messageS return c.common.SendCustomMessage(message) } +// SetAvailableComponents implements OpAMPClient.SetAvailableComponents +func (c *wsClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { + return c.common.SetAvailableComponents(components) +} + func viaReq(resps []*http.Response) []*http.Request { reqs := make([]*http.Request, 0, len(resps)) for _, resp := range resps {