diff --git a/client/client.go b/client/client.go index afc08315..f65e824e 100644 --- a/client/client.go +++ b/client/client.go @@ -134,4 +134,23 @@ type OpAMPClient interface { // If no error is returned, the channel returned will be closed after the specified // message is sent. SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) + + // SetAvailableComponents modifies the set of components that are available for configuration + // on the agent. + // If called before Start(), initializes the client state that will be sent to the server upon + // Start() if the ReportsAvailableComponents capability is set. + // Must be called before Start() if the ReportsAvailableComponents capability is set. + // + // May be called any time after Start(), including from the OnMessage handler. + // The new components will be sent with the next message to the server. + // + // When called after Start(): + // If components is nil, types.ErrAvailableComponentsMissing will be returned. + // If components.Hash is nil or an empty []byte, types.ErrNoAvailableComponentHash will be returned. + // If the ReportsAvailableComponents capability is not set in StartSettings.Capabilities during Start(), + // types.ErrReportsAvailableComponentsNotSet will be returned. + // + // This method is subject to agent status compression - if components is not + // different from the cached agent state, this method is a no-op. + SetAvailableComponents(components *protobufs.AvailableComponents) error } diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index dea6bd6c..87d3e5e8 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -2305,3 +2305,126 @@ func TestSetFlagsBeforeStart(t *testing.T) { assert.NoError(t, err) }) } + +func TestSetAvailableComponents(t *testing.T) { + testCases := []struct { + desc string + capabilities protobufs.AgentCapabilities + testFunc func(t *testing.T, client OpAMPClient, srv *internal.MockServer) + }{ + { + desc: "apply nil AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.ErrorIs(t, client.SetAvailableComponents(nil), types.ErrAvailableComponentsMissing) + }, + }, + { + desc: "apply AvailableComponents with empty hash", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.ErrorIs(t, client.SetAvailableComponents(&protobufs.AvailableComponents{}), types.ErrNoAvailableComponentHash) + }, + }, + { + desc: "apply AvailableComponents without required capability", + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.ErrorIs(t, client.SetAvailableComponents(generateTestAvailableComponents()), types.ErrReportsAvailableComponentsNotSet) + }, + }, + { + desc: "apply AvailableComponents with cached AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) { + require.NoError(t, client.SetAvailableComponents(generateTestAvailableComponents())) + }, + }, + { + desc: "apply AvailableComponents with new AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + testFunc: func(t *testing.T, client OpAMPClient, srv *internal.MockServer) { + availableComponents := generateTestAvailableComponents() + availableComponents.Hash = []byte("different") + require.NoError(t, client.SetAvailableComponents(availableComponents)) + srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + assert.EqualValues(t, 1, msg.SequenceNum) + msgAvailableComponents := msg.GetAvailableComponents() + require.NotNil(t, msgAvailableComponents) + require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash()) + require.Nil(t, msgAvailableComponents.GetComponents()) + return nil + }) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + testClients(t, func(t *testing.T, client OpAMPClient) { + + // Start a Server. + srv := internal.StartMockServer(t) + srv.EnableExpectMode() + + availableComponents := generateTestAvailableComponents() + client.SetAvailableComponents(availableComponents) + + // Start a client. + settings := types.StartSettings{ + OpAMPServerURL: "ws://" + srv.Endpoint, + Callbacks: types.Callbacks{ + OnMessage: func(ctx context.Context, msg *types.MessageData) {}, + }, + Capabilities: tc.capabilities, + } + prepareClient(t, &settings, client) + + // Client ---> + assert.NoError(t, client.Start(context.Background(), settings)) + + // ---> Server + srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + assert.EqualValues(t, 0, msg.SequenceNum) + msgAvailableComponents := msg.GetAvailableComponents() + if tc.capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 { + require.NotNil(t, msgAvailableComponents) + require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash()) + require.Nil(t, msgAvailableComponents.GetComponents()) + } else { + require.Nil(t, msgAvailableComponents) + } + return nil + }) + + tc.testFunc(t, client, srv) + + // Shutdown the Server. + srv.Close() + + // Shutdown the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) + }) + } +} + +func generateTestAvailableComponents() *protobufs.AvailableComponents { + return &protobufs.AvailableComponents{ + Hash: []byte("fake-hash"), + Components: map[string]*protobufs.ComponentDetails{ + "receivers": { + Metadata: []*protobufs.KeyValue{ + { + Key: "component", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{ + StringValue: "filereceiver", + }, + }, + }, + }, + }, + }, + } +} diff --git a/client/httpclient.go b/client/httpclient.go index 92259d59..8d32797d 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -120,6 +120,11 @@ func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messag return c.common.SendCustomMessage(message) } +// SetAvailableComponents implements OpAMPClient.SetAvailableComponents +func (c *httpClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { + return c.common.SetAvailableComponents(components) +} + func (c *httpClient) runUntilStopped(ctx context.Context) { // Start the HTTP sender. This will make request/responses with retries for // failures and will wait with configured polling interval if there is nothing diff --git a/client/httpclient_test.go b/client/httpclient_test.go index fc670411..ba083486 100644 --- a/client/httpclient_test.go +++ b/client/httpclient_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" "github.com/open-telemetry/opamp-go/client/internal" @@ -311,3 +312,124 @@ func TestRedirectHTTP(t *testing.T) { }) } } + +func TestHTTPReportsAvailableComponents(t *testing.T) { + testCases := []struct { + desc string + capabilities protobufs.AgentCapabilities + availableComponents *protobufs.AvailableComponents + startErr error + }{ + { + desc: "Does not report AvailableComponents", + availableComponents: nil, + }, + { + desc: "Reports AvailableComponents", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + availableComponents: generateTestAvailableComponents(), + }, + { + desc: "No AvailableComponents on Start() despite capability", + capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents, + startErr: internal.ErrAvailableComponentsMissing, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + // Start a Server. + srv := internal.StartMockServer(t) + var rcvCounter atomic.Uint64 + srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + assert.EqualValues(t, rcvCounter.Load(), msg.SequenceNum) + rcvCounter.Add(1) + time.Sleep(50 * time.Millisecond) + if rcvCounter.Load() == 1 { + resp := &protobufs.ServerToAgent{ + InstanceUid: msg.InstanceUid, + } + + if tc.availableComponents != nil { + // the first message received should contain just the available component hash + availableComponents := msg.GetAvailableComponents() + require.NotNil(t, availableComponents) + require.Nil(t, availableComponents.GetComponents()) + require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash()) + + // add the flag asking for the full available component state to the response + resp.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents) + } else { + require.Nil(t, msg.GetAvailableComponents()) + } + + return resp + } + + if rcvCounter.Load() == 2 { + if tc.availableComponents != nil { + // the second message received should contain the full component state + availableComponents := msg.GetAvailableComponents() + require.NotNil(t, availableComponents) + require.Equal(t, tc.availableComponents.GetComponents(), availableComponents.GetComponents()) + require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash()) + } else { + require.Nil(t, msg.GetAvailableComponents()) + } + + return nil + } + + // all subsequent messages should not have any available components + require.Nil(t, msg.GetAvailableComponents()) + return nil + } + + // Start a client. + settings := types.StartSettings{} + settings.OpAMPServerURL = "http://" + srv.Endpoint + settings.Capabilities = tc.capabilities + + client := NewHTTP(nil) + client.SetAvailableComponents(tc.availableComponents) + prepareClient(t, &settings, client) + + startErr := client.Start(context.Background(), settings) + if tc.startErr == nil { + assert.NoError(t, startErr) + } else { + assert.ErrorIs(t, startErr, tc.startErr) + return + } + + // Verify that status report is delivered. + eventually(t, func() bool { + return rcvCounter.Load() == 1 + }) + + if tc.availableComponents != nil { + // Verify that status report is delivered again. Polling should ensure this. + eventually(t, func() bool { + return rcvCounter.Load() == 2 + }) + } else { + // Verify that no second status report is delivered (polling is too infrequent for this to happen in 50ms) + assert.Never(t, func() bool { + return rcvCounter.Load() == 2 + }, 50*time.Millisecond, 10*time.Millisecond) + } + + // Verify that no third status report is delivered (polling is too infrequent for this to happen in 50ms) + assert.Never(t, func() bool { + return rcvCounter.Load() == 3 + }, 50*time.Millisecond, 10*time.Millisecond) + + // Shutdown the Server. + srv.Close() + + // Shutdown the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) + } +} diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 283e3b01..8035aef9 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -20,6 +20,7 @@ var ( ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set") ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set") ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set") + ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil") errAlreadyStarted = errors.New("already started") errCannotStopNotStarted = errors.New("cannot stop because not started") @@ -88,6 +89,10 @@ func (c *ClientCommon) PrepareStart( return ErrHealthMissing } + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 && c.ClientSyncedState.AvailableComponents() == nil { + return ErrAvailableComponentsMissing + } + // Prepare remote config status. if settings.RemoteConfigStatus == nil { // RemoteConfigStatus is not provided. Start with empty. @@ -212,6 +217,15 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error { return err } + // initially, do not send the full component state - just send the hash. + // full state is available on request from the server using the corresponding ServerToAgent flag + var availableComponents *protobufs.AvailableComponents + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 { + availableComponents = &protobufs.AvailableComponents{ + Hash: c.ClientSyncedState.AvailableComponents().GetHash(), + } + } + c.sender.NextMessage().Update( func(msg *protobufs.AgentToServer) { msg.AgentDescription = c.ClientSyncedState.AgentDescription() @@ -221,6 +235,7 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error { msg.Capabilities = uint64(c.Capabilities) msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities() msg.Flags = c.ClientSyncedState.Flags() + msg.AvailableComponents = availableComponents }, ) return nil @@ -433,3 +448,47 @@ func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (mess return sendingChan, nil } + +// SetAvailableComponents sends a message to the server with the available components for the agent +func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error { + if !c.isStarted { + return c.ClientSyncedState.SetAvailableComponents(components) + } + + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 { + return types.ErrReportsAvailableComponentsNotSet + } + + if components == nil { + return types.ErrAvailableComponentsMissing + } + + if len(components.Hash) == 0 { + return types.ErrNoAvailableComponentHash + } + + // implement agent status compression, don't send the message if it hasn't changed from the previous message + availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components) + + if availableComponentsChanged { + if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil { + return err + } + + // initially, do not send the full component state - just send the hash. + // full state is available on request from the server using the corresponding ServerToAgent flag + availableComponents := &protobufs.AvailableComponents{ + Hash: c.ClientSyncedState.AvailableComponents().GetHash(), + } + + c.sender.NextMessage().Update( + func(msg *protobufs.AgentToServer) { + msg.AvailableComponents = availableComponents + }, + ) + + c.sender.ScheduleSend() + } + + return nil +} diff --git a/client/internal/clientstate.go b/client/internal/clientstate.go index 93250c9f..ae3da41f 100644 --- a/client/internal/clientstate.go +++ b/client/internal/clientstate.go @@ -14,11 +14,12 @@ var ( errPackageStatusesMissing = errors.New("PackageStatuses is not set") errServerProvidedAllPackagesHashNil = errors.New("ServerProvidedAllPackagesHash is nil") errCustomCapabilitiesMissing = errors.New("CustomCapabilities is not set") + errAvailableComponentsMissing = errors.New("AvailableComponents is not set") ) // ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to -// have access to synchronize to the Server. Six messages can be stored in this store: -// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities and Flags. +// have access to synchronize to the Server. Seven messages can be stored in this store: +// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities, AvailableComponents and Flags. // // See OpAMP spec for more details on how status reporting works: // https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#status-reporting @@ -34,12 +35,13 @@ var ( type ClientSyncedState struct { mutex sync.Mutex - agentDescription *protobufs.AgentDescription - health *protobufs.ComponentHealth - remoteConfigStatus *protobufs.RemoteConfigStatus - packageStatuses *protobufs.PackageStatuses - customCapabilities *protobufs.CustomCapabilities - flags protobufs.AgentToServerFlags + agentDescription *protobufs.AgentDescription + health *protobufs.ComponentHealth + remoteConfigStatus *protobufs.RemoteConfigStatus + packageStatuses *protobufs.PackageStatuses + customCapabilities *protobufs.CustomCapabilities + availableComponents *protobufs.AvailableComponents + flags protobufs.AgentToServerFlags } func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription { @@ -72,6 +74,12 @@ func (s *ClientSyncedState) CustomCapabilities() *protobufs.CustomCapabilities { return s.customCapabilities } +func (s *ClientSyncedState) AvailableComponents() *protobufs.AvailableComponents { + defer s.mutex.Unlock() + s.mutex.Lock() + return s.availableComponents +} + func (s *ClientSyncedState) Flags() uint64 { defer s.mutex.Unlock() s.mutex.Lock() @@ -176,6 +184,20 @@ func (s *ClientSyncedState) HasCustomCapability(capability string) bool { return false } +func (s *ClientSyncedState) SetAvailableComponents(components *protobufs.AvailableComponents) error { + if components == nil { + return errAvailableComponentsMissing + } + + clone := proto.Clone(components).(*protobufs.AvailableComponents) + + defer s.mutex.Unlock() + s.mutex.Lock() + s.availableComponents = clone + + return nil +} + // SetFlags sets the flags in the state. func (s *ClientSyncedState) SetFlags(flags protobufs.AgentToServerFlags) { defer s.mutex.Unlock() diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index aee05a81..55f2ee55 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -197,6 +197,7 @@ func (r *receivedProcessor) rcvFlags( msg.PackageStatuses = r.clientSyncedState.PackageStatuses() msg.CustomCapabilities = r.clientSyncedState.CustomCapabilities() msg.Flags = r.clientSyncedState.Flags() + msg.AvailableComponents = r.clientSyncedState.AvailableComponents() // The logic for EffectiveConfig is similar to the previous 6 sub-messages however // the EffectiveConfig is fetched using GetEffectiveConfig instead of @@ -207,6 +208,15 @@ func (r *receivedProcessor) rcvFlags( scheduleSend = true } + if flags&protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents != 0 { + r.sender.NextMessage().Update( + func(msg *protobufs.AgentToServer) { + msg.AvailableComponents = r.clientSyncedState.AvailableComponents() + }, + ) + scheduleSend = true + } + return scheduleSend, nil } diff --git a/client/types/errors.go b/client/types/errors.go index aab66924..a1d1389b 100644 --- a/client/types/errors.go +++ b/client/types/errors.go @@ -13,4 +13,13 @@ var ( // ErrCustomMessagePending is returned by SendCustomMessage when called before the previous // message has been sent. ErrCustomMessagePending = errors.New("custom message already set") + + // ErrReportsAvailableComponentsNotSet is returned by SetAvailableComponents without the ReportsAvailableComponents capability set + ErrReportsAvailableComponentsNotSet = errors.New("ReportsAvailableComponents capability is not set") + + // ErrAvailableComponentsMissing is returned by SetAvailableComponents when called with a nil message + ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil") + + // ErrNoAvailableComponentHash is returned by SetAvailableComponents when called with a message with an empty hash + ErrNoAvailableComponentHash = errors.New("AvailableComponents.Hash is empty") ) diff --git a/client/wsclient.go b/client/wsclient.go index f19d8ab4..fdacdd4b 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -157,6 +157,11 @@ func (c *wsClient) SendCustomMessage(message *protobufs.CustomMessage) (messageS return c.common.SendCustomMessage(message) } +// SetAvailableComponents implements OpAMPClient.SetAvailableComponents +func (c *wsClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { + return c.common.SetAvailableComponents(components) +} + func viaReq(resps []*http.Response) []*http.Request { reqs := make([]*http.Request, 0, len(resps)) for _, resp := range resps { diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 436ceb55..66b89e05 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -86,7 +86,7 @@ func TestWSSenderReportsHeartbeat(t *testing.T) { } else { assert.Never(t, func() bool { return msgCount.Load() >= 2 - }, 3*time.Second, 10*time.Millisecond) + }, 50*time.Millisecond, 10*time.Millisecond) } // Stop the client. @@ -257,7 +257,7 @@ func TestVerifyWSCompress(t *testing.T) { remoteCfg := &protobufs.AgentRemoteConfig{ Config: &protobufs.AgentConfigMap{ ConfigMap: map[string]*protobufs.AgentConfigFile{ - "": &protobufs.AgentConfigFile{ + "": { Body: uncompressedCfg, }, }, @@ -727,3 +727,96 @@ func TestHandlesConnectionError(t *testing.T) { err = client.Stop(context.Background()) require.NoError(t, err) } + +func TestWSSenderReportsAvailableComponents(t *testing.T) { + testCases := []struct { + desc string + availableComponents *protobufs.AvailableComponents + }{ + { + desc: "Does not report AvailableComponents", + availableComponents: nil, + }, + { + desc: "Reports AvailableComponents", + availableComponents: generateTestAvailableComponents(), + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + srv := internal.StartMockServer(t) + + var firstMsg atomic.Bool + var conn atomic.Value + srv.OnWSConnect = func(c *websocket.Conn) { + conn.Store(c) + firstMsg.Store(true) + } + var msgCount atomic.Int64 + srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + if firstMsg.Load() { + msgCount.Add(1) + firstMsg.Store(false) + resp := &protobufs.ServerToAgent{ + InstanceUid: msg.InstanceUid, + } + + if tc.availableComponents != nil { + availableComponents := msg.GetAvailableComponents() + require.NotNil(t, availableComponents) + require.Nil(t, availableComponents.GetComponents()) + require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash()) + + resp.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents) + } else { + require.Nil(t, msg.GetAvailableComponents()) + } + + return resp + } + msgCount.Add(1) + if tc.availableComponents != nil { + availableComponents := msg.GetAvailableComponents() + require.NotNil(t, availableComponents) + require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash()) + require.Equal(t, tc.availableComponents.GetComponents(), availableComponents.GetComponents()) + } else { + require.Error(t, errors.New("should not receive a second message when ReportsAvailableComponents is disabled")) + } + + return nil + } + + // Start an OpAMP/WebSocket client. + settings := types.StartSettings{ + OpAMPServerURL: "ws://" + srv.Endpoint, + } + client := NewWebSocket(nil) + + if tc.availableComponents != nil { + settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents + client.SetAvailableComponents(tc.availableComponents) + } + + startClient(t, settings, client) + + // Wait for connection to be established. + eventually(t, func() bool { return conn.Load() != nil }) + + if tc.availableComponents != nil { + assert.Eventually(t, func() bool { + return msgCount.Load() >= 2 + }, 5*time.Second, 10*time.Millisecond) + } else { + assert.Never(t, func() bool { + return msgCount.Load() >= 2 + }, 50*time.Millisecond, 10*time.Millisecond) + } + + // Stop the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) + } +}