Skip to content

Commit

Permalink
feat: Add implementation of new AvailableComponents message (#340)
Browse files Browse the repository at this point in the history
Implements the specification added to opamp-spec in open-telemetry/opamp-spec#201
  • Loading branch information
mrsillydog authored Jan 27, 2025
1 parent 88af57a commit 2ecac8d
Show file tree
Hide file tree
Showing 10 changed files with 477 additions and 10 deletions.
19 changes: 19 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,23 @@ type OpAMPClient interface {
// If no error is returned, the channel returned will be closed after the specified
// message is sent.
SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error)

// SetAvailableComponents modifies the set of components that are available for configuration
// on the agent.
// If called before Start(), initializes the client state that will be sent to the server upon
// Start() if the ReportsAvailableComponents capability is set.
// Must be called before Start() if the ReportsAvailableComponents capability is set.
//
// May be called any time after Start(), including from the OnMessage handler.
// The new components will be sent with the next message to the server.
//
// When called after Start():
// If components is nil, types.ErrAvailableComponentsMissing will be returned.
// If components.Hash is nil or an empty []byte, types.ErrNoAvailableComponentHash will be returned.
// If the ReportsAvailableComponents capability is not set in StartSettings.Capabilities during Start(),
// types.ErrReportsAvailableComponentsNotSet will be returned.
//
// This method is subject to agent status compression - if components is not
// different from the cached agent state, this method is a no-op.
SetAvailableComponents(components *protobufs.AvailableComponents) error
}
123 changes: 123 additions & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2305,3 +2305,126 @@ func TestSetFlagsBeforeStart(t *testing.T) {
assert.NoError(t, err)
})
}

func TestSetAvailableComponents(t *testing.T) {
testCases := []struct {
desc string
capabilities protobufs.AgentCapabilities
testFunc func(t *testing.T, client OpAMPClient, srv *internal.MockServer)
}{
{
desc: "apply nil AvailableComponents",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
require.ErrorIs(t, client.SetAvailableComponents(nil), types.ErrAvailableComponentsMissing)
},
},
{
desc: "apply AvailableComponents with empty hash",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
require.ErrorIs(t, client.SetAvailableComponents(&protobufs.AvailableComponents{}), types.ErrNoAvailableComponentHash)
},
},
{
desc: "apply AvailableComponents without required capability",
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
require.ErrorIs(t, client.SetAvailableComponents(generateTestAvailableComponents()), types.ErrReportsAvailableComponentsNotSet)
},
},
{
desc: "apply AvailableComponents with cached AvailableComponents",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
require.NoError(t, client.SetAvailableComponents(generateTestAvailableComponents()))
},
},
{
desc: "apply AvailableComponents with new AvailableComponents",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
testFunc: func(t *testing.T, client OpAMPClient, srv *internal.MockServer) {
availableComponents := generateTestAvailableComponents()
availableComponents.Hash = []byte("different")
require.NoError(t, client.SetAvailableComponents(availableComponents))
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 1, msg.SequenceNum)
msgAvailableComponents := msg.GetAvailableComponents()
require.NotNil(t, msgAvailableComponents)
require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash())
require.Nil(t, msgAvailableComponents.GetComponents())
return nil
})
},
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {

// Start a Server.
srv := internal.StartMockServer(t)
srv.EnableExpectMode()

availableComponents := generateTestAvailableComponents()
client.SetAvailableComponents(availableComponents)

// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Callbacks: types.Callbacks{
OnMessage: func(ctx context.Context, msg *types.MessageData) {},
},
Capabilities: tc.capabilities,
}
prepareClient(t, &settings, client)

// Client --->
assert.NoError(t, client.Start(context.Background(), settings))

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
msgAvailableComponents := msg.GetAvailableComponents()
if tc.capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 {
require.NotNil(t, msgAvailableComponents)
require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash())
require.Nil(t, msgAvailableComponents.GetComponents())
} else {
require.Nil(t, msgAvailableComponents)
}
return nil
})

tc.testFunc(t, client, srv)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
})
}
}

func generateTestAvailableComponents() *protobufs.AvailableComponents {
return &protobufs.AvailableComponents{
Hash: []byte("fake-hash"),
Components: map[string]*protobufs.ComponentDetails{
"receivers": {
Metadata: []*protobufs.KeyValue{
{
Key: "component",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{
StringValue: "filereceiver",
},
},
},
},
},
},
}
}
5 changes: 5 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messag
return c.common.SendCustomMessage(message)
}

// SetAvailableComponents implements OpAMPClient.SetAvailableComponents
func (c *httpClient) SetAvailableComponents(components *protobufs.AvailableComponents) error {
return c.common.SetAvailableComponents(components)
}

func (c *httpClient) runUntilStopped(ctx context.Context) {
// Start the HTTP sender. This will make request/responses with retries for
// failures and will wait with configured polling interval if there is nothing
Expand Down
122 changes: 122 additions & 0 deletions client/httpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opamp-go/client/internal"
Expand Down Expand Up @@ -311,3 +312,124 @@ func TestRedirectHTTP(t *testing.T) {
})
}
}

func TestHTTPReportsAvailableComponents(t *testing.T) {
testCases := []struct {
desc string
capabilities protobufs.AgentCapabilities
availableComponents *protobufs.AvailableComponents
startErr error
}{
{
desc: "Does not report AvailableComponents",
availableComponents: nil,
},
{
desc: "Reports AvailableComponents",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
availableComponents: generateTestAvailableComponents(),
},
{
desc: "No AvailableComponents on Start() despite capability",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
startErr: internal.ErrAvailableComponentsMissing,
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
var rcvCounter atomic.Uint64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, rcvCounter.Load(), msg.SequenceNum)
rcvCounter.Add(1)
time.Sleep(50 * time.Millisecond)
if rcvCounter.Load() == 1 {
resp := &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
}

if tc.availableComponents != nil {
// the first message received should contain just the available component hash
availableComponents := msg.GetAvailableComponents()
require.NotNil(t, availableComponents)
require.Nil(t, availableComponents.GetComponents())
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())

// add the flag asking for the full available component state to the response
resp.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
} else {
require.Nil(t, msg.GetAvailableComponents())
}

return resp
}

if rcvCounter.Load() == 2 {
if tc.availableComponents != nil {
// the second message received should contain the full component state
availableComponents := msg.GetAvailableComponents()
require.NotNil(t, availableComponents)
require.Equal(t, tc.availableComponents.GetComponents(), availableComponents.GetComponents())
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())
} else {
require.Nil(t, msg.GetAvailableComponents())
}

return nil
}

// all subsequent messages should not have any available components
require.Nil(t, msg.GetAvailableComponents())
return nil
}

// Start a client.
settings := types.StartSettings{}
settings.OpAMPServerURL = "http://" + srv.Endpoint
settings.Capabilities = tc.capabilities

client := NewHTTP(nil)
client.SetAvailableComponents(tc.availableComponents)
prepareClient(t, &settings, client)

startErr := client.Start(context.Background(), settings)
if tc.startErr == nil {
assert.NoError(t, startErr)
} else {
assert.ErrorIs(t, startErr, tc.startErr)
return
}

// Verify that status report is delivered.
eventually(t, func() bool {
return rcvCounter.Load() == 1
})

if tc.availableComponents != nil {
// Verify that status report is delivered again. Polling should ensure this.
eventually(t, func() bool {
return rcvCounter.Load() == 2
})
} else {
// Verify that no second status report is delivered (polling is too infrequent for this to happen in 50ms)
assert.Never(t, func() bool {
return rcvCounter.Load() == 2
}, 50*time.Millisecond, 10*time.Millisecond)
}

// Verify that no third status report is delivered (polling is too infrequent for this to happen in 50ms)
assert.Never(t, func() bool {
return rcvCounter.Load() == 3
}, 50*time.Millisecond, 10*time.Millisecond)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
}
59 changes: 59 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set")
ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set")
ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set")
ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil")

errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
Expand Down Expand Up @@ -88,6 +89,10 @@ func (c *ClientCommon) PrepareStart(
return ErrHealthMissing
}

if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 && c.ClientSyncedState.AvailableComponents() == nil {
return ErrAvailableComponentsMissing
}

// Prepare remote config status.
if settings.RemoteConfigStatus == nil {
// RemoteConfigStatus is not provided. Start with empty.
Expand Down Expand Up @@ -212,6 +217,15 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
return err
}

// initially, do not send the full component state - just send the hash.
// full state is available on request from the server using the corresponding ServerToAgent flag
var availableComponents *protobufs.AvailableComponents
if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 {
availableComponents = &protobufs.AvailableComponents{
Hash: c.ClientSyncedState.AvailableComponents().GetHash(),
}
}

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AgentDescription = c.ClientSyncedState.AgentDescription()
Expand All @@ -221,6 +235,7 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
msg.Capabilities = uint64(c.Capabilities)
msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities()
msg.Flags = c.ClientSyncedState.Flags()
msg.AvailableComponents = availableComponents
},
)
return nil
Expand Down Expand Up @@ -433,3 +448,47 @@ func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (mess

return sendingChan, nil
}

// SetAvailableComponents sends a message to the server with the available components for the agent
func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error {
if !c.isStarted {
return c.ClientSyncedState.SetAvailableComponents(components)
}

if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 {
return types.ErrReportsAvailableComponentsNotSet
}

if components == nil {
return types.ErrAvailableComponentsMissing
}

if len(components.Hash) == 0 {
return types.ErrNoAvailableComponentHash
}

// implement agent status compression, don't send the message if it hasn't changed from the previous message
availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components)

if availableComponentsChanged {
if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil {
return err
}

// initially, do not send the full component state - just send the hash.
// full state is available on request from the server using the corresponding ServerToAgent flag
availableComponents := &protobufs.AvailableComponents{
Hash: c.ClientSyncedState.AvailableComponents().GetHash(),
}

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AvailableComponents = availableComponents
},
)

c.sender.ScheduleSend()
}

return nil
}
Loading

0 comments on commit 2ecac8d

Please sign in to comment.