Skip to content

Commit

Permalink
add integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
spiffyy99 committed Dec 18, 2024
1 parent 1e9d043 commit 660b0b5
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 3 deletions.
1 change: 1 addition & 0 deletions receiver/dockerstatsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The following settings are for both logs and metrics receiver:
`!my*container` will exclude all containers whose image name doesn't match the blob `my*container`.
- `timeout` (default = `5s`): The request timeout for any docker daemon query.
- `api_version` (default = `"1.25"`): The Docker client API version (must be 1.25+). Must be input as a string, not a float (e.g. `"1.40"` instead of `1.40`). [Docker API versions](https://docs.docker.com/engine/api/).
Note: If different settings are needed for logs/metrics receivers, a new receiver config should be created entirely.

Only for metrics receiver:
- `collection_interval` (default = `10s`): The interval at which to gather container stats.
Expand Down
188 changes: 188 additions & 0 deletions receiver/dockerstatsreceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
rcvr "go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
Expand Down Expand Up @@ -57,6 +58,23 @@ func createNginxContainer(ctx context.Context, t *testing.T) testcontainers.Cont
return container
}

func createRedisContainer(ctx context.Context, t *testing.T) testcontainers.Container {
req := testcontainers.ContainerRequest{
Image: "docker.io/library/redis:latest",
ExposedPorts: []string{"6379/tcp"},
WaitingFor: wait.ForExposedPort(),
}

container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
require.NoError(t, err)
require.NotNil(t, container)

return container
}

func hasResourceScopeMetrics(containerID string, metrics []pmetric.Metrics) bool {
for _, m := range metrics {
for i := 0; i < m.ResourceMetrics().Len(); i++ {
Expand Down Expand Up @@ -155,6 +173,176 @@ func TestExcludedImageProducesNoMetricsIntegration(t *testing.T) {
assert.NoError(t, recv.Shutdown(ctx))
}

func hasDockerEvents(logs []plog.Logs, containerID string, expectedEventNames []string) bool {
seen := make(map[string]bool)
for _, l := range logs {
for i := 0; i < l.ResourceLogs().Len(); i++ {
rl := l.ResourceLogs().At(i)
for j := 0; j < rl.ScopeLogs().Len(); j++ {
sl := rl.ScopeLogs().At(j)
for k := 0; k < sl.LogRecords().Len(); k++ {
record := sl.LogRecords().At(k)
attrs := record.Attributes()
if id, ok := attrs.Get("event.id"); ok && id.AsString() == containerID {
if name, ok := attrs.Get("event.name"); ok {
seen[name.AsString()] = true
}
}
}
}
}
}

for _, expected := range expectedEventNames {
if !seen[expected] {
return false
}
}
return true
}

func TestContainerLifecycleEventsIntegration(t *testing.T) {
t.Parallel()
params, ctx, cancel := paramsAndContext(t)
defer cancel()

consumer := new(consumertest.LogsSink)
f, config := factory()
recv, err := f.CreateLogs(ctx, params, config, consumer)

require.NoError(t, err, "failed creating logs receiver")
require.NoError(t, recv.Start(ctx, &nopHost{
reportFunc: func(event *componentstatus.Event) {
require.NoError(t, event.Err())
},
}))

// no events should be received before container starts
assert.Never(t, func() bool {
return len(consumer.AllLogs()) > 0
}, 5*time.Second, 1*time.Second, "received unexpected events")

nginxContainer := createNginxContainer(ctx, t)
nginxID := nginxContainer.GetContainerID()

assert.Eventuallyf(t, func() bool {
return hasDockerEvents(consumer.AllLogs(), nginxID, []string{
"docker.container.create",
"docker.container.start",
})
}, 5*time.Second, 1*time.Second, "failed to receive container create/start events")

// Start second container (redis) and verify we get events from both
redisContainer := createRedisContainer(ctx, t)
redisID := redisContainer.GetContainerID()

// Reset consumer to only check new events
consumer.Reset()

assert.Eventuallyf(t, func() bool {
return hasDockerEvents(consumer.AllLogs(), redisID, []string{
"docker.container.create",
"docker.container.start",
})
}, 5*time.Second, 1*time.Second, "failed to receive redis container events")

consumer.Reset()
require.NoError(t, nginxContainer.Terminate(ctx))

assert.Eventuallyf(t, func() bool {
return hasDockerEvents(consumer.AllLogs(), redisID, []string{
"docker.container.die",
"docker.container.stop",
})
}, 5*time.Second, 1*time.Second, "failed to receive container stop/die events")

require.NoError(t, redisContainer.Terminate(ctx))
assert.NoError(t, recv.Shutdown(ctx))
}

func TestFilteredContainerEventsIntegration(t *testing.T) {
t.Parallel()
params, ctx, cancel := paramsAndContext(t)
defer cancel()

f, config := factory()
// Only receive events from redis containers
config.Logs.Filters = map[string][]string{
"image": {"*redis*"},
}

consumer := new(consumertest.LogsSink)
recv, err := f.CreateLogs(ctx, params, config, consumer)
require.NoError(t, err, "failed creating logs receiver")
require.NoError(t, recv.Start(ctx, &nopHost{
reportFunc: func(event *componentstatus.Event) {
require.NoError(t, event.Err())
},
}))

nginxContainer := createNginxContainer(ctx, t)
assert.Never(t, func() bool {
return len(consumer.AllLogs()) > 0
}, 5*time.Second, 1*time.Second, "received events for excluded container")

redisContainer := createRedisContainer(ctx, t)
redisID := redisContainer.GetContainerID()

assert.Eventuallyf(t, func() bool {
return hasDockerEvents(consumer.AllLogs(), redisID, []string{
"docker.container.create",
"docker.container.start",
})
}, 5*time.Second, 1*time.Second, "failed to receive redis container events")

require.NoError(t, nginxContainer.Terminate(ctx))
require.NoError(t, redisContainer.Terminate(ctx))
assert.NoError(t, recv.Shutdown(ctx))
}

func TestContainerRestartEventsIntegration(t *testing.T) {
t.Parallel()
params, ctx, cancel := paramsAndContext(t)
defer cancel()

consumer := new(consumertest.LogsSink)
f, config := factory()
recv, err := f.CreateLogs(ctx, params, config, consumer)

require.NoError(t, err, "failed creating logs receiver")
require.NoError(t, recv.Start(ctx, &nopHost{
reportFunc: func(event *componentstatus.Event) {
require.NoError(t, event.Err())
},
}))

nginxContainer := createNginxContainer(ctx, t)
nginxID := nginxContainer.GetContainerID()

assert.Eventuallyf(t, func() bool {
return hasDockerEvents(consumer.AllLogs(), nginxID, []string{
"docker.container.create",
"docker.container.start",
})
}, 5*time.Second, 1*time.Second, "failed to receive container start events")

consumer.Reset()
stopDuration := 2 * time.Second
require.NoError(t, nginxContainer.Stop(ctx, &stopDuration))
require.NoError(t, nginxContainer.Start(ctx))

assert.Eventuallyf(t, func() bool {
return hasDockerEvents(consumer.AllLogs(), nginxID, []string{
"docker.container.die",
"docker.container.stop",
"docker.container.start",
})
}, 5*time.Second, 1*time.Second, "failed to receive container restart events")

require.NoError(t, nginxContainer.Terminate(ctx))
assert.NoError(t, recv.Shutdown(ctx))
}

var _ componentstatus.Reporter = (*nopHost)(nil)

type nopHost struct {
Expand Down
10 changes: 9 additions & 1 deletion receiver/dockerstatsreceiver/logs_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -81,13 +82,15 @@ type dockerEventPoller struct {
logger *zap.Logger
eventHandler func(context.Context, *events.Message) error
backoff *backoff.ExponentialBackOff
sync.WaitGroup
}

func newDockerEventPoller(
config *Config,
client *docker.Client,
logger *zap.Logger,
handler func(context.Context, *events.Message) error) *dockerEventPoller {
handler func(context.Context, *events.Message) error,
) *dockerEventPoller {
return &dockerEventPoller{
config: config,
client: client,
Expand Down Expand Up @@ -133,6 +136,8 @@ func (d *dockerEventPoller) Start(ctx context.Context) {
}

func (d *dockerEventPoller) processEvents(ctx context.Context, eventChan <-chan events.Message, errChan <-chan error) error {
d.Add(1)
defer d.Done()
processedOnce := false
for {
select {
Expand Down Expand Up @@ -197,5 +202,8 @@ func (r *logsReceiver) Shutdown(_ context.Context) error {
if r.cancel != nil {
r.cancel()
}
if r.eventPoller != nil {
r.eventPoller.Wait()
}
return nil
}
6 changes: 4 additions & 2 deletions receiver/dockerstatsreceiver/logs_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,10 @@ func TestDockerEventPolling(t *testing.T) {
},
}

mockDockerEngine, err := createEventsMockServer(t, []string{filepath.Join(mockFolder, "single_container", "events.json"),
filepath.Join(mockFolder, "single_container", "events2.json")})
mockDockerEngine, err := createEventsMockServer(t, []string{
filepath.Join(mockFolder, "single_container", "events.json"),
filepath.Join(mockFolder, "single_container", "events2.json"),
})
require.NoError(t, err)
defer mockDockerEngine.Close()
mockLogsConsumer := &consumertest.LogsSink{}
Expand Down

0 comments on commit 660b0b5

Please sign in to comment.