Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add implementation of new AvailableComponents message #340

Merged
merged 13 commits into from
Jan 27, 2025
Merged
Prev Previous commit
Next Next commit
Add unit tests, fix issues found by tests, document
  • Loading branch information
mrsillydog committed Jan 10, 2025
commit 26a7996854641ab4f4eda62d8d7912d1b1b360c8
7 changes: 6 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ type OpAMPClient interface {

// SetAvailableComponents modifies the set of components that are available for configuration
// on the agent.
// TODO more documentation
// May be called any time after Start(), including from the OnMessage handler.
// The new components will be sent with the next message to the server.
// If components is nil, errReportsAvailableComponentsNotSet will be returned.
// If components.Hash is nil or an empty []byte, errNoAvailableComponentHash will be returned.
// This method is subject to agent status compression - if components is not
// different from the cached agent state, this method is a no-op.
SetAvailableComponents(components *protobufs.AvailableComponents) error
}
20 changes: 20 additions & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2305,3 +2305,23 @@ func TestSetFlagsBeforeStart(t *testing.T) {
assert.NoError(t, err)
})
}

func generateTestAvailableComponents() *protobufs.AvailableComponents {
return &protobufs.AvailableComponents{
Hash: []byte("fake-hash"),
Components: map[string]*protobufs.ComponentDetails{
"receivers": {
Metadata: []*protobufs.KeyValue{
{
Key: "component",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{
StringValue: "filereceiver",
},
},
},
},
},
},
}
}
110 changes: 110 additions & 0 deletions client/httpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opamp-go/client/internal"
Expand Down Expand Up @@ -311,3 +312,112 @@ func TestRedirectHTTP(t *testing.T) {
})
}
}

func TestHTTPReportsAvailableComponents(t *testing.T) {
testCases := []struct {
desc string
availableComponents *protobufs.AvailableComponents
}{
{
desc: "Does not report AvailableComponents",
availableComponents: nil,
},
{
desc: "Reports AvailableComponents",
availableComponents: generateTestAvailableComponents(),
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
var rcvCounter atomic.Uint64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, rcvCounter.Load(), msg.SequenceNum)
rcvCounter.Add(1)
time.Sleep(50 * time.Millisecond)
if rcvCounter.Load() == 1 {
resp := &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
}

if tc.availableComponents != nil {
// the first message received should contain just the available component hash
availableComponents := msg.GetAvailableComponents()
require.NotNil(t, availableComponents)
require.Nil(t, availableComponents.GetComponents())
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())

// add the flag asking for the full available component state to the response
resp.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
} else {
require.Nil(t, msg.GetAvailableComponents())
}

return resp
}

if rcvCounter.Load() == 2 {
if tc.availableComponents != nil {
// the second message received should contain the full component state
availableComponents := msg.GetAvailableComponents()
require.NotNil(t, availableComponents)
require.Equal(t, tc.availableComponents.GetComponents(), availableComponents.GetComponents())
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())
} else {
require.Nil(t, msg.GetAvailableComponents())
}

return nil
}

// all subsequent messages should not have any available components
require.Nil(t, msg.GetAvailableComponents())
return nil
}

// Start a client.
settings := types.StartSettings{}
settings.OpAMPServerURL = "http://" + srv.Endpoint
if tc.availableComponents != nil {
settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents
settings.AvailableComponents = tc.availableComponents
}

client := NewHTTP(nil)
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))

// Verify that status report is delivered.
eventually(t, func() bool {
return rcvCounter.Load() == 1
})

if tc.availableComponents != nil {
// Verify that status report is delivered again. Polling should ensure this.
eventually(t, func() bool {
return rcvCounter.Load() == 2
})
} else {
// Verify that no second status report is delivered (polling is too infrequent for this to happen in 3 seconds)
require.Never(t, func() bool {
return rcvCounter.Load() == 2
}, 3*time.Second, 10*time.Millisecond)
}

// Verify that no third status report is delivered (polling is too infrequent for this to happen in 3 seconds)
require.Never(t, func() bool {
return rcvCounter.Load() == 3
}, 3*time.Second, 10*time.Millisecond)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
}
17 changes: 13 additions & 4 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,14 @@
return ErrHealthMissing
}

if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 && c.ClientSyncedState.AvailableComponents() == nil {
return ErrAvailableComponentsMissing
if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 {
if settings.AvailableComponents == nil {
return ErrAvailableComponentsMissing
}

Check warning on line 96 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L95-L96

Added lines #L95 - L96 were not covered by tests

if err := c.ClientSyncedState.SetAvailableComponents(settings.AvailableComponents); err != nil {
return err
}

Check warning on line 100 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L99-L100

Added lines #L99 - L100 were not covered by tests
}

// Prepare remote config status.
Expand Down Expand Up @@ -220,8 +226,11 @@

// initially, do not send the full component state - just send the hash.
// full state is available on request from the server using the corresponding ServerToAgent flag
availableComponents := &protobufs.AvailableComponents{
Hash: c.ClientSyncedState.AvailableComponents().GetHash(),
var availableComponents *protobufs.AvailableComponents
if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 {
availableComponents = &protobufs.AvailableComponents{
Hash: c.ClientSyncedState.AvailableComponents().GetHash(),
}
}

c.sender.NextMessage().Update(
Expand Down Expand Up @@ -448,31 +457,31 @@
}

// SetAvailableComponents sends a message to the server with the available components for the agent
func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error {
if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 {
return errReportsAvailableComponentsNotSet
}

Check warning on line 463 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L460-L463

Added lines #L460 - L463 were not covered by tests

if len(components.Hash) == 0 {
return errNoAvailableComponentHash
}

Check warning on line 467 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L465-L467

Added lines #L465 - L467 were not covered by tests

// implement agent status compression, don't send the message if it hasn't changed from the previous message
availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components)

if availableComponentsChanged {
if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil {
return err
}

Check warning on line 475 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L470-L475

Added lines #L470 - L475 were not covered by tests

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AvailableComponents = c.ClientSyncedState.AvailableComponents()
},

Check warning on line 480 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L477-L480

Added lines #L477 - L480 were not covered by tests
)

c.sender.ScheduleSend()

Check warning on line 483 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L483

Added line #L483 was not covered by tests
}

return nil

Check warning on line 486 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L486

Added line #L486 was not covered by tests
}
5 changes: 5 additions & 0 deletions client/types/startsettings.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,9 @@ type StartSettings struct {
//
// If the ReportsHeartbeat capability is disabled, this option has no effect.
HeartbeatInterval *time.Duration

// Defines the available components of the Agent.
// Required if the ReportsAvailableComponents capability is set.
// If the ReportsAvailableComponents capability is not set, this option has no effect.
AvailableComponents *protobufs.AvailableComponents
}
93 changes: 93 additions & 0 deletions client/wsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,3 +727,96 @@ func TestHandlesConnectionError(t *testing.T) {
err = client.Stop(context.Background())
require.NoError(t, err)
}

func TestWSSenderReportsAvailableComponents(t *testing.T) {
testCases := []struct {
desc string
availableComponents *protobufs.AvailableComponents
}{
{
desc: "Does not report AvailableComponents",
availableComponents: nil,
},
{
desc: "Reports AvailableComponents",
availableComponents: generateTestAvailableComponents(),
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
srv := internal.StartMockServer(t)

var firstMsg atomic.Bool
var conn atomic.Value
srv.OnWSConnect = func(c *websocket.Conn) {
conn.Store(c)
firstMsg.Store(true)
}
var msgCount atomic.Int64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if firstMsg.Load() {
msgCount.Add(1)
firstMsg.Store(false)
resp := &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
}

if tc.availableComponents != nil {
availableComponents := msg.GetAvailableComponents()
require.NotNil(t, availableComponents)
require.Nil(t, availableComponents.GetComponents())
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())

resp.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
} else {
require.Nil(t, msg.GetAvailableComponents())
}

return resp
}
msgCount.Add(1)
if tc.availableComponents != nil {
availableComponents := msg.GetAvailableComponents()
require.NotNil(t, availableComponents)
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())
require.Equal(t, tc.availableComponents.GetComponents(), availableComponents.GetComponents())
} else {
require.Error(t, errors.New("should not receive a second message when ReportsAvailableComponents is disabled"))
}

return nil
}

// Start an OpAMP/WebSocket client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
}
client := NewWebSocket(nil)

if tc.availableComponents != nil {
settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents
settings.AvailableComponents = tc.availableComponents
}

startClient(t, settings, client)

// Wait for connection to be established.
eventually(t, func() bool { return conn.Load() != nil })

if tc.availableComponents != nil {
assert.Eventually(t, func() bool {
return msgCount.Load() >= 2
}, 5*time.Second, 10*time.Millisecond)
} else {
assert.Never(t, func() bool {
return msgCount.Load() >= 2
}, 3*time.Second, 10*time.Millisecond)
}

// Stop the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
}
Loading