Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cmd/opampsupervisor] Handle OpAMP connection settings #30237

Merged
merged 24 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add capability to supervisor and e2e test
  • Loading branch information
srikanthccv committed Dec 30, 2023
commit 1f7a776b95b435e46321b603a7a92b4986bb3d09
55 changes: 51 additions & 4 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca
s := server.New(testLogger{t: t})
onConnectedFunc := callbacks.OnConnectedFunc
callbacks.OnConnectedFunc = func(conn types.Connection) {
agentConn.Store(conn)
isAgentConnected.Store(true)
connectedChan <- true
if onConnectedFunc != nil {
onConnectedFunc(conn)
}
agentConn.Store(conn)
isAgentConnected.Store(true)
connectedChan <- true
}
onConnectionCloseFunc := callbacks.OnConnectionCloseFunc
callbacks.OnConnectionCloseFunc = func(conn types.Connection) {
Expand Down Expand Up @@ -130,7 +130,9 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca

func newSupervisor(t *testing.T, configType string, extraConfigData map[string]string) *supervisor.Supervisor {
cfgFile := getSupervisorConfig(t, configType, extraConfigData)
s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name())
logger, err := zap.NewDevelopment()
require.NoError(t, err)
s, err := supervisor.NewSupervisor(logger, cfgFile.Name())
require.NoError(t, err)

return s
Expand Down Expand Up @@ -470,3 +472,48 @@ func waitForSupervisorConnection(connection chan bool, connected bool) {
}
}
}

func TestSupervisorOpAMPConnectionSettings(t *testing.T) {
var connectedToNewServer atomic.Bool
initialServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{})

s := newSupervisor(t, "accepts_conn", map[string]string{"url": initialServer.addr})
defer s.Shutdown()

waitForSupervisorConnection(initialServer.supervisorConnected, true)

newServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnConnectedFunc: func(_ types.Connection) {
connectedToNewServer.Store(true)
},
OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
return &protobufs.ServerToAgent{}
},
})

initialServer.sendToSupervisor(&protobufs.ServerToAgent{
ConnectionSettings: &protobufs.ConnectionSettingsOffers{
Opamp: &protobufs.OpAMPConnectionSettings{
DestinationEndpoint: "ws://" + newServer.addr + "/v1/opamp",
Headers: &protobufs.Headers{
Headers: []*protobufs.Header{
{
Key: "x-foo",
Value: "bar",
},
},
},
},
},
})

require.Eventually(t, func() bool {
return connectedToNewServer.Load() == true
}, 10*time.Second, 500*time.Millisecond, "Collector did not connect to new OpAMP server")
}
2 changes: 1 addition & 1 deletion cmd/opampsupervisor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/oklog/ulid/v2 v2.1.0
github.com/open-telemetry/opamp-go v0.10.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/config/configopaque v0.91.0
go.opentelemetry.io/collector/config/configtls v0.91.0
go.opentelemetry.io/collector/semconv v0.91.0
go.uber.org/zap v1.26.0
Expand All @@ -25,7 +26,6 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/config/configopaque v0.91.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/sys v0.14.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
Expand Down
11 changes: 6 additions & 5 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ type Supervisor struct {

// Capabilities is the set of capabilities that the Supervisor supports.
type Capabilities struct {
AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"`
ReportsEffectiveConfig *bool `mapstructure:"reports_effective_config"`
ReportsOwnMetrics *bool `mapstructure:"reports_own_metrics"`
ReportsHealth *bool `mapstructure:"reports_health"`
ReportsRemoteConfig *bool `mapstructure:"reports_remote_config"`
AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I had missed this in earlier, and isn't strictly relevant for the PR but I am curious, what does a pointer value add here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the zero-value for bool is false but the default value may be true, using a pointer instead allows us to use a nil pointer value as a way to represent when the user didn't pass a value in and a default should be used.

AcceptsOpAMPConnectionSettings *bool `mapstructure:"accepts_opamp_connection_settings"`
ReportsEffectiveConfig *bool `mapstructure:"reports_effective_config"`
ReportsOwnMetrics *bool `mapstructure:"reports_own_metrics"`
ReportsHealth *bool `mapstructure:"reports_health"`
ReportsRemoteConfig *bool `mapstructure:"reports_remote_config"`
}

type OpAMPServer struct {
Expand Down
13 changes: 11 additions & 2 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/open-telemetry/opamp-go/server"
serverTypes "github.com/open-telemetry/opamp-go/server/types"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
semconv "go.opentelemetry.io/collector/semconv/v1.21.0"
"go.uber.org/zap"

Expand Down Expand Up @@ -339,6 +340,10 @@ func (s *Supervisor) Capabilities() protobufs.AgentCapabilities {
if c.ReportsRemoteConfig != nil && *c.ReportsRemoteConfig {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig
}

if c.AcceptsOpAMPConnectionSettings != nil && *c.AcceptsOpAMPConnectionSettings {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings
}
}
return supportedCapabilities
}
Expand All @@ -351,6 +356,7 @@ func (s *Supervisor) startOpAMP() error {
return err
}

s.logger.Debug("Connecting to OpAMP server...", zap.String("endpoint", s.config.Server.Endpoint))
settings := types.StartSettings{
OpAMPServerURL: s.config.Server.Endpoint,
TLSConfig: tlsConfig,
Expand Down Expand Up @@ -412,11 +418,12 @@ func (s *Supervisor) startOpAMP() error {

func (s *Supervisor) stopOpAMP() error {
s.logger.Debug("Stopping OpAMP client...")
return s.opampClient.Stop(context.Background())
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
return s.opampClient.Stop(ctx)
}

func (s *Supervisor) getHeadersFromSettings(protoHeaders *protobufs.Headers) http.Header {
var headers http.Header
headers := make(http.Header)
for _, header := range protoHeaders.Headers {
headers.Add(header.Key, header.Value)
}
Expand Down Expand Up @@ -447,6 +454,8 @@ func (s *Supervisor) onOpampConnectionSettings(ctx context.Context, settings *pr
if len(settings.Certificate.PrivateKey) != 0 {
newServerConfig.TLSSetting.KeyPem = configopaque.String(settings.Certificate.PrivateKey)
}
} else {
newServerConfig.TLSSetting = configtls.TLSClientSetting{Insecure: true}
}

s.stopOpAMP()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
server:
endpoint: ws://{{.url}}/v1/opamp
tls:
insecure: true

capabilities:
reports_effective_config: true
reports_own_metrics: true
reports_health: true
accepts_remote_config: true
reports_remote_config: true
accepts_opamp_connection_settings: true

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}