Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cmd/opampsupervisor] Handle OpAMP connection settings #30237

Merged
merged 24 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/supervisor-accepts-conn.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cmd/opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Handle OpAMP connection settings.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21043]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
65 changes: 56 additions & 9 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca
connectedChan := make(chan bool)
s := server.New(testLogger{t: t})
onConnectedFunc := callbacks.OnConnectedFunc
callbacks.OnConnectedFunc = func(conn types.Connection) {
callbacks.OnConnectedFunc = func(ctx context.Context, conn types.Connection) {
if onConnectedFunc != nil {
onConnectedFunc(ctx, conn)
}
agentConn.Store(conn)
isAgentConnected.Store(true)
connectedChan <- true
if onConnectedFunc != nil {
onConnectedFunc(conn)
}
}
onConnectionCloseFunc := callbacks.OnConnectionCloseFunc
callbacks.OnConnectionCloseFunc = func(conn types.Connection) {
Expand Down Expand Up @@ -133,7 +133,9 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca

func newSupervisor(t *testing.T, configType string, extraConfigData map[string]string) *supervisor.Supervisor {
cfgFile := getSupervisorConfig(t, configType, extraConfigData)
s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name())
logger, err := zap.NewDevelopment()
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
s, err := supervisor.NewSupervisor(logger, cfgFile.Name())
require.NoError(t, err)

return s
Expand Down Expand Up @@ -175,7 +177,7 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) {
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down Expand Up @@ -235,7 +237,7 @@ func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) {
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.Health != nil {
healthReport.Store(message.Health)
}
Expand Down Expand Up @@ -319,7 +321,7 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) {
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
capabilities.Store(message.Capabilities)

return &protobufs.ServerToAgent{}
Expand Down Expand Up @@ -372,7 +374,7 @@ func TestSupervisorBootstrapsCollector(t *testing.T) {
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.AgentDescription != nil {
agentDescription.Store(message.AgentDescription)
}
Expand Down Expand Up @@ -473,3 +475,48 @@ func waitForSupervisorConnection(connection chan bool, connected bool) {
}
}
}

func TestSupervisorOpAMPConnectionSettings(t *testing.T) {
var connectedToNewServer atomic.Bool
initialServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{})

s := newSupervisor(t, "accepts_conn", map[string]string{"url": initialServer.addr})
defer s.Shutdown()

waitForSupervisorConnection(initialServer.supervisorConnected, true)

newServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnConnectedFunc: func(_ context.Context, _ types.Connection) {
connectedToNewServer.Store(true)
},
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
return &protobufs.ServerToAgent{}
},
})

initialServer.sendToSupervisor(&protobufs.ServerToAgent{
ConnectionSettings: &protobufs.ConnectionSettingsOffers{
Opamp: &protobufs.OpAMPConnectionSettings{
DestinationEndpoint: "ws://" + newServer.addr + "/v1/opamp",
Headers: &protobufs.Headers{
Headers: []*protobufs.Header{
{
Key: "x-foo",
Value: "bar",
},
},
},
},
},
})

require.Eventually(t, func() bool {
return connectedToNewServer.Load() == true
}, 10*time.Second, 500*time.Millisecond, "Collector did not connect to new OpAMP server")
}
4 changes: 2 additions & 2 deletions cmd/opampsupervisor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ require (
github.com/knadh/koanf/providers/rawbytes v0.1.0
github.com/knadh/koanf/v2 v2.0.1
github.com/oklog/ulid/v2 v2.1.0
github.com/open-telemetry/opamp-go v0.11.0
github.com/open-telemetry/opamp-go v0.11.1-0.20240123204604-4d07a6af062f
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/config/configopaque v0.93.1-0.20240124123350-9047c0e373f9
go.opentelemetry.io/collector/config/configtls v0.93.1-0.20240124123350-9047c0e373f9
go.opentelemetry.io/collector/semconv v0.93.1-0.20240124123350-9047c0e373f9
go.uber.org/goleak v1.3.0
Expand All @@ -27,7 +28,6 @@ require (
github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.opentelemetry.io/collector/config/configopaque v0.93.1-0.20240124123350-9047c0e373f9 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.16.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/opampsupervisor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 9 additions & 5 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package config

import (
"net/http"

"go.opentelemetry.io/collector/config/configtls"
)

Expand All @@ -16,15 +18,17 @@ type Supervisor struct {

// Capabilities is the set of capabilities that the Supervisor supports.
type Capabilities struct {
AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"`
ReportsEffectiveConfig *bool `mapstructure:"reports_effective_config"`
ReportsOwnMetrics *bool `mapstructure:"reports_own_metrics"`
ReportsHealth *bool `mapstructure:"reports_health"`
ReportsRemoteConfig *bool `mapstructure:"reports_remote_config"`
AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I had missed this in earlier, and isn't strictly relevant for the PR but I am curious, what does a pointer value add here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the zero-value for bool is false but the default value may be true, using a pointer instead allows us to use a nil pointer value as a way to represent when the user didn't pass a value in and a default should be used.

AcceptsOpAMPConnectionSettings *bool `mapstructure:"accepts_opamp_connection_settings"`
ReportsEffectiveConfig *bool `mapstructure:"reports_effective_config"`
ReportsOwnMetrics *bool `mapstructure:"reports_own_metrics"`
ReportsHealth *bool `mapstructure:"reports_health"`
ReportsRemoteConfig *bool `mapstructure:"reports_remote_config"`
}

type OpAMPServer struct {
Endpoint string
Headers http.Header
TLSSetting configtls.TLSClientSetting `mapstructure:"tls,omitempty"`
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/opampsupervisor/supervisor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package supervisor

import (
"context"
"net/http"

"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -28,7 +29,7 @@ func newServerSettings(fs flattenedSettings) server.StartSettings {
return serverTypes.ConnectionResponse{
Accept: true,
ConnectionCallbacks: server.ConnectionCallbacksStruct{
OnMessageFunc: func(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
OnMessageFunc: func(_ context.Context, conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if fs.onMessageFunc != nil {
fs.onMessageFunc(conn, message)
}
Expand Down
89 changes: 83 additions & 6 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server"
serverTypes "github.com/open-telemetry/opamp-go/server/types"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
semconv "go.opentelemetry.io/collector/semconv/v1.21.0"
"go.uber.org/zap"

Expand Down Expand Up @@ -334,6 +336,10 @@ func (s *Supervisor) Capabilities() protobufs.AgentCapabilities {
if c.ReportsRemoteConfig != nil && *c.ReportsRemoteConfig {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig
}

if c.AcceptsOpAMPConnectionSettings != nil && *c.AcceptsOpAMPConnectionSettings {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings
}
}
return supportedCapabilities
}
Expand All @@ -346,8 +352,10 @@ func (s *Supervisor) startOpAMP() error {
return err
}

s.logger.Debug("Connecting to OpAMP server...", zap.String("endpoint", s.config.Server.Endpoint), zap.Any("headers", s.config.Server.Headers))
settings := types.StartSettings{
OpAMPServerURL: s.config.Server.Endpoint,
Header: s.config.Server.Headers,
TLSConfig: tlsConfig,
InstanceUid: s.instanceID.String(),
Callbacks: types.CallbacksStruct{
Expand All @@ -360,12 +368,8 @@ func (s *Supervisor) startOpAMP() error {
OnErrorFunc: func(err *protobufs.ServerErrorResponse) {
s.logger.Error("Server returned an error response", zap.String("message", err.ErrorMessage))
},
OnMessageFunc: s.onMessage,
OnOpampConnectionSettingsFunc: func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error {
// TODO: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21043
s.logger.Debug("Received ConnectionSettings request")
return nil
},
OnMessageFunc: s.onMessage,
OnOpampConnectionSettingsFunc: s.onOpampConnectionSettings,
OnOpampConnectionSettingsAcceptedFunc: func(settings *protobufs.OpAMPConnectionSettings) {
// TODO: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21043
s.logger.Debug("ConnectionSettings accepted")
Expand Down Expand Up @@ -409,6 +413,79 @@ func (s *Supervisor) startOpAMP() error {
return nil
}

func (s *Supervisor) stopOpAMP() error {
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
s.logger.Debug("Stopping OpAMP client...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := s.opampClient.Stop(ctx)
// TODO(srikanthccv): remove context.DeadlineExceeded after https://github.com/open-telemetry/opamp-go/pull/213
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
return err
}
s.logger.Debug("OpAMP client stopped.")
return nil
}

func (s *Supervisor) getHeadersFromSettings(protoHeaders *protobufs.Headers) http.Header {
headers := make(http.Header)
for _, header := range protoHeaders.Headers {
headers.Add(header.Key, header.Value)
}
return headers
}

func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *protobufs.OpAMPConnectionSettings) error {
Copy link
Member

@tigrannajaryan tigrannajaryan Mar 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we try to perform this operation in a separate goroutine so that we don't block here in this callback for the entire duration of switching the connections? Let the callback return quickly and initiate the re-establishing of the connection in a separate goroutine.

This would ensure that if the currently received message contains more data and not just connection settings all that data will be processed. With the current approach I am not sure what exactly will happen.

I think callbacks generally should avoid doing long-lasting blocking operations since they than block other callbacks and the entire opamp operation.

I also think the comment here is completely misleading. We should either implement what the comment says (i.e. the caller should do the reconnection) or fix the comment to sat the callback implementation should do the reconnection. @andykellr @evan-bradley Any thoughts on what you would prefer?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our agent does not currently support AcceptsOpAMPConnectionSettings so I have not spent much time with this part of the implementation. Looking at the code, the following things are confusing to me:

  1. There are several references to OnRemoteConfig (e.g. "See OnRemoteConfig for the behavior."). This was removed a while ago.

  2. This sentence doesn't make sense:

// If OnOpampConnectionSettings returns nil and then the caller will
// attempt to reconnect to the OpAMP Server using the new settings.

We should clearly state who is responsible for establishing a new connection. I could imagine this being fully implemented in the client library and only have OnOpampConnectionSettings responsible for validating the fields (e.g. endpoint matching a list of acceptable servers or certificate signed by acceptable authority). If this returns nil, the library would attempt to start a new client with the new settings, send an initial status message, and stop and transition to the new connection on success.

If we expect the implementer of the callback to do this, we should describe the process we expect to take place and implement that in tests and the example agent.

  1. If we do this in a separate goroutine like the example agent in the opamp-go library, we have no way of knowing if it will be successful and should be accepted. Returning nil from the callback will cause the settings to be accepted. If we create a separate goroutine we will have to wait for it before returning here.

https://github.com/open-telemetry/opamp-go/blob/7e92da0f17ef9f2fd0a387dd6b62b451c80f4207/client/internal/receivedprocessor.go#L198-L202

It is not possible to confirm success until the client is started and an initial status message is sent.

  1. OnOpampConnectionSettingsAccepted references OnOpampConnectionSettingsOffer which was renamed OnOpampConnectionSettings

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would ensure that if the currently received message contains more data and not just connection settings all that data will be processed. With the current approach I am not sure what exactly will happen.

onOpampConnectionSettings is the last part of the message processing in client implementation.

If we do this in a separate goroutine like the example agent in the opamp-go library, we have no way of knowing if it will be successful and should be accepted.

Right, the returned value from this callback indicates the reject/accept status of the connection settings. I don't see how we can make this async without changing how the client is expected to handle OnOpampConnectionSettings. Also, we don't report the error as described in OnOpampConnectionSettings open-telemetry/opamp-spec#164.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should clearly state who is responsible for establishing a new connection. I could imagine this being fully implemented in the client library and only have OnOpampConnectionSettings responsible for validating the fields (e.g. endpoint matching a list of acceptable servers or certificate signed by acceptable authority). If this returns nil, the library would attempt to start a new client with the new settings, send an initial status message, and stop and transition to the new connection on success.

I think this was the original intent. However, I am not sure this is the best approach. It is also not how the example implementation works and I don't see good arguments against how the example is implemented today.

To summarize this is what the example is supposed to do:

  1. OnOpampConnectionSettings callback called when new settings are offered by the Server.
  2. The callback implementation first pre-verifies the settings (e.g. check certificates, etc).
  3. If checks in (2) pass the callback implementation logs that it is starting to reconnect, creates a reconnection goroutine and returns from the callback.
  4. Reconnection goroutine stops the Client, creates a new Client with new connection settings and wait for successful OnConnect() callback.
  5. If OnConnect() is not called within a predefined period of time reconnection goroutine assumes the new connection settings are bad, reverts to the old connection settings and re-creates the Client again.
  6. We get rid of OnOpampConnectionSettingsAccepted(), it is not needed.

The example implementation of steps 4 and 5 is not complete today (e.g not waiting for OnConnect), but can be modified to match what I described.

Thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created an issue for this open-telemetry/opamp-go#261

Let's also discuss today in our call.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, open-telemetry/opamp-go#266 removes OnOpampConnectionSettingsAccepted callback.

Copy link
Member Author

@srikanthccv srikanthccv Apr 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the implementation to run in a separate goroutine and use OnConnect to determine the connection status.

if settings == nil {
s.logger.Debug("Received ConnectionSettings request with nil settings")
return nil
}

newServerConfig := &config.OpAMPServer{}

if settings.DestinationEndpoint != "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there any value in using some of the confmap work to merge these values together instead of needing to update each one explicitly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not very familiar with confmap work. I will take a look at it and If it helps here I can send a separate PR for it.

newServerConfig.Endpoint = settings.DestinationEndpoint
}
if settings.Headers != nil {
newServerConfig.Headers = s.getHeadersFromSettings(settings.Headers)
}
if settings.Certificate != nil {
if len(settings.Certificate.CaPublicKey) != 0 {
newServerConfig.TLSSetting.CAPem = configopaque.String(settings.Certificate.CaPublicKey)
}
if len(settings.Certificate.PublicKey) != 0 {
newServerConfig.TLSSetting.CertPem = configopaque.String(settings.Certificate.PublicKey)
}
if len(settings.Certificate.PrivateKey) != 0 {
newServerConfig.TLSSetting.KeyPem = configopaque.String(settings.Certificate.PrivateKey)
}
} else {
newServerConfig.TLSSetting = configtls.TLSClientSetting{Insecure: true}
}

if err := s.stopOpAMP(); err != nil {
s.logger.Error("Cannot stop the OpAMP client", zap.Error(err))
return err
}

// take a copy of the current OpAMP server config
oldServerConfig := s.config.Server
// update the OpAMP server config
s.config.Server = newServerConfig

if err := s.startOpAMP(); err != nil {
s.logger.Error("Cannot connect to the OpAMP server using the new settings", zap.Error(err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startOpAMP() initiates but does not wait for successful connection. I think we need to wait for it before we consider the new opamp settings successful. It can be done in a future PR, but would be good to add a TODO here.

// revert the OpAMP server config
s.config.Server = oldServerConfig
// start the OpAMP client with the old settings
if err := s.startOpAMP(); err != nil {
s.logger.Error("Cannot reconnect to the OpAMP server after restoring old settings", zap.Error(err))
return err
}
}

return nil
}

// TODO: Persist instance ID. https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21073
func (s *Supervisor) createInstanceID() (ulid.ULID, error) {
entropy := ulid.Monotonic(rand.New(rand.NewSource(0)), 0)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
server:
endpoint: ws://{{.url}}/v1/opamp
tls:
insecure: true

capabilities:
reports_effective_config: true
reports_own_metrics: true
reports_health: true
accepts_remote_config: true
reports_remote_config: true
accepts_opamp_connection_settings: true

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
Loading