Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cmd/opampsupervisor] Handle OpAMP connection settings #30237

Merged
merged 24 commits into from
Apr 9, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Rely on OnConnectFunc for conn status
  • Loading branch information
srikanthccv committed Apr 3, 2024
commit aab743295d3d967c27bbb57b169b94e91fd408cc
26 changes: 23 additions & 3 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type Supervisor struct {

agentHasStarted bool
agentStartHealthCheckAttempts int

connectedToOpAMPServer chan struct{}
}

func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
Expand All @@ -116,6 +118,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
effectiveConfigFilePath: "effective.yaml",
agentConfigOwnMetricsSection: &atomic.Value{},
effectiveConfig: &atomic.Value{},
connectedToOpAMPServer: make(chan struct{}),
}

if err := s.createTemplates(); err != nil {
Expand Down Expand Up @@ -154,6 +157,10 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
return nil, fmt.Errorf("cannot start OpAMP client: %w", err)
}

if err := s.waitForOpAMPConnection(); err != nil {
return nil, fmt.Errorf("failed to connect to the OpAMP server: %w", err)
}

s.commander, err = commander.NewCommander(
s.logger,
s.config.Agent,
Expand Down Expand Up @@ -367,6 +374,7 @@ func (s *Supervisor) startOpAMP() error {
InstanceUid: s.instanceID.String(),
Callbacks: types.CallbacksStruct{
OnConnectFunc: func(_ context.Context) {
s.connectedToOpAMPServer <- struct{}{}
s.logger.Debug("Connected to the server.")
},
OnConnectFailedFunc: func(_ context.Context, err error) {
Expand All @@ -375,8 +383,11 @@ func (s *Supervisor) startOpAMP() error {
OnErrorFunc: func(_ context.Context, err *protobufs.ServerErrorResponse) {
s.logger.Error("Server returned an error response", zap.String("message", err.ErrorMessage))
},
OnMessageFunc: s.onMessage,
OnOpampConnectionSettingsFunc: s.onOpampConnectionSettings,
OnMessageFunc: s.onMessage,
OnOpampConnectionSettingsFunc: func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error {
go s.onOpampConnectionSettings(ctx, settings)
return nil
},
OnCommandFunc: func(_ context.Context, command *protobufs.ServerToAgentCommand) error {
cmdType := command.GetType()
if *cmdType.Enum() == protobufs.CommandType_CommandType_Restart {
Expand Down Expand Up @@ -485,8 +496,17 @@ func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *prot
return err
}
}
return s.waitForOpAMPConnection()
}

return nil
func (s *Supervisor) waitForOpAMPConnection() error {
// wait for the OpAMP client to connect to the server or timeout
select {
case <-s.connectedToOpAMPServer:
return nil
case <-time.After(10 * time.Second):
return errors.New("timed out waiting for the server to connect")
}
}

// TODO: Persist instance ID. https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21073
Expand Down
Loading