Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cmd/opampsupervisor]: Don't fail to start if the OpAMP server is unavailable #34159

Merged
13 changes: 13 additions & 0 deletions .chloggen/fix_supervisor-dont-require-server-connection.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cmd/opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Start even if the OpAMP server cannot be contacted, and continually retry connecting.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33408, 33799]
198 changes: 197 additions & 1 deletion cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path"
"path/filepath"
"runtime"
"strings"
"sync/atomic"
Expand All @@ -36,6 +40,7 @@ import (
"github.com/stretchr/testify/require"
semconv "go.opentelemetry.io/collector/semconv/v1.21.0"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config"
Expand Down Expand Up @@ -74,10 +79,17 @@ type testingOpAMPServer struct {
addr string
supervisorConnected chan bool
sendToSupervisor func(*protobufs.ServerToAgent)
start func()
shutdown func()
}

// newOpAMPServer creates a test OpAMP server and starts it immediately.
// It is a convenience wrapper around newUnstartedOpAMPServer for tests that
// need the server listening before the supervisor comes up.
func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer {
	srv := newUnstartedOpAMPServer(t, connectingCallback, callbacks)
	srv.start()
	return srv
}

func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer {
var agentConn atomic.Value
var isAgentConnected atomic.Bool
var didShutdown atomic.Bool
Expand Down Expand Up @@ -108,7 +120,7 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca
require.NoError(t, err)
mux := http.NewServeMux()
mux.HandleFunc("/v1/opamp", handler)
httpSrv := httptest.NewServer(mux)
httpSrv := httptest.NewUnstartedServer(mux)

shutdown := func() {
if !didShutdown.Load() {
Expand All @@ -135,6 +147,7 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca
addr: httpSrv.Listener.Addr().String(),
supervisorConnected: connectedChan,
sendToSupervisor: send,
start: httpSrv.Start,
shutdown: shutdown,
}
}
Expand Down Expand Up @@ -238,6 +251,148 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) {
}, 10*time.Second, 500*time.Millisecond, "Log never appeared in output")
}

// TestSupervisorStartsCollectorWithNoOpAMPServer verifies that when the OpAMP
// server is unreachable at startup, the supervisor still launches the collector
// using the last persisted remote configuration from storage_dir, and later
// connects once the server becomes available.
func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) {
storageDir := t.TempDir()
remoteConfigFilePath := filepath.Join(storageDir, "last_recv_remote_config.dat")

// Pre-seed the supervisor's storage with a persisted remote config so it
// has something to run the collector with while the server is down.
cfg, hash, healthcheckPort := createHealthCheckCollectorConf(t)
remoteConfigProto := &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: cfg.Bytes()},
},
},
ConfigHash: hash,
}
marshalledRemoteConfig, err := proto.Marshal(remoteConfigProto)
require.NoError(t, err)

require.NoError(t, os.WriteFile(remoteConfigFilePath, marshalledRemoteConfig, 0600))

connected := atomic.Bool{}
// The server is deliberately NOT started yet — only its listener address is
// allocated, so the supervisor's initial connection attempts will fail.
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
OnConnectedFunc: func(ctx context.Context, conn types.Connection) {
connected.Store(true)
},
})
defer server.shutdown()

s := newSupervisor(t, "basic", map[string]string{
"url": server.addr,
"storage_dir": storageDir,
})
defer s.Shutdown()

// Verify the collector runs eventually by pinging the healthcheck extension
require.Eventually(t, func() bool {
resp, err := http.DefaultClient.Get(fmt.Sprintf("http://localhost:%d", healthcheckPort))
if err != nil {
t.Logf("Failed healthcheck: %s", err)
return false
}
require.NoError(t, resp.Body.Close())
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
t.Logf("Got non-2xx status code: %d", resp.StatusCode)
return false
}
return true
}, 3*time.Second, 100*time.Millisecond)

// Start the server and wait for the supervisor to connect
server.start()

// Verify supervisor connects to server
waitForSupervisorConnection(server.supervisorConnected, true)

require.True(t, connected.Load(), "Supervisor failed to connect")
}

// TestSupervisorStartsWithNoOpAMPServer verifies that the supervisor starts
// successfully with no persisted config and no reachable OpAMP server (running
// the collector with a "noop" config), then, once the server comes up, accepts
// and applies a remote config pushed by the server.
func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) {
cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t)

// configuredChan is closed by the server once the supervisor reports the
// pushed remote config hash back, i.e. the collector reconfigured.
configuredChan := make(chan struct{})
connected := atomic.Bool{}
// The server is deliberately left unstarted so the supervisor's initial
// connection attempts fail.
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
OnConnectedFunc: func(ctx context.Context, conn types.Connection) {
connected.Store(true)
},
OnMessageFunc: func(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
lastCfgHash := message.GetRemoteConfigStatus().GetLastRemoteConfigHash()
if bytes.Equal(lastCfgHash, hash) {
close(configuredChan)
}

return &protobufs.ServerToAgent{}
},
})
defer server.shutdown()

// The supervisor is started without a running OpAMP server.
// The supervisor should start successfully, even if the OpAMP server is stopped.
s := newSupervisor(t, "basic", map[string]string{
"url": server.addr,
})
defer s.Shutdown()

// Verify the collector is running by checking the metrics endpoint
// (8888 is the collector's default self-telemetry Prometheus port).
require.Eventually(t, func() bool {
resp, err := http.DefaultClient.Get("http://localhost:8888/metrics")
if err != nil {
t.Logf("Failed check for prometheus metrics: %s", err)
return false
}
require.NoError(t, resp.Body.Close())
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
t.Logf("Got non-2xx status code: %d", resp.StatusCode)
return false
}
return true
}, 3*time.Second, 100*time.Millisecond)

// Start the server and wait for the supervisor to connect
server.start()

// Verify supervisor connects to server
waitForSupervisorConnection(server.supervisorConnected, true)

require.True(t, connected.Load(), "Supervisor failed to connect")

// Verify that the collector can run a new config sent to it
server.sendToSupervisor(&protobufs.ServerToAgent{
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: cfg.Bytes()},
},
},
ConfigHash: hash,
},
})

// Wait for the supervisor to acknowledge the new config hash (see
// OnMessageFunc above) before exercising the pipeline.
select {
case <-configuredChan:
case <-time.After(2 * time.Second):
require.FailNow(t, "timed out waiting for collector to reconfigure")
}

// Push a log record through the file-based pipeline and confirm it shows
// up in the output file, proving the new config is actually running.
sampleLog := `{"body":"hello, world"}`
n, err := inputFile.WriteString(sampleLog + "\n")
require.NotZero(t, n, "Could not write to input file")
require.NoError(t, err)

require.Eventually(t, func() bool {
logRecord := make([]byte, 1024)

// EOF just means the collector hasn't flushed yet; keep polling.
n, err = outputFile.Read(logRecord)
if !errors.Is(err, io.EOF) {
require.NoError(t, err)
}

return n != 0
}, 10*time.Second, 500*time.Millisecond, "Log never appeared in output")

}

func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) {
var healthReport atomic.Value
var agentConfig atomic.Value
Expand Down Expand Up @@ -639,6 +794,29 @@ func createBadCollectorConf(t *testing.T) (*bytes.Buffer, []byte) {
return bytes.NewBuffer(colCfg), h.Sum(nil)
}

// createHealthCheckCollectorConf renders the healthcheck collector config
// template from testdata with a randomly chosen free localhost port for the
// healthcheck extension. It returns the rendered config, its SHA-256 hash
// (used as the remote config hash), and the chosen port.
func createHealthCheckCollectorConf(t *testing.T) (cfg *bytes.Buffer, hash []byte, remotePort int) {
	colCfgTpl, err := os.ReadFile(path.Join("testdata", "collector", "healthcheck_config.yaml"))
	require.NoError(t, err)

	templ, err := template.New("").Parse(string(colCfgTpl))
	require.NoError(t, err)

	port, err := findRandomPort()
	// Bug fix: this error was previously dropped (overwritten by the
	// templ.Execute assignment below) — fail fast if no port was found.
	require.NoError(t, err)

	var confmapBuf bytes.Buffer
	err = templ.Execute(
		&confmapBuf,
		map[string]string{
			"HealthCheckEndpoint": fmt.Sprintf("localhost:%d", port),
		},
	)
	require.NoError(t, err)

	h := sha256.Sum256(confmapBuf.Bytes())

	return &confmapBuf, h[:], port
}

// Wait for the Supervisor to connect to or disconnect from the OpAMP server
func waitForSupervisorConnection(connection chan bool, connected bool) {
select {
Expand Down Expand Up @@ -1012,3 +1190,21 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) {

require.Equal(t, newID, uuid.UUID(newRecievedAgentID))
}

// findRandomPort asks the OS for an unused TCP port by briefly binding to
// localhost:0, recording the assigned port, and releasing the listener.
// Note: the port is not reserved after Close, so a race with another process
// grabbing it is possible, but acceptable for tests.
func findRandomPort() (int, error) {
	listener, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		return 0, err
	}

	port := listener.Addr().(*net.TCPAddr).Port

	if closeErr := listener.Close(); closeErr != nil {
		return 0, closeErr
	}

	return port, nil
}
7 changes: 7 additions & 0 deletions cmd/opampsupervisor/specification/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ agent:

```

### Operation When OpAMP Server is Unavailable

When the supervisor cannot connect to the OpAMP server, the collector will
be run with the last known configuration, or with a "noop" configuration
if no previous configuration is persisted. The supervisor will continually
attempt to reconnect to the OpAMP server with exponential backoff.

### Executing Collector

The Supervisor starts and stops the Collector process as necessary. When
Expand Down
Loading