Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add implementation of new AvailableComponents message #340

Merged
merged 13 commits into from
Jan 27, 2025
Merged
10 changes: 10 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,14 @@ type OpAMPClient interface {
// If no error is returned, the channel returned will be closed after the specified
// message is sent.
SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error)

// SetAvailableComponents modifies the set of components that are available for configuration
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
// on the agent.
// May be called any time after Start(), including from the OnMessage handler.
// The new components will be sent with the next message to the server.
// If components is nil, errReportsAvailableComponentsNotSet will be returned.
// If components.Hash is nil or an empty []byte, errNoAvailableComponentHash will be returned.
// This method is subject to agent status compression - if components is not
// different from the cached agent state, this method is a no-op.
SetAvailableComponents(components *protobufs.AvailableComponents) error
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
}
123 changes: 123 additions & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2305,3 +2305,126 @@ func TestSetFlagsBeforeStart(t *testing.T) {
assert.NoError(t, err)
})
}

func TestSetAvailableComponents(t *testing.T) {
testCases := []struct {
desc string
capabilities protobufs.AgentCapabilities
testFunc func(t *testing.T, client OpAMPClient, srv *internal.MockServer)
}{
{
desc: "apply nil AvailableComponents",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
require.ErrorIs(t, client.SetAvailableComponents(nil), types.ErrAvailableComponentsMissing)
},
},
{
desc: "apply AvailableComponents with empty hash",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
require.ErrorIs(t, client.SetAvailableComponents(&protobufs.AvailableComponents{}), types.ErrNoAvailableComponentHash)
},
},
{
desc: "apply AvailableComponents without required capability",
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
require.ErrorIs(t, client.SetAvailableComponents(generateTestAvailableComponents()), types.ErrReportsAvailableComponentsNotSet)
},
},
{
desc: "apply AvailableComponents with cached AvailableComponents",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
testFunc: func(t *testing.T, client OpAMPClient, _ *internal.MockServer) {
require.NoError(t, client.SetAvailableComponents(generateTestAvailableComponents()))
},
},
{
desc: "apply AvailableComponents with new AvailableComponents",
capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
testFunc: func(t *testing.T, client OpAMPClient, srv *internal.MockServer) {
availableComponents := generateTestAvailableComponents()
availableComponents.Hash = []byte("different")
require.NoError(t, client.SetAvailableComponents(availableComponents))
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 1, msg.SequenceNum)
msgAvailableComponents := msg.GetAvailableComponents()
require.NotNil(t, msgAvailableComponents)
require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash())
require.Nil(t, msgAvailableComponents.GetComponents())
return nil
})
},
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {

// Start a Server.
srv := internal.StartMockServer(t)
srv.EnableExpectMode()

availableComponents := generateTestAvailableComponents()

// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Callbacks: types.Callbacks{
OnMessage: func(ctx context.Context, msg *types.MessageData) {},
},
Capabilities: tc.capabilities,
AvailableComponents: availableComponents,
}
prepareClient(t, &settings, client)

// Client --->
assert.NoError(t, client.Start(context.Background(), settings))
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
msgAvailableComponents := msg.GetAvailableComponents()
if tc.capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 {
require.NotNil(t, msgAvailableComponents)
require.Equal(t, msgAvailableComponents.GetHash(), availableComponents.GetHash())
require.Nil(t, msgAvailableComponents.GetComponents())
} else {
require.Nil(t, msgAvailableComponents)
}
return nil
})

tc.testFunc(t, client, srv)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
})
}
}

func generateTestAvailableComponents() *protobufs.AvailableComponents {
return &protobufs.AvailableComponents{
Hash: []byte("fake-hash"),
Components: map[string]*protobufs.ComponentDetails{
"receivers": {
Metadata: []*protobufs.KeyValue{
{
Key: "component",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{
StringValue: "filereceiver",
},
},
},
},
},
},
}
}
5 changes: 5 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messag
return c.common.SendCustomMessage(message)
}

// SetAvailableComponents implements OpAMPClient.SetAvailableComponents
func (c *httpClient) SetAvailableComponents(components *protobufs.AvailableComponents) error {
return c.common.SetAvailableComponents(components)
}

func (c *httpClient) runUntilStopped(ctx context.Context) {
// Start the HTTP sender. This will make request/responses with retries for
// failures and will wait with configured polling interval if there is nothing
Expand Down
110 changes: 110 additions & 0 deletions client/httpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opamp-go/client/internal"
Expand Down Expand Up @@ -311,3 +312,112 @@ func TestRedirectHTTP(t *testing.T) {
})
}
}

func TestHTTPReportsAvailableComponents(t *testing.T) {
testCases := []struct {
desc string
availableComponents *protobufs.AvailableComponents
}{
{
desc: "Does not report AvailableComponents",
availableComponents: nil,
},
{
desc: "Reports AvailableComponents",
availableComponents: generateTestAvailableComponents(),
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
var rcvCounter atomic.Uint64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, rcvCounter.Load(), msg.SequenceNum)
rcvCounter.Add(1)
time.Sleep(50 * time.Millisecond)
if rcvCounter.Load() == 1 {
resp := &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
}

if tc.availableComponents != nil {
// the first message received should contain just the available component hash
availableComponents := msg.GetAvailableComponents()
require.NotNil(t, availableComponents)
require.Nil(t, availableComponents.GetComponents())
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())

// add the flag asking for the full available component state to the response
resp.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
} else {
require.Nil(t, msg.GetAvailableComponents())
}

return resp
}

if rcvCounter.Load() == 2 {
if tc.availableComponents != nil {
// the second message received should contain the full component state
availableComponents := msg.GetAvailableComponents()
require.NotNil(t, availableComponents)
require.Equal(t, tc.availableComponents.GetComponents(), availableComponents.GetComponents())
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())
} else {
require.Nil(t, msg.GetAvailableComponents())
}

return nil
}

// all subsequent messages should not have any available components
require.Nil(t, msg.GetAvailableComponents())
return nil
}

// Start a client.
settings := types.StartSettings{}
settings.OpAMPServerURL = "http://" + srv.Endpoint
if tc.availableComponents != nil {
settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents
settings.AvailableComponents = tc.availableComponents
}

client := NewHTTP(nil)
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))

// Verify that status report is delivered.
eventually(t, func() bool {
return rcvCounter.Load() == 1
})

if tc.availableComponents != nil {
// Verify that status report is delivered again. Polling should ensure this.
eventually(t, func() bool {
return rcvCounter.Load() == 2
})
} else {
// Verify that no second status report is delivered (polling is too infrequent for this to happen in 3 seconds)
require.Never(t, func() bool {
mrsillydog marked this conversation as resolved.
Show resolved Hide resolved
return rcvCounter.Load() == 2
}, 3*time.Second, 10*time.Millisecond)
}

// Verify that no third status report is delivered (polling is too infrequent for this to happen in 3 seconds)
require.Never(t, func() bool {
return rcvCounter.Load() == 3
}, 3*time.Second, 10*time.Millisecond)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
}
61 changes: 61 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set")
ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set")
ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set")
ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil")

errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
Expand Down Expand Up @@ -88,6 +89,16 @@
return ErrHealthMissing
}

if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 {
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
if settings.AvailableComponents == nil {
return ErrAvailableComponentsMissing
}

Check warning on line 95 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L94-L95

Added lines #L94 - L95 were not covered by tests

if err := c.ClientSyncedState.SetAvailableComponents(settings.AvailableComponents); err != nil {
return err
}

Check warning on line 99 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L98-L99

Added lines #L98 - L99 were not covered by tests
}

// Prepare remote config status.
if settings.RemoteConfigStatus == nil {
// RemoteConfigStatus is not provided. Start with empty.
Expand Down Expand Up @@ -212,6 +223,15 @@
return err
}

// initially, do not send the full component state - just send the hash.
// full state is available on request from the server using the corresponding ServerToAgent flag
var availableComponents *protobufs.AvailableComponents
if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 {
availableComponents = &protobufs.AvailableComponents{
Hash: c.ClientSyncedState.AvailableComponents().GetHash(),
}
}

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AgentDescription = c.ClientSyncedState.AgentDescription()
Expand All @@ -221,6 +241,7 @@
msg.Capabilities = uint64(c.Capabilities)
msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities()
msg.Flags = c.ClientSyncedState.Flags()
msg.AvailableComponents = availableComponents
},
)
return nil
Expand Down Expand Up @@ -433,3 +454,43 @@

return sendingChan, nil
}

// SetAvailableComponents sends a message to the server with the available components for the agent
func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error {
if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 {
return types.ErrReportsAvailableComponentsNotSet
}

if components == nil {
return types.ErrAvailableComponentsMissing
}

if len(components.Hash) == 0 {
return types.ErrNoAvailableComponentHash
}

// implement agent status compression, don't send the message if it hasn't changed from the previous message
availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components)

if availableComponentsChanged {
if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil {
return err
}

Check warning on line 478 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L477-L478

Added lines #L477 - L478 were not covered by tests

// initially, do not send the full component state - just send the hash.
// full state is available on request from the server using the corresponding ServerToAgent flag
availableComponents := &protobufs.AvailableComponents{
Hash: c.ClientSyncedState.AvailableComponents().GetHash(),
}

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AvailableComponents = availableComponents
},
)

c.sender.ScheduleSend()
}

return nil
}
Loading
Loading