From e2b2b87cad9383d3473b72cd5d80b76ebcdcdaf8 Mon Sep 17 00:00:00 2001 From: Ryan Min Date: Wed, 18 Dec 2024 12:44:40 -0500 Subject: [PATCH] add integration tests --- receiver/dockerstatsreceiver/README.md | 1 + .../dockerstatsreceiver/integration_test.go | 188 ++++++++++++++++++ receiver/dockerstatsreceiver/logs_receiver.go | 5 + 3 files changed, 194 insertions(+) diff --git a/receiver/dockerstatsreceiver/README.md b/receiver/dockerstatsreceiver/README.md index 5437a9a9702f..a07973fc98c7 100644 --- a/receiver/dockerstatsreceiver/README.md +++ b/receiver/dockerstatsreceiver/README.md @@ -40,6 +40,7 @@ The following settings are for both logs and metrics receiver: `!my*container` will exclude all containers whose image name doesn't match the blob `my*container`. - `timeout` (default = `5s`): The request timeout for any docker daemon query. - `api_version` (default = `"1.25"`): The Docker client API version (must be 1.25+). Must be input as a string, not a float (e.g. `"1.40"` instead of `1.40`). [Docker API versions](https://docs.docker.com/engine/api/). +Note: If different settings are needed for logs/metrics receivers, a new receiver config should be created entirely. Only for metrics receiver: - `collection_interval` (default = `10s`): The interval at which to gather container stats. diff --git a/receiver/dockerstatsreceiver/integration_test.go b/receiver/dockerstatsreceiver/integration_test.go index 571a2cb6e07e..789f1f242851 100644 --- a/receiver/dockerstatsreceiver/integration_test.go +++ b/receiver/dockerstatsreceiver/integration_test.go @@ -17,6 +17,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" rcvr "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" @@ -57,6 +58,23 @@ func createNginxContainer(ctx context.Context, t *testing.T) testcontainers.Cont return container } +func createRedisContainer(ctx context.Context, t *testing.T) testcontainers.Container { + req := testcontainers.ContainerRequest{ + Image: "docker.io/library/redis:latest", + ExposedPorts: []string{"6379/tcp"}, + WaitingFor: wait.ForExposedPort(), + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + require.NoError(t, err) + require.NotNil(t, container) + + return container +} + func hasResourceScopeMetrics(containerID string, metrics []pmetric.Metrics) bool { for _, m := range metrics { for i := 0; i < m.ResourceMetrics().Len(); i++ { @@ -155,6 +173,176 @@ func TestExcludedImageProducesNoMetricsIntegration(t *testing.T) { assert.NoError(t, recv.Shutdown(ctx)) } +func hasDockerEvents(logs []plog.Logs, containerID string, expectedEventNames []string) bool { + seen := make(map[string]bool) + for _, l := range logs { + for i := 0; i < l.ResourceLogs().Len(); i++ { + rl := l.ResourceLogs().At(i) + for j := 0; j < rl.ScopeLogs().Len(); j++ { + sl := rl.ScopeLogs().At(j) + for k := 0; k < sl.LogRecords().Len(); k++ { + record := sl.LogRecords().At(k) + attrs := record.Attributes() + if id, ok := attrs.Get("event.id"); ok && id.AsString() == containerID { + if name, ok := attrs.Get("event.name"); ok { + seen[name.AsString()] = true + } + } + } + } + } + } + + for _, expected := range expectedEventNames { + if !seen[expected] { + return false + } + } + return true +} + +func TestContainerLifecycleEventsIntegration(t *testing.T) { + t.Parallel() + params, ctx, cancel := paramsAndContext(t) + defer cancel() + + consumer := new(consumertest.LogsSink) + f, config := factory() + recv, err := f.CreateLogs(ctx, params, config, consumer) + + require.NoError(t, err, "failed creating logs receiver") + require.NoError(t, recv.Start(ctx, &nopHost{ + reportFunc: func(event *componentstatus.Event) { + require.NoError(t, event.Err()) + }, + })) + + // no events should be received before container starts + assert.Never(t, func() bool { + return len(consumer.AllLogs()) > 0 + }, 5*time.Second, 1*time.Second, "received unexpected events") + + nginxContainer := createNginxContainer(ctx, t) + nginxID := nginxContainer.GetContainerID() + + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), nginxID, []string{ + "docker.container.create", + "docker.container.start", + }) + }, 5*time.Second, 1*time.Second, "failed to receive container create/start events") + + // Start second container (redis) and verify we get events from both + redisContainer := createRedisContainer(ctx, t) + redisID := redisContainer.GetContainerID() + + // Reset consumer to only check new events + consumer.Reset() + + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), redisID, []string{ + "docker.container.create", + "docker.container.start", + }) + }, 5*time.Second, 1*time.Second, "failed to receive redis container events") + + consumer.Reset() + require.NoError(t, nginxContainer.Terminate(ctx)) + + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), redisID, []string{ + "docker.container.die", + "docker.container.stop", + }) + }, 5*time.Second, 1*time.Second, "failed to receive container stop/die events") + + require.NoError(t, redisContainer.Terminate(ctx)) + assert.NoError(t, recv.Shutdown(ctx)) +} + +func TestFilteredContainerEventsIntegration(t *testing.T) { + t.Parallel() + params, ctx, cancel := paramsAndContext(t) + defer cancel() + + f, config := factory() + // Only receive events from redis containers + config.Logs.Filters = map[string][]string{ + "image": {"*redis*"}, + } + + consumer := new(consumertest.LogsSink) + recv, err := f.CreateLogs(ctx, params, config, consumer) + require.NoError(t, err, "failed creating logs receiver") + require.NoError(t, recv.Start(ctx, &nopHost{ + reportFunc: func(event *componentstatus.Event) { + require.NoError(t, event.Err()) + }, + })) + + nginxContainer := createNginxContainer(ctx, t) + assert.Never(t, func() bool { + return len(consumer.AllLogs()) > 0 + }, 5*time.Second, 1*time.Second, "received events for excluded container") + + redisContainer := createRedisContainer(ctx, t) + redisID := redisContainer.GetContainerID() + + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), redisID, []string{ + "docker.container.create", + "docker.container.start", + }) + }, 5*time.Second, 1*time.Second, "failed to receive redis container events") + + require.NoError(t, nginxContainer.Terminate(ctx)) + require.NoError(t, redisContainer.Terminate(ctx)) + assert.NoError(t, recv.Shutdown(ctx)) +} + +func TestContainerRestartEventsIntegration(t *testing.T) { + t.Parallel() + params, ctx, cancel := paramsAndContext(t) + defer cancel() + + consumer := new(consumertest.LogsSink) + f, config := factory() + recv, err := f.CreateLogs(ctx, params, config, consumer) + + require.NoError(t, err, "failed creating logs receiver") + require.NoError(t, recv.Start(ctx, &nopHost{ + reportFunc: func(event *componentstatus.Event) { + require.NoError(t, event.Err()) + }, + })) + + nginxContainer := createNginxContainer(ctx, t) + nginxID := nginxContainer.GetContainerID() + + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), nginxID, []string{ + "docker.container.create", + "docker.container.start", + }) + }, 5*time.Second, 1*time.Second, "failed to receive container start events") + + consumer.Reset() + stopDuration := 2 * time.Second + require.NoError(t, nginxContainer.Stop(ctx, &stopDuration)) + require.NoError(t, nginxContainer.Start(ctx)) + + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), nginxID, []string{ + "docker.container.die", + "docker.container.stop", + "docker.container.start", + }) + }, 5*time.Second, 1*time.Second, "failed to receive container restart events") + + require.NoError(t, nginxContainer.Terminate(ctx)) + assert.NoError(t, recv.Shutdown(ctx)) +} + var _ componentstatus.Reporter = (*nopHost)(nil) type nopHost struct { diff --git a/receiver/dockerstatsreceiver/logs_receiver.go b/receiver/dockerstatsreceiver/logs_receiver.go index ad1b451ed0a1..b5a8878ed9ab 100644 --- a/receiver/dockerstatsreceiver/logs_receiver.go +++ b/receiver/dockerstatsreceiver/logs_receiver.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "strings" + "sync" "time" "github.com/cenkalti/backoff/v4" @@ -81,6 +82,7 @@ type dockerEventPoller struct { logger *zap.Logger eventHandler func(context.Context, *events.Message) error backoff *backoff.ExponentialBackOff + sync.WaitGroup } func newDockerEventPoller( @@ -133,6 +135,8 @@ func (d *dockerEventPoller) Start(ctx context.Context) { } func (d *dockerEventPoller) processEvents(ctx context.Context, eventChan <-chan events.Message, errChan <-chan error) error { + d.Add(1) + defer d.Done() processedOnce := false for { select { @@ -197,5 +201,6 @@ func (r *logsReceiver) Shutdown(_ context.Context) error { if r.cancel != nil { r.cancel() } + r.eventPoller.Wait() return nil }