From f6b505848f501266e646b6935a8904cc4873bfce Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Fri, 10 Jan 2025 12:01:35 -0500 Subject: [PATCH 01/13] Add implementation of new AvailableComponents message --- client/client.go | 5 +++ client/httpclient.go | 5 +++ client/internal/clientcommon.go | 49 ++++++++++++++++++++++++++-- client/internal/clientstate.go | 39 +++++++++++++++++----- client/internal/receivedprocessor.go | 10 ++++++ client/types/callbacks.go | 3 ++ client/wsclient.go | 5 +++ client/wsclient_test.go | 2 +- 8 files changed, 106 insertions(+), 12 deletions(-) diff --git a/client/client.go b/client/client.go index afc08315..5917bd3a 100644 --- a/client/client.go +++ b/client/client.go @@ -134,4 +134,9 @@ type OpAMPClient interface { // If no error is returned, the channel returned will be closed after the specified // message is sent. SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) + + // SetAvailableComponents modifies the set of components that are available for configuration + // on the agent. + // TODO more documentation + SetAvailableComponents(components *protobufs.AvailableComponents) error } diff --git a/client/httpclient.go b/client/httpclient.go index 92259d59..8d32797d 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -120,6 +120,11 @@ func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messag return c.common.SendCustomMessage(message) } +// SetAvailableComponents implements OpAMPClient.SetAvailableComponents +func (c *httpClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { + return c.common.SetAvailableComponents(components) +} + func (c *httpClient) runUntilStopped(ctx context.Context) { // Start the HTTP sender. This will make request/responses with retries for // failures and will wait with configured polling interval if there is nothing diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 283e3b01..47ed75ce 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -20,10 +20,12 @@ var ( ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set") ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set") ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set") + ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil") - errAlreadyStarted = errors.New("already started") - errCannotStopNotStarted = errors.New("cannot stop because not started") - errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set") + errAlreadyStarted = errors.New("already started") + errCannotStopNotStarted = errors.New("cannot stop because not started") + errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set") + errReportsAvailableComponentsNotSet = errors.New("ReportsAvailableComponents capability is not set") ) // ClientCommon contains the OpAMP logic that is common between WebSocket and @@ -88,6 +90,10 @@ func (c *ClientCommon) PrepareStart( return ErrHealthMissing } + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 && c.ClientSyncedState.AvailableComponents() == nil { + return ErrAvailableComponentsMissing + } + // Prepare remote config status. if settings.RemoteConfigStatus == nil { // RemoteConfigStatus is not provided. Start with empty. @@ -212,6 +218,12 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error { return err } + // initially, do not send the full component state - just send the hash. + // full state is available on request from the server using the corresponding ServerToAgent flag + availableComponents := &protobufs.AvailableComponents{ + Hash: c.ClientSyncedState.AvailableComponents().GetHash(), + } + c.sender.NextMessage().Update( func(msg *protobufs.AgentToServer) { msg.AgentDescription = c.ClientSyncedState.AgentDescription() @@ -221,6 +233,7 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error { msg.Capabilities = uint64(c.Capabilities) msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities() msg.Flags = c.ClientSyncedState.Flags() + msg.AvailableComponents = availableComponents }, ) return nil @@ -433,3 +446,33 @@ func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (mess return sendingChan, nil } + +// SetAvailableComponents sends a message to the server with the available components for the agent +func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error { + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 { + return errReportsAvailableComponentsNotSet + } + + if len(components.Hash) == 0 { + return errNoAvailableComponentHash + } + + // implement agent status compression, don't send the message if it hasn't changed from the previous message + availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components) + + if availableComponentsChanged { + if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil { + return err + } + + c.sender.NextMessage().Update( + func(msg *protobufs.AgentToServer) { + msg.AvailableComponents = c.ClientSyncedState.AvailableComponents() + }, + ) + + c.sender.ScheduleSend() + } + + return nil +} diff --git a/client/internal/clientstate.go b/client/internal/clientstate.go index 93250c9f..fc25866b 100644 --- a/client/internal/clientstate.go +++ b/client/internal/clientstate.go @@ -14,11 +14,13 @@ var ( errPackageStatusesMissing = errors.New("PackageStatuses is not set") errServerProvidedAllPackagesHashNil = errors.New("ServerProvidedAllPackagesHash is nil") errCustomCapabilitiesMissing = errors.New("CustomCapabilities is not set") + errAvailableComponentsMissing = errors.New("AvailableComponents is not set") + errNoAvailableComponentHash = errors.New("AvailableComponents.Hash is empty") ) // ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to -// have access to synchronize to the Server. Six messages can be stored in this store: -// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities and Flags. +// have access to synchronize to the Server. Seven messages can be stored in this store: +// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities, AvailableComponents and Flags. // // See OpAMP spec for more details on how status reporting works: // https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#status-reporting @@ -34,12 +36,13 @@ var ( type ClientSyncedState struct { mutex sync.Mutex - agentDescription *protobufs.AgentDescription - health *protobufs.ComponentHealth - remoteConfigStatus *protobufs.RemoteConfigStatus - packageStatuses *protobufs.PackageStatuses - customCapabilities *protobufs.CustomCapabilities - flags protobufs.AgentToServerFlags + agentDescription *protobufs.AgentDescription + health *protobufs.ComponentHealth + remoteConfigStatus *protobufs.RemoteConfigStatus + packageStatuses *protobufs.PackageStatuses + customCapabilities *protobufs.CustomCapabilities + availableComponents *protobufs.AvailableComponents + flags protobufs.AgentToServerFlags } func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription { @@ -72,6 +75,12 @@ func (s *ClientSyncedState) CustomCapabilities() *protobufs.CustomCapabilities { return s.customCapabilities } +func (s *ClientSyncedState) AvailableComponents() *protobufs.AvailableComponents { + defer s.mutex.Unlock() + s.mutex.Lock() + return s.availableComponents +} + func (s *ClientSyncedState) Flags() uint64 { defer s.mutex.Unlock() s.mutex.Lock() @@ -176,6 +185,20 @@ func (s *ClientSyncedState) HasCustomCapability(capability string) bool { return false } +func (s *ClientSyncedState) SetAvailableComponents(components *protobufs.AvailableComponents) error { + if components == nil { + return errAvailableComponentsMissing + } + + clone := proto.Clone(components).(*protobufs.AvailableComponents) + + defer s.mutex.Unlock() + s.mutex.Lock() + s.availableComponents = clone + + return nil +} + // SetFlags sets the flags in the state. func (s *ClientSyncedState) SetFlags(flags protobufs.AgentToServerFlags) { defer s.mutex.Unlock() diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index aee05a81..55f2ee55 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -197,6 +197,7 @@ func (r *receivedProcessor) rcvFlags( msg.PackageStatuses = r.clientSyncedState.PackageStatuses() msg.CustomCapabilities = r.clientSyncedState.CustomCapabilities() msg.Flags = r.clientSyncedState.Flags() + msg.AvailableComponents = r.clientSyncedState.AvailableComponents() // The logic for EffectiveConfig is similar to the previous 6 sub-messages however // the EffectiveConfig is fetched using GetEffectiveConfig instead of @@ -207,6 +208,15 @@ func (r *receivedProcessor) rcvFlags( scheduleSend = true } + if flags&protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents != 0 { + r.sender.NextMessage().Update( + func(msg *protobufs.AgentToServer) { + msg.AvailableComponents = r.clientSyncedState.AvailableComponents() + }, + ) + scheduleSend = true + } + return scheduleSend, nil } diff --git a/client/types/callbacks.go b/client/types/callbacks.go index 48d5f832..e12cda58 100644 --- a/client/types/callbacks.go +++ b/client/types/callbacks.go @@ -45,6 +45,9 @@ type MessageData struct { // CustomMessage contains a custom message sent by the server. CustomMessage *protobufs.CustomMessage + + // Flags contains any flags sent by the server. + Flags *protobufs.AgentToServerFlags } // Callbacks contains functions that are executed when the client encounters diff --git a/client/wsclient.go b/client/wsclient.go index f19d8ab4..fdacdd4b 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -157,6 +157,11 @@ func (c *wsClient) SendCustomMessage(message *protobufs.CustomMessage) (messageS return c.common.SendCustomMessage(message) } +// SetAvailableComponents implements OpAMPClient.SetAvailableComponents +func (c *wsClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { + return c.common.SetAvailableComponents(components) +} + func viaReq(resps []*http.Response) []*http.Request { reqs := make([]*http.Request, 0, len(resps)) for _, resp := range resps { diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 436ceb55..62be0b43 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -257,7 +257,7 @@ func TestVerifyWSCompress(t *testing.T) { remoteCfg := &protobufs.AgentRemoteConfig{ Config: &protobufs.AgentConfigMap{ ConfigMap: map[string]*protobufs.AgentConfigFile{ - "": &protobufs.AgentConfigFile{ + "": { Body: uncompressedCfg, }, }, From 843072b16d72ffeec76661e154c8e7193e346440 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Fri, 10 Jan 2025 12:25:10 -0500 Subject: [PATCH 02/13] Remove pointer from Flags field --- client/types/callbacks.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/types/callbacks.go b/client/types/callbacks.go index e12cda58..99f81ee4 100644 --- a/client/types/callbacks.go +++ b/client/types/callbacks.go @@ -47,7 +47,7 @@ type MessageData struct { CustomMessage *protobufs.CustomMessage // Flags contains any flags sent by the server. - Flags *protobufs.AgentToServerFlags + Flags protobufs.AgentToServerFlags } // Callbacks contains functions that are executed when the client encounters From 26a7996854641ab4f4eda62d8d7912d1b1b360c8 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Fri, 10 Jan 2025 16:37:15 -0500 Subject: [PATCH 03/13] Add unit tests, fix issues found by tests, document --- client/client.go | 7 +- client/clientimpl_test.go | 20 ++++++ client/httpclient_test.go | 110 ++++++++++++++++++++++++++++++++ client/internal/clientcommon.go | 17 +++-- client/types/startsettings.go | 5 ++ client/wsclient_test.go | 93 +++++++++++++++++++++++++++ 6 files changed, 247 insertions(+), 5 deletions(-) diff --git a/client/client.go b/client/client.go index 5917bd3a..78f4189b 100644 --- a/client/client.go +++ b/client/client.go @@ -137,6 +137,11 @@ type OpAMPClient interface { // SetAvailableComponents modifies the set of components that are available for configuration // on the agent. - // TODO more documentation + // May be called any time after Start(), including from the OnMessage handler. + // The new components will be sent with the next message to the server. + // If components is nil, errReportsAvailableComponentsNotSet will be returned. + // If components.Hash is nil or an empty []byte, errNoAvailableComponentHash will be returned. + // This method is subject to agent status compression - if components is not + // different from the cached agent state, this method is a no-op. SetAvailableComponents(components *protobufs.AvailableComponents) error } diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index dea6bd6c..2bab9b5b 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -2305,3 +2305,23 @@ func TestSetFlagsBeforeStart(t *testing.T) { assert.NoError(t, err) }) } + +func generateTestAvailableComponents() *protobufs.AvailableComponents { + return &protobufs.AvailableComponents{ + Hash: []byte("fake-hash"), + Components: map[string]*protobufs.ComponentDetails{ + "receivers": { + Metadata: []*protobufs.KeyValue{ + { + Key: "component", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{ + StringValue: "filereceiver", + }, + }, + }, + }, + }, + }, + } +} diff --git a/client/httpclient_test.go b/client/httpclient_test.go index fc670411..9c16ac30 100644 --- a/client/httpclient_test.go +++ b/client/httpclient_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" "github.com/open-telemetry/opamp-go/client/internal" @@ -311,3 +312,112 @@ func TestRedirectHTTP(t *testing.T) { }) } } + +func TestHTTPReportsAvailableComponents(t *testing.T) { + testCases := []struct { + desc string + availableComponents *protobufs.AvailableComponents + }{ + { + desc: "Does not report AvailableComponents", + availableComponents: nil, + }, + { + desc: "Reports AvailableComponents", + availableComponents: generateTestAvailableComponents(), + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + // Start a Server. + srv := internal.StartMockServer(t) + var rcvCounter atomic.Uint64 + srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + assert.EqualValues(t, rcvCounter.Load(), msg.SequenceNum) + rcvCounter.Add(1) + time.Sleep(50 * time.Millisecond) + if rcvCounter.Load() == 1 { + resp := &protobufs.ServerToAgent{ + InstanceUid: msg.InstanceUid, + } + + if tc.availableComponents != nil { + // the first message received should contain just the available component hash + availableComponents := msg.GetAvailableComponents() + require.NotNil(t, availableComponents) + require.Nil(t, availableComponents.GetComponents()) + require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash()) + + // add the flag asking for the full available component state to the response + resp.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents) + } else { + require.Nil(t, msg.GetAvailableComponents()) + } + + return resp + } + + if rcvCounter.Load() == 2 { + if tc.availableComponents != nil { + // the second message received should contain the full component state + availableComponents := msg.GetAvailableComponents() + require.NotNil(t, availableComponents) + require.Equal(t, tc.availableComponents.GetComponents(), availableComponents.GetComponents()) + require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash()) + } else { + require.Nil(t, msg.GetAvailableComponents()) + } + + return nil + } + + // all subsequent messages should not have any available components + require.Nil(t, msg.GetAvailableComponents()) + return nil + } + + // Start a client. + settings := types.StartSettings{} + settings.OpAMPServerURL = "http://" + srv.Endpoint + if tc.availableComponents != nil { + settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents + settings.AvailableComponents = tc.availableComponents + } + + client := NewHTTP(nil) + prepareClient(t, &settings, client) + + assert.NoError(t, client.Start(context.Background(), settings)) + + // Verify that status report is delivered. + eventually(t, func() bool { + return rcvCounter.Load() == 1 + }) + + if tc.availableComponents != nil { + // Verify that status report is delivered again. Polling should ensure this. + eventually(t, func() bool { + return rcvCounter.Load() == 2 + }) + } else { + // Verify that no second status report is delivered (polling is too infrequent for this to happen in 3 seconds) + require.Never(t, func() bool { + return rcvCounter.Load() == 2 + }, 3*time.Second, 10*time.Millisecond) + } + + // Verify that no third status report is delivered (polling is too infrequent for this to happen in 3 seconds) + require.Never(t, func() bool { + return rcvCounter.Load() == 3 + }, 3*time.Second, 10*time.Millisecond) + + // Shutdown the Server. + srv.Close() + + // Shutdown the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) + } +} diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 47ed75ce..5a7ea4cd 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -90,8 +90,14 @@ func (c *ClientCommon) PrepareStart( return ErrHealthMissing } - if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 && c.ClientSyncedState.AvailableComponents() == nil { - return ErrAvailableComponentsMissing + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 { + if settings.AvailableComponents == nil { + return ErrAvailableComponentsMissing + } + + if err := c.ClientSyncedState.SetAvailableComponents(settings.AvailableComponents); err != nil { + return err + } } // Prepare remote config status. @@ -220,8 +226,11 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error { // initially, do not send the full component state - just send the hash. // full state is available on request from the server using the corresponding ServerToAgent flag - availableComponents := &protobufs.AvailableComponents{ - Hash: c.ClientSyncedState.AvailableComponents().GetHash(), + var availableComponents *protobufs.AvailableComponents + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 { + availableComponents = &protobufs.AvailableComponents{ + Hash: c.ClientSyncedState.AvailableComponents().GetHash(), + } } c.sender.NextMessage().Update( diff --git a/client/types/startsettings.go b/client/types/startsettings.go index 6184d575..4463e5bb 100644 --- a/client/types/startsettings.go +++ b/client/types/startsettings.go @@ -64,4 +64,9 @@ type StartSettings struct { // // If the ReportsHeartbeat capability is disabled, this option has no effect. HeartbeatInterval *time.Duration + + // Defines the available components of the Agent. + // Required if the ReportsAvailableComponents capability is set. + // If the ReportsAvailableComponents capability is not set, this option has no effect. + AvailableComponents *protobufs.AvailableComponents } diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 62be0b43..8abf7d04 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -727,3 +727,96 @@ func TestHandlesConnectionError(t *testing.T) { err = client.Stop(context.Background()) require.NoError(t, err) } + +func TestWSSenderReportsAvailableComponents(t *testing.T) { + testCases := []struct { + desc string + availableComponents *protobufs.AvailableComponents + }{ + { + desc: "Does not report AvailableComponents", + availableComponents: nil, + }, + { + desc: "Reports AvailableComponents", + availableComponents: generateTestAvailableComponents(), + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + srv := internal.StartMockServer(t) + + var firstMsg atomic.Bool + var conn atomic.Value + srv.OnWSConnect = func(c *websocket.Conn) { + conn.Store(c) + firstMsg.Store(true) + } + var msgCount atomic.Int64 + srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + if firstMsg.Load() { + msgCount.Add(1) + firstMsg.Store(false) + resp := &protobufs.ServerToAgent{ + InstanceUid: msg.InstanceUid, + } + + if tc.availableComponents != nil { + availableComponents := msg.GetAvailableComponents() + require.NotNil(t, availableComponents) + require.Nil(t, availableComponents.GetComponents()) + require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash()) + + resp.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents) + } else { + require.Nil(t, msg.GetAvailableComponents()) + } + + return resp + } + msgCount.Add(1) + if tc.availableComponents != nil { + availableComponents := msg.GetAvailableComponents() + require.NotNil(t, availableComponents) + require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash()) + require.Equal(t, tc.availableComponents.GetComponents(), availableComponents.GetComponents()) + } else { + require.Error(t, errors.New("should not receive a second message when ReportsAvailableComponents is disabled")) + } + + return nil + } + + // Start an OpAMP/WebSocket client. + settings := types.StartSettings{ + OpAMPServerURL: "ws://" + srv.Endpoint, + } + client := NewWebSocket(nil) + + if tc.availableComponents != nil { + settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents + settings.AvailableComponents = tc.availableComponents + } + + startClient(t, settings, client) + + // Wait for connection to be established. + eventually(t, func() bool { return conn.Load() != nil }) + + if tc.availableComponents != nil { + assert.Eventually(t, func() bool { + return msgCount.Load() >= 2 + }, 5*time.Second, 10*time.Millisecond) + } else { + assert.Never(t, func() bool { + return msgCount.Load() >= 2 + }, 3*time.Second, 10*time.Millisecond) + } + + // Stop the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) + } +} From b189b213d26b32bcd98d2b8923f1773d3f2fbaf9 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Mon, 13 Jan 2025 10:11:56 -0500 Subject: [PATCH 04/13] Adjust SetAvailableComponents and add tests --- client/clientimpl_test.go | 103 ++++++++++++++++++++++++++++++++ client/internal/clientcommon.go | 23 ++++--- client/internal/clientstate.go | 1 - client/types/errors.go | 9 +++ 4 files changed, 128 insertions(+), 8 deletions(-) diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index 2bab9b5b..bf2ce7b7 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -2306,6 +2306,109 @@ func TestSetFlagsBeforeStart(t *testing.T) { }) } +func TestSetAvailableComponents(t *testing.T) { + testCases := []struct { + desc string + capabilities protobufs.AgentCapabilities + testFunc func(t *testing.T, client OpAMPClient, srv *internal.MockServer) + }{ + { + desc: "apply nil AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.ErrorIs(t, client.SetAvailableComponents(nil), types.ErrAvailableComponentsMissing) + }, + }, + { + desc: "apply AvailableComponents with empty hash", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.ErrorIs(t, client.SetAvailableComponents(&protobufs.AvailableComponents{}), types.ErrNoAvailableComponentHash) + }, + }, + { + desc: "apply AvailableComponents without required capability", + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.ErrorIs(t, client.SetAvailableComponents(generateTestAvailableComponents()), types.ErrReportsAvailableComponentsNotSet) + }, + }, + { + desc: "apply AvailableComponents with cached AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.NoError(t, client.SetAvailableComponents(generateTestAvailableComponents())) + }, + }, + { + desc: "apply AvailableComponents with new AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, srv *internal.MockServer) { + availableComponents := generateTestAvailableComponents() + availableComponents.Hash = []byte("different") + require.NoError(t, client.SetAvailableComponents(availableComponents)) + srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + assert.EqualValues(t, 1, msg.SequenceNum) + msgAvailableComponents := msg.GetAvailableComponents() + require.NotNil(t, msgAvailableComponents) + require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash()) + require.Nil(t, msgAvailableComponents.GetComponents()) + return nil + }) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + testClients(t, func(t *testing.T, client OpAMPClient) { + + // Start a Server. + srv := internal.StartMockServer(t) + srv.EnableExpectMode() + + availableComponents := generateTestAvailableComponents() + + // Start a client. + settings := types.StartSettings{ + OpAMPServerURL: "ws://" + srv.Endpoint, + Callbacks: types.Callbacks{ + OnMessage: func(ctx context.Context, msg *types.MessageData) {}, + }, + Capabilities: tc.capabilities, + AvailableComponents: availableComponents, + } + prepareClient(t, &settings, client) + + // Client ---> + assert.NoError(t, client.Start(context.Background(), settings)) + + // ---> Server + srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + assert.EqualValues(t, 0, msg.SequenceNum) + msgAvailableComponents := msg.GetAvailableComponents() + if tc.capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 { + require.NotNil(t, msgAvailableComponents) + require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash()) + require.Nil(t, msgAvailableComponents.GetComponents()) + } else { + require.Nil(t, msgAvailableComponents) + } + return nil + }) + + tc.testFunc(t, client, srv) + + // Shutdown the Server. + srv.Close() + + // Shutdown the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) + }) + } +} + func generateTestAvailableComponents() *protobufs.AvailableComponents { return &protobufs.AvailableComponents{ Hash: []byte("fake-hash"), diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 5a7ea4cd..2a2e9140 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -22,10 +22,9 @@ var ( ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set") ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil") - errAlreadyStarted = errors.New("already started") - errCannotStopNotStarted = errors.New("cannot stop because not started") - errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set") - errReportsAvailableComponentsNotSet = errors.New("ReportsAvailableComponents capability is not set") + errAlreadyStarted = errors.New("already started") + errCannotStopNotStarted = errors.New("cannot stop because not started") + errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set") ) // ClientCommon contains the OpAMP logic that is common between WebSocket and @@ -459,11 +458,15 @@ func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (mess // SetAvailableComponents sends a message to the server with the available components for the agent func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error { if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 { - return errReportsAvailableComponentsNotSet + return types.ErrReportsAvailableComponentsNotSet + } + + if components == nil { + return types.ErrAvailableComponentsMissing } if len(components.Hash) == 0 { - return errNoAvailableComponentHash + return types.ErrNoAvailableComponentHash } // implement agent status compression, don't send the message if it hasn't changed from the previous message @@ -474,9 +477,15 @@ func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableCom return err } + // initially, do not send the full component state - just send the hash. + // full state is available on request from the server using the corresponding ServerToAgent flag + availableComponents := &protobufs.AvailableComponents{ + Hash: c.ClientSyncedState.AvailableComponents().GetHash(), + } + c.sender.NextMessage().Update( func(msg *protobufs.AgentToServer) { - msg.AvailableComponents = c.ClientSyncedState.AvailableComponents() + msg.AvailableComponents = availableComponents }, ) diff --git a/client/internal/clientstate.go b/client/internal/clientstate.go index fc25866b..ae3da41f 100644 --- a/client/internal/clientstate.go +++ b/client/internal/clientstate.go @@ -15,7 +15,6 @@ var ( errServerProvidedAllPackagesHashNil = errors.New("ServerProvidedAllPackagesHash is nil") errCustomCapabilitiesMissing = errors.New("CustomCapabilities is not set") errAvailableComponentsMissing = errors.New("AvailableComponents is not set") - errNoAvailableComponentHash = errors.New("AvailableComponents.Hash is empty") ) // ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to diff --git a/client/types/errors.go b/client/types/errors.go index aab66924..a1d1389b 100644 --- a/client/types/errors.go +++ b/client/types/errors.go @@ -13,4 +13,13 @@ var ( // ErrCustomMessagePending is returned by SendCustomMessage when called before the previous // message has been sent. ErrCustomMessagePending = errors.New("custom message already set") + + // ErrReportsAvailableComponentsNotSet is returned by SetAvailableComponents without the ReportsAvailableComponents capability set + ErrReportsAvailableComponentsNotSet = errors.New("ReportsAvailableComponents capability is not set") + + // ErrAvailableComponentsMissing is returned by SetAvailableComponents when called with a nil message + ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil") + + // ErrNoAvailableComponentHash is returned by SetAvailableComponents when called with a message with an empty hash + ErrNoAvailableComponentHash = errors.New("AvailableComponents.Hash is empty") ) From 97a27daa4e930ebde2d8130a909ac7a891c62459 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Mon, 13 Jan 2025 11:56:34 -0500 Subject: [PATCH 05/13] Update documentation and reduce testing wait interval --- client/client.go | 8 ++++++-- client/httpclient_test.go | 12 ++++++------ client/wsclient_test.go | 4 ++-- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/client/client.go b/client/client.go index 78f4189b..c3d2ca3f 100644 --- a/client/client.go +++ b/client/client.go @@ -139,8 +139,12 @@ type OpAMPClient interface { // on the agent. // May be called any time after Start(), including from the OnMessage handler. // The new components will be sent with the next message to the server. - // If components is nil, errReportsAvailableComponentsNotSet will be returned. - // If components.Hash is nil or an empty []byte, errNoAvailableComponentHash will be returned. + // + // If components is nil, types.ErrAvailableComponentsMissing will be returned. + // If components.Hash is nil or an empty []byte, types.ErrNoAvailableComponentHash will be returned. + // If the ReportsAvailableComponents capability is not set in StartSettings.Capabilities during Start(), + // types.ErrReportsAvailableComponentsNotSet will be returned. + // // This method is subject to agent status compression - if components is not // different from the cached agent state, this method is a no-op. SetAvailableComponents(components *protobufs.AvailableComponents) error diff --git a/client/httpclient_test.go b/client/httpclient_test.go index 9c16ac30..4bcc60fd 100644 --- a/client/httpclient_test.go +++ b/client/httpclient_test.go @@ -401,16 +401,16 @@ func TestHTTPReportsAvailableComponents(t *testing.T) { return rcvCounter.Load() == 2 }) } else { - // Verify that no second status report is delivered (polling is too infrequent for this to happen in 3 seconds) - require.Never(t, func() bool { + // Verify that no second status report is delivered (polling is too infrequent for this to happen in 50ms) + assert.Never(t, func() bool { return rcvCounter.Load() == 2 - }, 3*time.Second, 10*time.Millisecond) + }, 50*time.Millisecond, 10*time.Millisecond) } - // Verify that no third status report is delivered (polling is too infrequent for this to happen in 3 seconds) - require.Never(t, func() bool { + // Verify that no third status report is delivered (polling is too infrequent for this to happen in 50ms) + assert.Never(t, func() bool { return rcvCounter.Load() == 3 - }, 3*time.Second, 10*time.Millisecond) + }, 50*time.Millisecond, 10*time.Millisecond) // Shutdown the Server. srv.Close() diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 8abf7d04..d3540620 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -86,7 +86,7 @@ func TestWSSenderReportsHeartbeat(t *testing.T) { } else { assert.Never(t, func() bool { return msgCount.Load() >= 2 - }, 3*time.Second, 10*time.Millisecond) + }, 50*time.Millisecond, 10*time.Millisecond) } // Stop the client. @@ -811,7 +811,7 @@ func TestWSSenderReportsAvailableComponents(t *testing.T) { } else { assert.Never(t, func() bool { return msgCount.Load() >= 2 - }, 3*time.Second, 10*time.Millisecond) + }, 50*time.Millisecond, 10*time.Millisecond) } // Stop the client. From cd6ccd7b0eeae3f328e04d93fc59517637e0cff1 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Mon, 13 Jan 2025 15:52:40 -0500 Subject: [PATCH 06/13] Fix Flags field typing --- client/types/callbacks.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/types/callbacks.go b/client/types/callbacks.go index 99f81ee4..0b1a9a23 100644 --- a/client/types/callbacks.go +++ b/client/types/callbacks.go @@ -47,7 +47,7 @@ type MessageData struct { CustomMessage *protobufs.CustomMessage // Flags contains any flags sent by the server. - Flags protobufs.AgentToServerFlags + Flags protobufs.ServerToAgentFlags } // Callbacks contains functions that are executed when the client encounters From 9a8dabd36baef1b5ce67e0e05252dbc2df5d1f84 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Tue, 14 Jan 2025 11:11:01 -0500 Subject: [PATCH 07/13] Set flags on MessageData before OnMessage callback --- client/internal/receivedprocessor.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index 55f2ee55..7a5474c7 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -75,7 +75,9 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro r.logger.Errorf(ctx, "cannot processed received flags:%v", err) } - msgData := &types.MessageData{} + msgData := &types.MessageData{ + Flags: protobufs.ServerToAgentFlags(msg.Flags), + } if msg.RemoteConfig != nil { if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig) { From 78934797c1b22460be2154a5a2c4eefe632c8dd8 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Tue, 14 Jan 2025 11:28:06 -0500 Subject: [PATCH 08/13] Improve code coverage --- client/httpclient_test.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/client/httpclient_test.go b/client/httpclient_test.go index 4bcc60fd..0ecbba96 100644 --- a/client/httpclient_test.go +++ b/client/httpclient_test.go @@ -316,7 +316,9 @@ func TestRedirectHTTP(t *testing.T) { func TestHTTPReportsAvailableComponents(t *testing.T) { testCases := []struct { desc string + capabilities protobufs.AgentCapabilities availableComponents *protobufs.AvailableComponents + startErr error }{ { desc: "Does not report AvailableComponents", @@ -324,8 +326,14 @@ func TestHTTPReportsAvailableComponents(t *testing.T) { }, { desc: "Reports AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, availableComponents: generateTestAvailableComponents(), }, + { + desc: "No AvailableComponents on Start() despite capability", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + startErr: internal.ErrAvailableComponentsMissing, + }, } for _, tc := range testCases { @@ -380,15 +388,19 @@ func TestHTTPReportsAvailableComponents(t *testing.T) { // Start a client. settings := types.StartSettings{} settings.OpAMPServerURL = "http://" + srv.Endpoint - if tc.availableComponents != nil { - settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents - settings.AvailableComponents = tc.availableComponents - } + settings.Capabilities = tc.capabilities + settings.AvailableComponents = tc.availableComponents client := NewHTTP(nil) prepareClient(t, &settings, client) - assert.NoError(t, client.Start(context.Background(), settings)) + startErr := client.Start(context.Background(), settings) + if tc.startErr == nil { + assert.NoError(t, startErr) + } else { + assert.ErrorIs(t, startErr, tc.startErr) + return + } // Verify that status report is delivered. eventually(t, func() bool { From 9e2acf3bce37f30f3ac12eeeeb105ff0be5dc98f Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Tue, 14 Jan 2025 16:42:15 -0500 Subject: [PATCH 09/13] Remove SetAvailableComponents method from interface --- client/client.go | 14 ---- client/clientimpl_test.go | 103 --------------------------- client/httpclient.go | 5 -- client/internal/clientcommon.go | 40 ----------- client/internal/receivedprocessor.go | 4 +- client/types/callbacks.go | 3 - client/types/errors.go | 9 --- client/wsclient.go | 5 -- 8 files changed, 1 insertion(+), 182 deletions(-) diff --git a/client/client.go b/client/client.go index c3d2ca3f..afc08315 100644 --- a/client/client.go +++ b/client/client.go @@ -134,18 +134,4 @@ type OpAMPClient interface { // If no error is returned, the channel returned will be closed after the specified // message is sent. SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) - - // SetAvailableComponents modifies the set of components that are available for configuration - // on the agent. - // May be called any time after Start(), including from the OnMessage handler. - // The new components will be sent with the next message to the server. - // - // If components is nil, types.ErrAvailableComponentsMissing will be returned. - // If components.Hash is nil or an empty []byte, types.ErrNoAvailableComponentHash will be returned. - // If the ReportsAvailableComponents capability is not set in StartSettings.Capabilities during Start(), - // types.ErrReportsAvailableComponentsNotSet will be returned. - // - // This method is subject to agent status compression - if components is not - // different from the cached agent state, this method is a no-op. - SetAvailableComponents(components *protobufs.AvailableComponents) error } diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index bf2ce7b7..2bab9b5b 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -2306,109 +2306,6 @@ func TestSetFlagsBeforeStart(t *testing.T) { }) } -func TestSetAvailableComponents(t *testing.T) { - testCases := []struct { - desc string - capabilities protobufs.AgentCapabilities - testFunc func(t *testing.T, client OpAMPClient, srv *internal.MockServer) - }{ - { - desc: "apply nil AvailableComponents", - capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, - testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { - require.ErrorIs(t, client.SetAvailableComponents(nil), types.ErrAvailableComponentsMissing) - }, - }, - { - desc: "apply AvailableComponents with empty hash", - capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, - testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { - require.ErrorIs(t, client.SetAvailableComponents(&protobufs.AvailableComponents{}), types.ErrNoAvailableComponentHash) - }, - }, - { - desc: "apply AvailableComponents without required capability", - testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { - require.ErrorIs(t, client.SetAvailableComponents(generateTestAvailableComponents()), types.ErrReportsAvailableComponentsNotSet) - }, - }, - { - desc: "apply AvailableComponents with cached AvailableComponents", - capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, - testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { - require.NoError(t, client.SetAvailableComponents(generateTestAvailableComponents())) - }, - }, - { - desc: "apply AvailableComponents with new AvailableComponents", - capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, - testFunc: func(t *testing.T, client OpAMPClient, srv *internal.MockServer) { - availableComponents := generateTestAvailableComponents() - availableComponents.Hash = []byte("different") - require.NoError(t, client.SetAvailableComponents(availableComponents)) - srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { - assert.EqualValues(t, 1, msg.SequenceNum) - msgAvailableComponents := msg.GetAvailableComponents() - require.NotNil(t, msgAvailableComponents) - require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash()) - require.Nil(t, msgAvailableComponents.GetComponents()) - return nil - }) - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - testClients(t, func(t *testing.T, client OpAMPClient) { - - // Start a Server. - srv := internal.StartMockServer(t) - srv.EnableExpectMode() - - availableComponents := generateTestAvailableComponents() - - // Start a client. - settings := types.StartSettings{ - OpAMPServerURL: "ws://" + srv.Endpoint, - Callbacks: types.Callbacks{ - OnMessage: func(ctx context.Context, msg *types.MessageData) {}, - }, - Capabilities: tc.capabilities, - AvailableComponents: availableComponents, - } - prepareClient(t, &settings, client) - - // Client ---> - assert.NoError(t, client.Start(context.Background(), settings)) - - // ---> Server - srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { - assert.EqualValues(t, 0, msg.SequenceNum) - msgAvailableComponents := msg.GetAvailableComponents() - if tc.capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 { - require.NotNil(t, msgAvailableComponents) - require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash()) - require.Nil(t, msgAvailableComponents.GetComponents()) - } else { - require.Nil(t, msgAvailableComponents) - } - return nil - }) - - tc.testFunc(t, client, srv) - - // Shutdown the Server. - srv.Close() - - // Shutdown the client. - err := client.Stop(context.Background()) - assert.NoError(t, err) - }) - }) - } -} - func generateTestAvailableComponents() *protobufs.AvailableComponents { return &protobufs.AvailableComponents{ Hash: []byte("fake-hash"), diff --git a/client/httpclient.go b/client/httpclient.go index 8d32797d..92259d59 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -120,11 +120,6 @@ func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messag return c.common.SendCustomMessage(message) } -// SetAvailableComponents implements OpAMPClient.SetAvailableComponents -func (c *httpClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { - return c.common.SetAvailableComponents(components) -} - func (c *httpClient) runUntilStopped(ctx context.Context) { // Start the HTTP sender. This will make request/responses with retries for // failures and will wait with configured polling interval if there is nothing diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 2a2e9140..1fd2b969 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -454,43 +454,3 @@ func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (mess return sendingChan, nil } - -// SetAvailableComponents sends a message to the server with the available components for the agent -func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error { - if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 { - return types.ErrReportsAvailableComponentsNotSet - } - - if components == nil { - return types.ErrAvailableComponentsMissing - } - - if len(components.Hash) == 0 { - return types.ErrNoAvailableComponentHash - } - - // implement agent status compression, don't send the message if it hasn't changed from the previous message - availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components) - - if availableComponentsChanged { - if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil { - return err - } - - // initially, do not send the full component state - just send the hash. - // full state is available on request from the server using the corresponding ServerToAgent flag - availableComponents := &protobufs.AvailableComponents{ - Hash: c.ClientSyncedState.AvailableComponents().GetHash(), - } - - c.sender.NextMessage().Update( - func(msg *protobufs.AgentToServer) { - msg.AvailableComponents = availableComponents - }, - ) - - c.sender.ScheduleSend() - } - - return nil -} diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index 7a5474c7..55f2ee55 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -75,9 +75,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro r.logger.Errorf(ctx, "cannot processed received flags:%v", err) } - msgData := &types.MessageData{ - Flags: protobufs.ServerToAgentFlags(msg.Flags), - } + msgData := &types.MessageData{} if msg.RemoteConfig != nil { if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig) { diff --git a/client/types/callbacks.go b/client/types/callbacks.go index 0b1a9a23..48d5f832 100644 --- a/client/types/callbacks.go +++ b/client/types/callbacks.go @@ -45,9 +45,6 @@ type MessageData struct { // CustomMessage contains a custom message sent by the server. CustomMessage *protobufs.CustomMessage - - // Flags contains any flags sent by the server. - Flags protobufs.ServerToAgentFlags } // Callbacks contains functions that are executed when the client encounters diff --git a/client/types/errors.go b/client/types/errors.go index a1d1389b..aab66924 100644 --- a/client/types/errors.go +++ b/client/types/errors.go @@ -13,13 +13,4 @@ var ( // ErrCustomMessagePending is returned by SendCustomMessage when called before the previous // message has been sent. ErrCustomMessagePending = errors.New("custom message already set") - - // ErrReportsAvailableComponentsNotSet is returned by SetAvailableComponents without the ReportsAvailableComponents capability set - ErrReportsAvailableComponentsNotSet = errors.New("ReportsAvailableComponents capability is not set") - - // ErrAvailableComponentsMissing is returned by SetAvailableComponents when called with a nil message - ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil") - - // ErrNoAvailableComponentHash is returned by SetAvailableComponents when called with a message with an empty hash - ErrNoAvailableComponentHash = errors.New("AvailableComponents.Hash is empty") ) diff --git a/client/wsclient.go b/client/wsclient.go index fdacdd4b..f19d8ab4 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -157,11 +157,6 @@ func (c *wsClient) SendCustomMessage(message *protobufs.CustomMessage) (messageS return c.common.SendCustomMessage(message) } -// SetAvailableComponents implements OpAMPClient.SetAvailableComponents -func (c *wsClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { - return c.common.SetAvailableComponents(components) -} - func viaReq(resps []*http.Response) []*http.Request { reqs := make([]*http.Request, 0, len(resps)) for _, resp := range resps { From be86a6ff35e9cd9b2ec76b936c5e935059e49f4d Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Wed, 22 Jan 2025 09:42:28 -0500 Subject: [PATCH 10/13] Revert "Remove SetAvailableComponents method from interface" This reverts commit 9e2acf3bce37f30f3ac12eeeeb105ff0be5dc98f. --- client/client.go | 14 ++++ client/clientimpl_test.go | 103 +++++++++++++++++++++++++++ client/httpclient.go | 5 ++ client/internal/clientcommon.go | 40 +++++++++++ client/internal/receivedprocessor.go | 4 +- client/types/callbacks.go | 3 + client/types/errors.go | 9 +++ client/wsclient.go | 5 ++ 8 files changed, 182 insertions(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index afc08315..c3d2ca3f 100644 --- a/client/client.go +++ b/client/client.go @@ -134,4 +134,18 @@ type OpAMPClient interface { // If no error is returned, the channel returned will be closed after the specified // message is sent. SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) + + // SetAvailableComponents modifies the set of components that are available for configuration + // on the agent. + // May be called any time after Start(), including from the OnMessage handler. + // The new components will be sent with the next message to the server. + // + // If components is nil, types.ErrAvailableComponentsMissing will be returned. + // If components.Hash is nil or an empty []byte, types.ErrNoAvailableComponentHash will be returned. + // If the ReportsAvailableComponents capability is not set in StartSettings.Capabilities during Start(), + // types.ErrReportsAvailableComponentsNotSet will be returned. + // + // This method is subject to agent status compression - if components is not + // different from the cached agent state, this method is a no-op. + SetAvailableComponents(components *protobufs.AvailableComponents) error } diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index 2bab9b5b..bf2ce7b7 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -2306,6 +2306,109 @@ func TestSetFlagsBeforeStart(t *testing.T) { }) } +func TestSetAvailableComponents(t *testing.T) { + testCases := []struct { + desc string + capabilities protobufs.AgentCapabilities + testFunc func(t *testing.T, client OpAMPClient, srv *internal.MockServer) + }{ + { + desc: "apply nil AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.ErrorIs(t, client.SetAvailableComponents(nil), types.ErrAvailableComponentsMissing) + }, + }, + { + desc: "apply AvailableComponents with empty hash", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.ErrorIs(t, client.SetAvailableComponents(&protobufs.AvailableComponents{}), types.ErrNoAvailableComponentHash) + }, + }, + { + desc: "apply AvailableComponents without required capability", + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.ErrorIs(t, client.SetAvailableComponents(generateTestAvailableComponents()), types.ErrReportsAvailableComponentsNotSet) + }, + }, + { + desc: "apply AvailableComponents with cached AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.NoError(t, client.SetAvailableComponents(generateTestAvailableComponents())) + }, + }, + { + desc: "apply AvailableComponents with new AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, srv *internal.MockServer) { + availableComponents := generateTestAvailableComponents() + availableComponents.Hash = []byte("different") + require.NoError(t, client.SetAvailableComponents(availableComponents)) + srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + assert.EqualValues(t, 1, msg.SequenceNum) + msgAvailableComponents := msg.GetAvailableComponents() + require.NotNil(t, msgAvailableComponents) + require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash()) + require.Nil(t, msgAvailableComponents.GetComponents()) + return nil + }) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + testClients(t, func(t *testing.T, client OpAMPClient) { + + // Start a Server. + srv := internal.StartMockServer(t) + srv.EnableExpectMode() + + availableComponents := generateTestAvailableComponents() + + // Start a client. + settings := types.StartSettings{ + OpAMPServerURL: "ws://" + srv.Endpoint, + Callbacks: types.Callbacks{ + OnMessage: func(ctx context.Context, msg *types.MessageData) {}, + }, + Capabilities: tc.capabilities, + AvailableComponents: availableComponents, + } + prepareClient(t, &settings, client) + + // Client ---> + assert.NoError(t, client.Start(context.Background(), settings)) + + // ---> Server + srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + assert.EqualValues(t, 0, msg.SequenceNum) + msgAvailableComponents := msg.GetAvailableComponents() + if tc.capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 { + require.NotNil(t, msgAvailableComponents) + require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash()) + require.Nil(t, msgAvailableComponents.GetComponents()) + } else { + require.Nil(t, msgAvailableComponents) + } + return nil + }) + + tc.testFunc(t, client, srv) + + // Shutdown the Server. + srv.Close() + + // Shutdown the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) + }) + } +} + func generateTestAvailableComponents() *protobufs.AvailableComponents { return &protobufs.AvailableComponents{ Hash: []byte("fake-hash"), diff --git a/client/httpclient.go b/client/httpclient.go index 92259d59..8d32797d 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -120,6 +120,11 @@ func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messag return c.common.SendCustomMessage(message) } +// SetAvailableComponents implements OpAMPClient.SetAvailableComponents +func (c *httpClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { + return c.common.SetAvailableComponents(components) +} + func (c *httpClient) runUntilStopped(ctx context.Context) { // Start the HTTP sender. This will make request/responses with retries for // failures and will wait with configured polling interval if there is nothing diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 1fd2b969..2a2e9140 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -454,3 +454,43 @@ func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (mess return sendingChan, nil } + +// SetAvailableComponents sends a message to the server with the available components for the agent +func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error { + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 { + return types.ErrReportsAvailableComponentsNotSet + } + + if components == nil { + return types.ErrAvailableComponentsMissing + } + + if len(components.Hash) == 0 { + return types.ErrNoAvailableComponentHash + } + + // implement agent status compression, don't send the message if it hasn't changed from the previous message + availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components) + + if availableComponentsChanged { + if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil { + return err + } + + // initially, do not send the full component state - just send the hash. + // full state is available on request from the server using the corresponding ServerToAgent flag + availableComponents := &protobufs.AvailableComponents{ + Hash: c.ClientSyncedState.AvailableComponents().GetHash(), + } + + c.sender.NextMessage().Update( + func(msg *protobufs.AgentToServer) { + msg.AvailableComponents = availableComponents + }, + ) + + c.sender.ScheduleSend() + } + + return nil +} diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index 55f2ee55..7a5474c7 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -75,7 +75,9 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro r.logger.Errorf(ctx, "cannot processed received flags:%v", err) } - msgData := &types.MessageData{} + msgData := &types.MessageData{ + Flags: protobufs.ServerToAgentFlags(msg.Flags), + } if msg.RemoteConfig != nil { if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig) { diff --git a/client/types/callbacks.go b/client/types/callbacks.go index 48d5f832..0b1a9a23 100644 --- a/client/types/callbacks.go +++ b/client/types/callbacks.go @@ -45,6 +45,9 @@ type MessageData struct { // CustomMessage contains a custom message sent by the server. CustomMessage *protobufs.CustomMessage + + // Flags contains any flags sent by the server. + Flags protobufs.ServerToAgentFlags } // Callbacks contains functions that are executed when the client encounters diff --git a/client/types/errors.go b/client/types/errors.go index aab66924..a1d1389b 100644 --- a/client/types/errors.go +++ b/client/types/errors.go @@ -13,4 +13,13 @@ var ( // ErrCustomMessagePending is returned by SendCustomMessage when called before the previous // message has been sent. ErrCustomMessagePending = errors.New("custom message already set") + + // ErrReportsAvailableComponentsNotSet is returned by SetAvailableComponents without the ReportsAvailableComponents capability set + ErrReportsAvailableComponentsNotSet = errors.New("ReportsAvailableComponents capability is not set") + + // ErrAvailableComponentsMissing is returned by SetAvailableComponents when called with a nil message + ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil") + + // ErrNoAvailableComponentHash is returned by SetAvailableComponents when called with a message with an empty hash + ErrNoAvailableComponentHash = errors.New("AvailableComponents.Hash is empty") ) diff --git a/client/wsclient.go b/client/wsclient.go index f19d8ab4..fdacdd4b 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -157,6 +157,11 @@ func (c *wsClient) SendCustomMessage(message *protobufs.CustomMessage) (messageS return c.common.SendCustomMessage(message) } +// SetAvailableComponents implements OpAMPClient.SetAvailableComponents +func (c *wsClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { + return c.common.SetAvailableComponents(components) +} + func viaReq(resps []*http.Response) []*http.Request { reqs := make([]*http.Request, 0, len(resps)) for _, resp := range resps { From 53d8cd067ba7639bd4e9c644fd3483951edb0711 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Wed, 22 Jan 2025 10:01:24 -0500 Subject: [PATCH 11/13] Initialize AvailableComponents via SetAvailableComponents rather than StartSettings --- client/client.go | 4 ++++ client/clientimpl_test.go | 4 ++-- client/httpclient_test.go | 2 +- client/internal/clientcommon.go | 14 ++++++-------- client/internal/receivedprocessor.go | 4 +--- client/types/callbacks.go | 3 --- client/types/startsettings.go | 5 ----- client/wsclient_test.go | 2 +- 8 files changed, 15 insertions(+), 23 deletions(-) diff --git a/client/client.go b/client/client.go index c3d2ca3f..b10e31a6 100644 --- a/client/client.go +++ b/client/client.go @@ -137,9 +137,13 @@ type OpAMPClient interface { // SetAvailableComponents modifies the set of components that are available for configuration // on the agent. + // If called before Start(), initializes the client state that will be sent to the server upon Start(). + // Must be called before Start() if the ReportsAvailableComponents capability is set. + // // May be called any time after Start(), including from the OnMessage handler. // The new components will be sent with the next message to the server. // + // When called after Start(): // If components is nil, types.ErrAvailableComponentsMissing will be returned. // If components.Hash is nil or an empty []byte, types.ErrNoAvailableComponentHash will be returned. // If the ReportsAvailableComponents capability is not set in StartSettings.Capabilities during Start(), diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index bf2ce7b7..87d3e5e8 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -2367,6 +2367,7 @@ func TestSetAvailableComponents(t *testing.T) { srv.EnableExpectMode() availableComponents := generateTestAvailableComponents() + client.SetAvailableComponents(availableComponents) // Start a client. settings := types.StartSettings{ @@ -2374,8 +2375,7 @@ func TestSetAvailableComponents(t *testing.T) { Callbacks: types.Callbacks{ OnMessage: func(ctx context.Context, msg *types.MessageData) {}, }, - Capabilities: tc.capabilities, - AvailableComponents: availableComponents, + Capabilities: tc.capabilities, } prepareClient(t, &settings, client) diff --git a/client/httpclient_test.go b/client/httpclient_test.go index 0ecbba96..ba083486 100644 --- a/client/httpclient_test.go +++ b/client/httpclient_test.go @@ -389,9 +389,9 @@ func TestHTTPReportsAvailableComponents(t *testing.T) { settings := types.StartSettings{} settings.OpAMPServerURL = "http://" + srv.Endpoint settings.Capabilities = tc.capabilities - settings.AvailableComponents = tc.availableComponents client := NewHTTP(nil) + client.SetAvailableComponents(tc.availableComponents) prepareClient(t, &settings, client) startErr := client.Start(context.Background(), settings) diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 2a2e9140..8035aef9 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -89,14 +89,8 @@ func (c *ClientCommon) PrepareStart( return ErrHealthMissing } - if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 { - if settings.AvailableComponents == nil { - return ErrAvailableComponentsMissing - } - - if err := c.ClientSyncedState.SetAvailableComponents(settings.AvailableComponents); err != nil { - return err - } + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 && c.ClientSyncedState.AvailableComponents() == nil { + return ErrAvailableComponentsMissing } // Prepare remote config status. @@ -457,6 +451,10 @@ func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (mess // SetAvailableComponents sends a message to the server with the available components for the agent func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error { + if !c.isStarted { + return c.ClientSyncedState.SetAvailableComponents(components) + } + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 { return types.ErrReportsAvailableComponentsNotSet } diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index 7a5474c7..55f2ee55 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -75,9 +75,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro r.logger.Errorf(ctx, "cannot processed received flags:%v", err) } - msgData := &types.MessageData{ - Flags: protobufs.ServerToAgentFlags(msg.Flags), - } + msgData := &types.MessageData{} if msg.RemoteConfig != nil { if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig) { diff --git a/client/types/callbacks.go b/client/types/callbacks.go index 0b1a9a23..48d5f832 100644 --- a/client/types/callbacks.go +++ b/client/types/callbacks.go @@ -45,9 +45,6 @@ type MessageData struct { // CustomMessage contains a custom message sent by the server. CustomMessage *protobufs.CustomMessage - - // Flags contains any flags sent by the server. - Flags protobufs.ServerToAgentFlags } // Callbacks contains functions that are executed when the client encounters diff --git a/client/types/startsettings.go b/client/types/startsettings.go index 4463e5bb..6184d575 100644 --- a/client/types/startsettings.go +++ b/client/types/startsettings.go @@ -64,9 +64,4 @@ type StartSettings struct { // // If the ReportsHeartbeat capability is disabled, this option has no effect. HeartbeatInterval *time.Duration - - // Defines the available components of the Agent. - // Required if the ReportsAvailableComponents capability is set. - // If the ReportsAvailableComponents capability is not set, this option has no effect. - AvailableComponents *protobufs.AvailableComponents } diff --git a/client/wsclient_test.go b/client/wsclient_test.go index d3540620..66b89e05 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -796,7 +796,7 @@ func TestWSSenderReportsAvailableComponents(t *testing.T) { if tc.availableComponents != nil { settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents - settings.AvailableComponents = tc.availableComponents + client.SetAvailableComponents(tc.availableComponents) } startClient(t, settings, client) From 52e6b800c58da65dd46cd3b2ae78f62c78336545 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Mon, 27 Jan 2025 09:50:15 -0500 Subject: [PATCH 12/13] Add clarifying documentation line for edge case --- client/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index b10e31a6..5e326cdd 100644 --- a/client/client.go +++ b/client/client.go @@ -137,7 +137,8 @@ type OpAMPClient interface { // SetAvailableComponents modifies the set of components that are available for configuration // on the agent. - // If called before Start(), initializes the client state that will be sent to the server upon Start(). + // If called before Start(), initializes the client state that will be sent to the server upon + // Start() if the if the ReportsAvailableComponents capability is set. // Must be called before Start() if the ReportsAvailableComponents capability is set. // // May be called any time after Start(), including from the OnMessage handler. From 6cb83a6b9c69e2cf20a7e38b9f099e3987071da8 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Mon, 27 Jan 2025 09:51:46 -0500 Subject: [PATCH 13/13] Fix typo --- client/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index 5e326cdd..f65e824e 100644 --- a/client/client.go +++ b/client/client.go @@ -138,7 +138,7 @@ type OpAMPClient interface { // SetAvailableComponents modifies the set of components that are available for configuration // on the agent. // If called before Start(), initializes the client state that will be sent to the server upon - // Start() if the if the ReportsAvailableComponents capability is set. + // Start() if the ReportsAvailableComponents capability is set. // Must be called before Start() if the ReportsAvailableComponents capability is set. // // May be called any time after Start(), including from the OnMessage handler.