diff --git a/.chloggen/dakotapaasman_bpop-1087-supervisor-configurable-opamp-server-port.yaml b/.chloggen/dakotapaasman_bpop-1087-supervisor-configurable-opamp-server-port.yaml new file mode 100644 index 0000000000000..cc4279e6b783c --- /dev/null +++ b/.chloggen/dakotapaasman_bpop-1087-supervisor-configurable-opamp-server-port.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Makes the Supervisor's OpAmp server port configurable with 'agent::opamp_server_port'. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36001] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 5b6a7ac4adbfa..1fde97d26a732 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -1443,6 +1443,71 @@ func TestSupervisorLogging(t *testing.T) { require.NoError(t, logFile.Close()) } +func TestSupervisorOpAmpServerPort(t *testing.T) { + var agentConfig atomic.Value + server := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.EffectiveConfig != nil { + config := message.EffectiveConfig.ConfigMap.ConfigMap[""] + if config != nil { + agentConfig.Store(string(config.Body)) + } + } + + return &protobufs.ServerToAgent{} + }, + }) + + supervisorOpAmpServerPort, err := findRandomPort() + require.NoError(t, err) + + s := newSupervisor(t, "server_port", map[string]string{"url": server.addr, "supervisor_opamp_server_port": fmt.Sprintf("%d", supervisorOpAmpServerPort)}) + + require.Nil(t, s.Start()) + defer s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, true) + + cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t) + + server.sendToSupervisor(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: cfg.Bytes()}, + }, + }, + ConfigHash: hash, + }, + }) + + require.Eventually(t, func() bool { + cfg, ok := agentConfig.Load().(string) + if ok { + // The effective config may be structurally different compared to what was sent, + // and will also have some data redacted, + // so just check that it includes the filelog receiver + return strings.Contains(cfg, "filelog") + } + + return false + }, 5*time.Second, 500*time.Millisecond, "Collector was not started with remote config") + + n, err := inputFile.WriteString("{\"body\":\"hello, world\"}\n") + require.NotZero(t, n, "Could not write to input file") + require.NoError(t, err) + + require.Eventually(t, func() bool { + logRecord := make([]byte, 1024) + n, _ := outputFile.Read(logRecord) + + return n != 0 + }, 10*time.Second, 500*time.Millisecond, "Log never appeared in output") +} + func findRandomPort() (int, error) { l, err := net.Listen("tcp", "localhost:0") diff --git a/cmd/opampsupervisor/specification/README.md b/cmd/opampsupervisor/specification/README.md index a45d7b6d38d1b..1214eff9c2a14 100644 --- a/cmd/opampsupervisor/specification/README.md +++ b/cmd/opampsupervisor/specification/README.md @@ -160,7 +160,14 @@ agent: client.id: "01HWWSK84BMT7J45663MBJMTPJ" non_identifying_attributes: custom.attribute: "custom-value" - + + # The port the Collector's health check extension will be configured to use + health_check_port: + + # The port the Supervisor will start its OpAmp server on and the Collector's + # OpAmp extension will connect to + opamp_server_port: + ``` ### Operation When OpAMP Server is Unavailable diff --git a/cmd/opampsupervisor/supervisor/config/config.go b/cmd/opampsupervisor/supervisor/config/config.go index 5e6049dddbd85..2b77cc56bd625 100644 --- a/cmd/opampsupervisor/supervisor/config/config.go +++ b/cmd/opampsupervisor/supervisor/config/config.go @@ -155,6 +155,7 @@ type Agent struct { Description AgentDescription `mapstructure:"description"` BootstrapTimeout time.Duration `mapstructure:"bootstrap_timeout"` HealthCheckPort int `mapstructure:"health_check_port"` + OpAMPServerPort int `mapstructure:"opamp_server_port"` PassthroughLogs bool `mapstructure:"passthrough_logs"` } @@ -171,6 +172,10 @@ func (a Agent) Validate() error { return errors.New("agent::health_check_port must be a valid port number") } + if a.OpAMPServerPort < 0 || a.OpAMPServerPort > 65535 { + return errors.New("agent::opamp_server_port must be a valid port number") + } + if a.Executable == "" { return errors.New("agent::executable must be specified") } diff --git a/cmd/opampsupervisor/supervisor/config/config_test.go b/cmd/opampsupervisor/supervisor/config/config_test.go index 92938f210708e..9616c9da52d57 100644 --- a/cmd/opampsupervisor/supervisor/config/config_test.go +++ b/cmd/opampsupervisor/supervisor/config/config_test.go @@ -227,7 +227,7 @@ func TestValidate(t *testing.T) { expectedError: "agent::orphan_detection_interval must be positive", }, { - name: "Invalid port number", + name: "Invalid health check port number", config: Supervisor{ Server: OpAMPServer{ Endpoint: "wss://localhost:9090/opamp", @@ -254,7 +254,7 @@ func TestValidate(t *testing.T) { expectedError: "agent::health_check_port must be a valid port number", }, { - name: "Zero value port number", + name: "Zero value health check port number", config: Supervisor{ Server: OpAMPServer{ Endpoint: "wss://localhost:9090/opamp", @@ -280,7 +280,7 @@ func TestValidate(t *testing.T) { }, }, { - name: "Normal port number", + name: "Normal health check port number", config: Supervisor{ Server: OpAMPServer{ Endpoint: "wss://localhost:9090/opamp", @@ -331,6 +331,53 @@ func TestValidate(t *testing.T) { }, expectedError: "agent::bootstrap_timeout must be positive", }, + { + name: "Invalid opamp server port number", + config: Supervisor{ + Server: OpAMPServer{ + Endpoint: "wss://localhost:9090/opamp", + Headers: http.Header{ + "Header1": []string{"HeaderValue"}, + }, + }, + Agent: Agent{ + Executable: "${file_path}", + OrphanDetectionInterval: 5 * time.Second, + OpAMPServerPort: 65536, + BootstrapTimeout: 5 * time.Second, + }, + Capabilities: Capabilities{ + AcceptsRemoteConfig: true, + }, + Storage: Storage{ + Directory: "/etc/opamp-supervisor/storage", + }, + }, + expectedError: "agent::opamp_server_port must be a valid port number", + }, + { + name: "Zero value opamp server port number", + config: Supervisor{ + Server: OpAMPServer{ + Endpoint: "wss://localhost:9090/opamp", + Headers: http.Header{ + "Header1": []string{"HeaderValue"}, + }, + }, + Agent: Agent{ + Executable: "${file_path}", + OrphanDetectionInterval: 5 * time.Second, + OpAMPServerPort: 0, + BootstrapTimeout: 5 * time.Second, + }, + Capabilities: Capabilities{ + AcceptsRemoteConfig: true, + }, + Storage: Storage{ + Directory: "/etc/opamp-supervisor/storage", + }, + }, + }, } // create some fake files for validating agent config diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 8d683e5b09b4d..48f3439413d32 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -263,7 +263,7 @@ func (s *Supervisor) createTemplates() error { // shuts down the Collector. This only needs to happen // once per Collector binary. func (s *Supervisor) getBootstrapInfo() (err error) { - s.opampServerPort, err = s.findRandomPort() + s.opampServerPort, err = s.getSupervisorOpAMPServerPort() if err != nil { return err } @@ -457,7 +457,7 @@ func (s *Supervisor) startOpAMPServer() error { s.opampServer = server.New(newLoggerFromZap(s.logger)) var err error - s.opampServerPort, err = s.findRandomPort() + s.opampServerPort, err = s.getSupervisorOpAMPServerPort() if err != nil { return err } @@ -1345,6 +1345,13 @@ func (s *Supervisor) agentConfigFilePath() string { return filepath.Join(s.config.Storage.Directory, agentConfigFileName) } +func (s *Supervisor) getSupervisorOpAMPServerPort() (int, error) { + if s.config.Agent.OpAMPServerPort != 0 { + return s.config.Agent.OpAMPServerPort, nil + } + return s.findRandomPort() +} + func (s *Supervisor) findRandomPort() (int, error) { l, err := net.Listen("tcp", "localhost:0") diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_server_port.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_server_port.yaml new file mode 100644 index 0000000000000..d1335e797f70c --- /dev/null +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_server_port.yaml @@ -0,0 +1,17 @@ +server: + endpoint: ws://{{.url}}/v1/opamp + +capabilities: + reports_effective_config: true + reports_own_metrics: true + reports_health: true + accepts_remote_config: true + reports_remote_config: true + accepts_restart_command: true + +storage: + directory: '{{.storage_dir}}' + +agent: + executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}} + opamp_server_port: {{ .supervisor_opamp_server_port }}