Skip to content

Commit

Permalink
Add support for CustomCapabilities and CustomMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
andykellr committed Oct 25, 2023
1 parent c1931d7 commit 8c346ad
Show file tree
Hide file tree
Showing 15 changed files with 1,102 additions and 506 deletions.
8 changes: 8 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,12 @@ type OpAMPClient interface {
// May be called anytime after Start(), including from OnMessage handler.
// nil values are not allowed and will return an error.
SetPackageStatuses(statuses *protobufs.PackageStatuses) error

// SetCustomMessage sets the custom message that will be sent to the Server. May be
// called anytime after Start(), including from OnMessage handler.
//
// If the CustomMessage is nil or it specifies a capability that is not listed in the
// CustomCapabilities provided in the StartSettings for the client, it will return an
// error.
SetCustomMessage(message *protobufs.CustomMessage) error
}
219 changes: 219 additions & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,3 +1464,222 @@ func TestMissingPackagesStateProvider(t *testing.T) {
assert.ErrorIs(t, client.Start(context.Background(), settings), internal.ErrAcceptsPackagesNotSet)
})
}

func TestReportCustomCapabilities(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {

// Start a Server.
srv := internal.StartMockServer(t)
srv.EnableExpectMode()

var clientRcvCustomMessage atomic.Value

// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
CustomCapabilities: []string{"local.test.echo"},
Callbacks: types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
clientRcvCustomMessage.Store(msg.CustomMessage)
},
},
}
prepareClient(t, &settings, client)

clientCustomCapabilities := &protobufs.CustomCapabilities{
Capabilities: []string{"local.test.echo"},
}

// Client --->
assert.NoError(t, client.Start(context.Background(), settings))

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
// The first status report after Start must have the CustomCapabilities.
assert.True(t, proto.Equal(clientCustomCapabilities, msg.CustomCapabilities))
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
CustomCapabilities: &protobufs.CustomCapabilities{
Capabilities: []string{"local.test.echo"},
},
}
})

clientEchoRequest := &protobufs.CustomMessage{
Capability: "local.test.echo",
Type: "request",
Data: []byte("data"),
}
serverEchoResponse := &protobufs.CustomMessage{
Capability: "local.test.echo",
Type: "response",
Data: []byte("data"),
}

// Client --->
// Send a custom message to the server
_ = client.SetCustomMessage(clientEchoRequest)

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// CustomCapabilities must not be sent again.
assert.Nil(t, msg.CustomCapabilities)

assert.EqualValues(t, 1, msg.SequenceNum)

// Send a custom message response and ask client for full state again.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
Flags: uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState),
CustomMessage: serverEchoResponse,
}
})

// Verify response received
// Client --->
eventually(
t,
func() bool {
msg, ok := clientRcvCustomMessage.Load().(*protobufs.CustomMessage)
if !ok || msg == nil {
return false
}
return proto.Equal(serverEchoResponse, msg)
},
)

// Server has requested the client to report, so there will be another message.
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 2, msg.SequenceNum)

// CustomCapabilities should be sent since ReportFullState was requested
assert.True(t, proto.Equal(clientCustomCapabilities, msg.CustomCapabilities))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
})

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}

// TestSetCustomMessage tests the SetCustomMessage method to ensure it returns errors
// appropriately.
func TestSetCustomMessage(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{},
CustomCapabilities: []string{"io.opentelemetry.supported"},
}
startClient(t, settings, client)

tests := []struct {
name string
message *protobufs.CustomMessage
expectedError error
}{
{
name: "nil message is error",
message: nil,
expectedError: internal.ErrCustomMessageMissing,
},
{
name: "unsupported message is error",
message: &protobufs.CustomMessage{
Capability: "io.opentelemetry.not-supported",
},
expectedError: internal.ErrCustomCapabilityNotSupported,
},
{
name: "supported capability is ok",
message: &protobufs.CustomMessage{
Capability: "io.opentelemetry.supported",
},
expectedError: nil,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.ErrorIs(t, client.SetCustomMessage(test.message), test.expectedError)
})
}
})
}

// TestCustomMessages tests the custom messages functionality.
func TestCustomMessages(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {

// Start a Server.
srv := internal.StartMockServer(t)
var rcvCustomMessage atomic.Value
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg.CustomMessage != nil {
rcvCustomMessage.Store(msg.CustomMessage)
}
return nil
}

// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
CustomCapabilities: []string{"local.test.example"},
}
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))

// Send message 1
customMessage1 := &protobufs.CustomMessage{
Capability: "local.test.example",
Type: "hello",
Data: []byte("test message 1"),
}
assert.NoError(t, client.SetCustomMessage(customMessage1))

// Verify message 1 delivered
eventually(
t,
func() bool {
msg, ok := rcvCustomMessage.Load().(*protobufs.CustomMessage)
if !ok || msg == nil {
return false
}
return proto.Equal(customMessage1, msg)
},
)

// Send message 2
customMessage2 := &protobufs.CustomMessage{
Capability: "local.test.example",
Type: "hello",
Data: []byte("test message 2"),
}
assert.NoError(t, client.SetCustomMessage(customMessage2))

// Verify message 2 delivered
eventually(
t,
func() bool {
msg, ok := rcvCustomMessage.Load().(*protobufs.CustomMessage)
if !ok || msg == nil {
return false
}
return proto.Equal(customMessage2, msg)
},
)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
5 changes: 5 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ func (c *httpClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) err
return c.common.SetPackageStatuses(statuses)
}

// SetCustomMessage implements OpAMPClient.SetCustomMessage.
func (c *httpClient) SetCustomMessage(message *protobufs.CustomMessage) error {
return c.common.SetCustomMessage(message)
}

func (c *httpClient) runUntilStopped(ctx context.Context) {
// Start the HTTP sender. This will make request/responses with retries for
// failures and will wait with configured polling interval if there is nothing
Expand Down
47 changes: 44 additions & 3 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ var (
ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set")
ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set")
ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set")
ErrCustomMessageMissing = errors.New("CustomMessage is nil")
ErrCustomCapabilityNotSupported = errors.New("CustomCapability of CustomMessage is not supported")

errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
Expand Down Expand Up @@ -60,9 +62,7 @@ type ClientCommon struct {

// NewClientCommon creates a new ClientCommon.
func NewClientCommon(logger types.Logger, sender Sender) ClientCommon {
return ClientCommon{
Logger: logger, sender: sender, stoppedSignal: make(chan struct{}, 1),
}
return ClientCommon{Logger: logger, sender: sender, stoppedSignal: make(chan struct{}, 1)}
}

// PrepareStart prepares the client state for the next Start() call.
Expand All @@ -79,6 +79,13 @@ func (c *ClientCommon) PrepareStart(
// According to OpAMP spec this capability MUST be set, since all Agents MUST report status.
c.Capabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus

// Prepare custom capabilities.
if err := c.ClientSyncedState.SetCustomCapabilities(&protobufs.CustomCapabilities{
Capabilities: settings.CustomCapabilities,
}); err != nil {
return err
}

Check warning on line 87 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L86-L87

Added lines #L86 - L87 were not covered by tests

if c.ClientSyncedState.AgentDescription() == nil {
return ErrAgentDescriptionMissing
}
Expand Down Expand Up @@ -215,6 +222,7 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
msg.RemoteConfigStatus = c.ClientSyncedState.RemoteConfigStatus()
msg.PackageStatuses = c.ClientSyncedState.PackageStatuses()
msg.Capabilities = uint64(c.Capabilities)
msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities()
},
)
return nil
Expand Down Expand Up @@ -359,3 +367,36 @@ func (c *ClientCommon) SetPackageStatuses(statuses *protobufs.PackageStatuses) e

return nil
}

// SetCustomCapabilities sends a message to the Server with the new custom capabilities.
func (c *ClientCommon) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error {
// store the health to send on reconnect
if err := c.ClientSyncedState.SetCustomCapabilities(customCapabilities); err != nil {
return err
}
c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities()
},

Check warning on line 380 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L372-L380

Added lines #L372 - L380 were not covered by tests
)
c.sender.ScheduleSend()
return nil

Check warning on line 383 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L382-L383

Added lines #L382 - L383 were not covered by tests
}

// SetCustomMessage sends the specified custom message to the server.
func (c *ClientCommon) SetCustomMessage(message *protobufs.CustomMessage) error {
if message == nil {
return ErrCustomMessageMissing
}
if !c.ClientSyncedState.HasCustomCapability(message.Capability) {
return ErrCustomCapabilityNotSupported
}

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.CustomMessage = message
},
)
c.sender.ScheduleSend()
return nil
}
42 changes: 42 additions & 0 deletions client/internal/clientstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var (
errLastRemoteConfigHashNil = errors.New("LastRemoteConfigHash is nil")
errPackageStatusesMissing = errors.New("PackageStatuses is not set")
errServerProvidedAllPackagesHashNil = errors.New("ServerProvidedAllPackagesHash is nil")
errCustomCapabilitiesMissing = errors.New("CustomCapabilities is not set")
)

// ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to
Expand All @@ -37,6 +38,7 @@ type ClientSyncedState struct {
health *protobufs.ComponentHealth
remoteConfigStatus *protobufs.RemoteConfigStatus
packageStatuses *protobufs.PackageStatuses
customCapabilities *protobufs.CustomCapabilities
}

func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription {
Expand All @@ -63,6 +65,12 @@ func (s *ClientSyncedState) PackageStatuses() *protobufs.PackageStatuses {
return s.packageStatuses
}

func (s *ClientSyncedState) CustomCapabilities() *protobufs.CustomCapabilities {
defer s.mutex.Unlock()
s.mutex.Lock()
return s.customCapabilities
}

// SetAgentDescription sets the AgentDescription in the state.
func (s *ClientSyncedState) SetAgentDescription(descr *protobufs.AgentDescription) error {
if descr == nil {
Expand Down Expand Up @@ -126,3 +134,37 @@ func (s *ClientSyncedState) SetPackageStatuses(status *protobufs.PackageStatuses

return nil
}

// SetCustomCapabilities sets the CustomCapabilities in the state.
func (s *ClientSyncedState) SetCustomCapabilities(capabilities *protobufs.CustomCapabilities) error {
if capabilities == nil {
return errCustomCapabilitiesMissing
}

Check warning on line 142 in client/internal/clientstate.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientstate.go#L141-L142

Added lines #L141 - L142 were not covered by tests

clone := proto.Clone(capabilities).(*protobufs.CustomCapabilities)

defer s.mutex.Unlock()
s.mutex.Lock()
s.customCapabilities = clone

return nil
}

// HasCustomCapability returns true if the provided capability is in the
// CustomCapabilities.
func (s *ClientSyncedState) HasCustomCapability(capability string) bool {
defer s.mutex.Unlock()
s.mutex.Lock()

if s.customCapabilities == nil {
return false
}

Check warning on line 161 in client/internal/clientstate.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientstate.go#L160-L161

Added lines #L160 - L161 were not covered by tests

for _, c := range s.customCapabilities.Capabilities {
if c == capability {
return true
}
}

return false
}
Loading

0 comments on commit 8c346ad

Please sign in to comment.