Skip to content

Commit

Permalink
[cmd/opampsupervisor]: Configurable Supervisor OpAmp server port (ope…
Browse files Browse the repository at this point in the history
…n-telemetry#36002)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
Allows the Supervisor's OpAmp server port to be configurable. This is
useful for restricted environments deploying the supervisor.

<!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes open-telemetry#36001 

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Updated tests and added an e2e test

<!--Describe the documentation added.-->
#### Documentation
Spec was updated accordingly. Also added documentation for previously
added `health_check_port` that wasn't present in spec.

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
dpaasman00 authored and ArthurSens committed Nov 4, 2024
1 parent a9fd27b commit 65992b7
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Makes the Supervisor's OpAmp server port configurable with 'agent::opamp_server_port'.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36001]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
65 changes: 65 additions & 0 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,71 @@ func TestSupervisorLogging(t *testing.T) {
require.NoError(t, logFile.Close())
}

func TestSupervisorOpAmpServerPort(t *testing.T) {
var agentConfig atomic.Value
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
agentConfig.Store(string(config.Body))
}
}

return &protobufs.ServerToAgent{}
},
})

supervisorOpAmpServerPort, err := findRandomPort()
require.NoError(t, err)

s := newSupervisor(t, "server_port", map[string]string{"url": server.addr, "supervisor_opamp_server_port": fmt.Sprintf("%d", supervisorOpAmpServerPort)})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)

cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t)

server.sendToSupervisor(&protobufs.ServerToAgent{
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: cfg.Bytes()},
},
},
ConfigHash: hash,
},
})

require.Eventually(t, func() bool {
cfg, ok := agentConfig.Load().(string)
if ok {
// The effective config may be structurally different compared to what was sent,
// and will also have some data redacted,
// so just check that it includes the filelog receiver
return strings.Contains(cfg, "filelog")
}

return false
}, 5*time.Second, 500*time.Millisecond, "Collector was not started with remote config")

n, err := inputFile.WriteString("{\"body\":\"hello, world\"}\n")
require.NotZero(t, n, "Could not write to input file")
require.NoError(t, err)

require.Eventually(t, func() bool {
logRecord := make([]byte, 1024)
n, _ := outputFile.Read(logRecord)

return n != 0
}, 10*time.Second, 500*time.Millisecond, "Log never appeared in output")
}

func findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")

Expand Down
9 changes: 8 additions & 1 deletion cmd/opampsupervisor/specification/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,14 @@ agent:
client.id: "01HWWSK84BMT7J45663MBJMTPJ"
non_identifying_attributes:
custom.attribute: "custom-value"


# The port the Collector's health check extension will be configured to use
health_check_port:

# The port the Supervisor will start its OpAmp server on and the Collector's
# OpAmp extension will connect to
opamp_server_port:

```

### Operation When OpAMP Server is Unavailable
Expand Down
5 changes: 5 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type Agent struct {
Description AgentDescription `mapstructure:"description"`
BootstrapTimeout time.Duration `mapstructure:"bootstrap_timeout"`
HealthCheckPort int `mapstructure:"health_check_port"`
OpAMPServerPort int `mapstructure:"opamp_server_port"`
PassthroughLogs bool `mapstructure:"passthrough_logs"`
}

Expand All @@ -171,6 +172,10 @@ func (a Agent) Validate() error {
return errors.New("agent::health_check_port must be a valid port number")
}

if a.OpAMPServerPort < 0 || a.OpAMPServerPort > 65535 {
return errors.New("agent::opamp_server_port must be a valid port number")
}

if a.Executable == "" {
return errors.New("agent::executable must be specified")
}
Expand Down
53 changes: 50 additions & 3 deletions cmd/opampsupervisor/supervisor/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestValidate(t *testing.T) {
expectedError: "agent::orphan_detection_interval must be positive",
},
{
name: "Invalid port number",
name: "Invalid health check port number",
config: Supervisor{
Server: OpAMPServer{
Endpoint: "wss://localhost:9090/opamp",
Expand All @@ -254,7 +254,7 @@ func TestValidate(t *testing.T) {
expectedError: "agent::health_check_port must be a valid port number",
},
{
name: "Zero value port number",
name: "Zero value health check port number",
config: Supervisor{
Server: OpAMPServer{
Endpoint: "wss://localhost:9090/opamp",
Expand All @@ -280,7 +280,7 @@ func TestValidate(t *testing.T) {
},
},
{
name: "Normal port number",
name: "Normal health check port number",
config: Supervisor{
Server: OpAMPServer{
Endpoint: "wss://localhost:9090/opamp",
Expand Down Expand Up @@ -331,6 +331,53 @@ func TestValidate(t *testing.T) {
},
expectedError: "agent::bootstrap_timeout must be positive",
},
{
name: "Invalid opamp server port number",
config: Supervisor{
Server: OpAMPServer{
Endpoint: "wss://localhost:9090/opamp",
Headers: http.Header{
"Header1": []string{"HeaderValue"},
},
},
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
OpAMPServerPort: 65536,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
AcceptsRemoteConfig: true,
},
Storage: Storage{
Directory: "/etc/opamp-supervisor/storage",
},
},
expectedError: "agent::opamp_server_port must be a valid port number",
},
{
name: "Zero value opamp server port number",
config: Supervisor{
Server: OpAMPServer{
Endpoint: "wss://localhost:9090/opamp",
Headers: http.Header{
"Header1": []string{"HeaderValue"},
},
},
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
OpAMPServerPort: 0,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
AcceptsRemoteConfig: true,
},
Storage: Storage{
Directory: "/etc/opamp-supervisor/storage",
},
},
},
}

// create some fake files for validating agent config
Expand Down
11 changes: 9 additions & 2 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (s *Supervisor) createTemplates() error {
// shuts down the Collector. This only needs to happen
// once per Collector binary.
func (s *Supervisor) getBootstrapInfo() (err error) {
s.opampServerPort, err = s.findRandomPort()
s.opampServerPort, err = s.getSupervisorOpAMPServerPort()
if err != nil {
return err
}
Expand Down Expand Up @@ -457,7 +457,7 @@ func (s *Supervisor) startOpAMPServer() error {
s.opampServer = server.New(newLoggerFromZap(s.logger))

var err error
s.opampServerPort, err = s.findRandomPort()
s.opampServerPort, err = s.getSupervisorOpAMPServerPort()
if err != nil {
return err
}
Expand Down Expand Up @@ -1345,6 +1345,13 @@ func (s *Supervisor) agentConfigFilePath() string {
return filepath.Join(s.config.Storage.Directory, agentConfigFileName)
}

func (s *Supervisor) getSupervisorOpAMPServerPort() (int, error) {
if s.config.Agent.OpAMPServerPort != 0 {
return s.config.Agent.OpAMPServerPort, nil
}
return s.findRandomPort()
}

func (s *Supervisor) findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
server:
endpoint: ws://{{.url}}/v1/opamp

capabilities:
reports_effective_config: true
reports_own_metrics: true
reports_health: true
accepts_remote_config: true
reports_remote_config: true
accepts_restart_command: true

storage:
directory: '{{.storage_dir}}'

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
opamp_server_port: {{ .supervisor_opamp_server_port }}

0 comments on commit 65992b7

Please sign in to comment.