From 08d26b7c078703e56c0882f417026307458df655 Mon Sep 17 00:00:00 2001 From: Ryan Min Date: Thu, 14 Nov 2024 07:39:48 -0500 Subject: [PATCH 1/5] [dockerstatsreceiver] add new log receiver to ingest docker events --- .chloggen/dockerstats_event_log_receiver.yaml | 27 ++ receiver/dockerstatsreceiver/README.md | 52 ++-- receiver/dockerstatsreceiver/config.go | 79 ++++++ receiver/dockerstatsreceiver/config_test.go | 152 ++++++++++-- receiver/dockerstatsreceiver/factory.go | 14 ++ receiver/dockerstatsreceiver/factory_test.go | 4 + .../generated_component_test.go | 7 + receiver/dockerstatsreceiver/go.mod | 2 +- .../internal/metadata/generated_status.go | 1 + receiver/dockerstatsreceiver/logs_receiver.go | 201 +++++++++++++++ .../dockerstatsreceiver/logs_receiver_test.go | 234 ++++++++++++++++++ receiver/dockerstatsreceiver/metadata.yaml | 1 + .../{receiver.go => metrics_receiver.go} | 0 ...eiver_test.go => metrics_receiver_test.go} | 6 +- .../dockerstatsreceiver/testdata/config.yaml | 8 + .../mock/single_container/events.json | 2 + .../mock/single_container/events2.json | 5 + 17 files changed, 758 insertions(+), 37 deletions(-) create mode 100644 .chloggen/dockerstats_event_log_receiver.yaml create mode 100644 receiver/dockerstatsreceiver/logs_receiver.go create mode 100644 receiver/dockerstatsreceiver/logs_receiver_test.go rename receiver/dockerstatsreceiver/{receiver.go => metrics_receiver.go} (100%) rename receiver/dockerstatsreceiver/{receiver_test.go => metrics_receiver_test.go} (99%) create mode 100644 receiver/dockerstatsreceiver/testdata/mock/single_container/events.json create mode 100644 receiver/dockerstatsreceiver/testdata/mock/single_container/events2.json diff --git a/.chloggen/dockerstats_event_log_receiver.yaml b/.chloggen/dockerstats_event_log_receiver.yaml new file mode 100644 index 000000000000..da64d8014832 --- /dev/null +++ b/.chloggen/dockerstats_event_log_receiver.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: dockerstatsreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add new log receiver to ingest docker events. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29096] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/dockerstatsreceiver/README.md b/receiver/dockerstatsreceiver/README.md index ccfabc263b84..6d7241fc4a7e 100644 --- a/receiver/dockerstatsreceiver/README.md +++ b/receiver/dockerstatsreceiver/README.md @@ -3,19 +3,23 @@ | Status | | | ------------- |-----------| -| Stability | [alpha]: metrics | +| Stability | [development]: logs | +| | [alpha]: metrics | | Unsupported Platforms | darwin, windows | | Distributions | [contrib] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fdockerstats%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fdockerstats) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fdockerstats%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fdockerstats) | | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@jamesmoessis](https://www.github.com/jamesmoessis) | | Emeritus | [@rmfitzpatrick](https://www.github.com/rmfitzpatrick) | +[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development [alpha]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#alpha [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib -The Docker Stats receiver queries the local Docker daemon's container stats API for -all desired running containers on a configured interval. These stats are for container +The Docker Stats receiver queries the local Docker daemon: +- for the logs receiver, queries the events API for Docker events and streams them into log records. These events +can be any in the [docker events](https://docs.docker.com/reference/cli/docker/system/events/) command. +- for the metrics receiver, queries the container stats API for all desired running containers on a configured interval. These stats are for container resource usage of cpu, memory, network, and the [blkio controller](https://www.kernel.org/doc/Documentation/cgroup-v1/blkio-controller.txt). @@ -23,27 +27,39 @@ resource usage of cpu, memory, network, and the ## Configuration -The following settings are optional: +The following settings are for both logs and metrics receier: - `endpoint` (default = `unix:///var/run/docker.sock`): Address to reach the desired Docker daemon. +- `excluded_images` (no default, all running containers monitored): A list of strings, + [regexes](https://golang.org/pkg/regexp/), or [globs](https://github.com/gobwas/glob) whose referent container image + names will not be among the queried containers. `!`-prefixed negations are possible for all item types to signify that + only unmatched container image names should be excluded. + - Regexes must be placed between `/` characters: `/my?egex/`. Negations are to be outside the forward slashes: + `!/my?egex/` will exclude all containers whose name doesn't match the compiled regex `my?egex`. + - Globs are non-regex items (e.g. `/items/`) containing any of the following: `*[]{}?`. Negations are supported: + `!my*container` will exclude all containers whose image name doesn't match the blob `my*container`. +- `timeout` (default = `5s`): The request timeout for any docker daemon query. +- `api_version` (default = `"1.25"`): The Docker client API version (must be 1.25+). Must be input as a string, not a float (e.g. `"1.40"` instead of `1.40`). [Docker API versions](https://docs.docker.com/engine/api/). + +Only for metrics receiver: - `collection_interval` (default = `10s`): The interval at which to gather container stats. - `initial_delay` (default = `1s`): defines how long this receiver waits before starting. - `container_labels_to_metric_labels` (no default): A map of Docker container label names whose label values to use as the specified metric label key. - `env_vars_to_metric_labels` (no default): A map of Docker container environment variables whose values to use as the specified metric label key. -- `excluded_images` (no default, all running containers monitored): A list of strings, -[regexes](https://golang.org/pkg/regexp/), or [globs](https://github.com/gobwas/glob) whose referent container image -names will not be among the queried containers. `!`-prefixed negations are possible for all item types to signify that -only unmatched container image names should be excluded. - - Regexes must be placed between `/` characters: `/my?egex/`. Negations are to be outside the forward slashes: - `!/my?egex/` will exclude all containers whose name doesn't match the compiled regex `my?egex`. - - Globs are non-regex items (e.g. `/items/`) containing any of the following: `*[]{}?`. Negations are supported: - `!my*container` will exclude all containers whose image name doesn't match the blob `my*container`. -- `timeout` (default = `5s`): The request timeout for any docker daemon query. -- `api_version` (default = `"1.25"`): The Docker client API version (must be 1.25+). Must be input as a string, not a float (e.g. `"1.40"` instead of `1.40`). [Docker API versions](https://docs.docker.com/engine/api/). - `metrics` (defaults at [./documentation.md](./documentation.md)): Enables/disables individual metrics. See [./documentation.md](./documentation.md) for full detail. +Only for logs receiver: +- `min_docker_retry_wait` (default = `1s`): The docker daemon may disconnect from the receiver. This configures the starting +duration to wait before attempting a reconnect (with exponential backoff). +- `max_docker_retry_wait` (default = `10s`): This configures the maximum + duration to wait before attempting a reconnect (with exponential backoff). +- `logs`: Configuration for which docker events to emit as logs. matches the [go docker api](https://github.com/moby/moby/blob/master/api/types/events/events.go#L131). + - `since`: The earliest time docker events should be emitted at. Accepts Unix timestamps or RFC3339 formatted timestamps (e.g., "2024-01-02T15:04:05Z"). + - `until`: The latest time docker events should be emitted at. Same format as above. + - `filters`: a map of which docker events to emit based on properties. See [docker events](https://docs.docker.com/reference/cli/docker/system/events/) for more details. + Example: ```yaml @@ -68,6 +84,14 @@ receivers: enabled: true container.network.io.usage.tx_dropped: enabled: false + min_docker_retry_wait: 5s + max_docker_retry_wait: 30s + logs: + filters: + type: ["container", "image"] + event: ["start", "stop", "die"] + since: "2024-01-01T00:00:00Z" + until: "2024-01-02T00:00:00Z" ``` The full list of settings exposed for this receiver are documented [here](./config.go) diff --git a/receiver/dockerstatsreceiver/config.go b/receiver/dockerstatsreceiver/config.go index 750560dbcf98..6d0f7551fda7 100644 --- a/receiver/dockerstatsreceiver/config.go +++ b/receiver/dockerstatsreceiver/config.go @@ -4,6 +4,10 @@ package dockerstatsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver" import ( + "fmt" + "strconv" + "time" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/receiver/scraperhelper" @@ -14,9 +18,21 @@ import ( var _ component.Config = (*Config)(nil) +// EventsConfig contains configuration for Docker events collection +type EventsConfig struct { + // Filters allows filtering which Docker events to collect + Filters map[string][]string `mapstructure:"filters"` + // Since shows events created since this timestamp + Since string `mapstructure:"since"` + // Until shows events created until this timestamp + Until string `mapstructure:"until"` +} + type Config struct { docker.Config `mapstructure:",squash"` + // Metrics-specific settings + scraperhelper.ControllerConfig `mapstructure:",squash"` // A mapping of container label names to MetricDescriptor label keys. @@ -36,12 +52,75 @@ type Config struct { // MetricsBuilderConfig config. Enable or disable stats by name. metadata.MetricsBuilderConfig `mapstructure:",squash"` + + // Logs-specific settings + // MinDockerRetryWait is the minimum time to wait before retrying to connect to the Docker daemon + MinDockerRetryWait time.Duration `mapstructure:"min_docker_retry_wait"` + // MaxDockerRetryWait is the maximum time to wait before retrying to connect to the Docker daemon + MaxDockerRetryWait time.Duration `mapstructure:"max_docker_retry_wait"` + + // Logs configuration (Docker events) + Logs EventsConfig `mapstructure:"logs"` +} + +func parseTimestamp(ts string) (time.Time, error) { + // Try Unix timestamp first + if i, err := strconv.ParseInt(ts, 10, 64); err == nil { + return time.Unix(i, 0), nil + } + + // Try RFC3339 + return time.Parse(time.RFC3339, ts) } func (config Config) Validate() error { if err := docker.VersionIsValidAndGTE(config.DockerAPIVersion, minimumRequiredDockerAPIVersion); err != nil { return err } + + // Validate logs-specific config + if config.MinDockerRetryWait <= 0 { + return fmt.Errorf("min_docker_retry_wait must be positive, got %v", config.MinDockerRetryWait) + } + if config.MaxDockerRetryWait <= 0 { + return fmt.Errorf("max_docker_retry_wait must be positive, got %v", config.MaxDockerRetryWait) + } + if config.MaxDockerRetryWait < config.MinDockerRetryWait { + return fmt.Errorf("max_docker_retry_wait must not be less than min_docker_retry_wait") + } + + now := time.Now() + var sinceTime time.Time + if config.Logs.Since != "" { + var err error + sinceTime, err = parseTimestamp(config.Logs.Since) + if err != nil { + return fmt.Errorf("logs.since must be a Unix timestamp or RFC3339 time: %w", err) + } + if sinceTime.After(now) { + return fmt.Errorf("logs.since cannot be in the future") + } + } + + // Parse and validate until if set + var untilTime time.Time + if config.Logs.Until != "" { + var err error + untilTime, err = parseTimestamp(config.Logs.Until) + if err != nil { + return fmt.Errorf("logs.until must be a Unix timestamp or RFC3339 time: %w", err) + } + if untilTime.After(now) { + config.Logs.Until = "" // Clear future until time + } + } + + // If both are set, ensure since is not after until + if config.Logs.Since != "" && config.Logs.Until != "" { + if sinceTime.After(untilTime) { + return fmt.Errorf("logs.since must not be after logs.until") + } + } return nil } diff --git a/receiver/dockerstatsreceiver/config_test.go b/receiver/dockerstatsreceiver/config_test.go index 30775206b092..07f82f8d3b94 100644 --- a/receiver/dockerstatsreceiver/config_test.go +++ b/receiver/dockerstatsreceiver/config_test.go @@ -78,6 +78,16 @@ func TestLoadConfig(t *testing.T) { } return m }(), + MinDockerRetryWait: 1 * time.Second, + MaxDockerRetryWait: 30 * time.Second, + Logs: EventsConfig{ + Filters: map[string][]string{ + "type": {"container", "image"}, + "event": {"start", "stop", "die"}, + }, + Since: "2024-01-01T00:00:00Z", + Until: "2024-01-02T00:00:00Z", + }, }, }, } @@ -98,28 +108,134 @@ func TestLoadConfig(t *testing.T) { } func TestValidateErrors(t *testing.T) { - cfg := &Config{ControllerConfig: scraperhelper.NewDefaultControllerConfig(), Config: docker.Config{ - DockerAPIVersion: "1.25", - }} - assert.Equal(t, "endpoint must be specified", component.ValidateConfig(cfg).Error()) - - cfg = &Config{ - Config: docker.Config{ - DockerAPIVersion: "1.21", - Endpoint: "someEndpoint", + tests := []struct { + name string + cfg *Config + expectedErr string + }{ + { + name: "missing endpoint", + cfg: &Config{ + Config: docker.Config{ + DockerAPIVersion: "1.25", + }, + ControllerConfig: scraperhelper.NewDefaultControllerConfig(), + }, + expectedErr: "endpoint must be specified", + }, + { + name: "outdated api version", + cfg: &Config{ + Config: docker.Config{ + DockerAPIVersion: "1.21", + Endpoint: "someEndpoint", + }, + ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 1 * time.Second}, + }, + expectedErr: `"api_version" 1.21 must be at least 1.25`, + }, + { + name: "missing collection interval", + cfg: &Config{ + Config: docker.Config{ + Endpoint: "someEndpoint", + DockerAPIVersion: "1.25", + }, + ControllerConfig: scraperhelper.ControllerConfig{}, + }, + expectedErr: `"collection_interval": requires positive value`, + }, + { + name: "negative min retry wait", + cfg: &Config{ + Config: docker.Config{ + Endpoint: "unix:///var/run/docker.sock", + DockerAPIVersion: "1.25", + }, + MinDockerRetryWait: -1 * time.Second, + MaxDockerRetryWait: 30 * time.Second, + }, + expectedErr: "min_docker_retry_wait must be positive, got -1s", + }, + { + name: "negative max retry wait", + cfg: &Config{ + Config: docker.Config{ + Endpoint: "unix:///var/run/docker.sock", + DockerAPIVersion: "1.25", + }, + MinDockerRetryWait: 1 * time.Second, + MaxDockerRetryWait: -1 * time.Second, + }, + expectedErr: "max_docker_retry_wait must be positive, got -1s", + }, + { + name: "max less than min", + cfg: &Config{ + Config: docker.Config{ + Endpoint: "unix:///var/run/docker.sock", + DockerAPIVersion: "1.25", + }, + MinDockerRetryWait: 30 * time.Second, + MaxDockerRetryWait: 1 * time.Second, + }, + expectedErr: "max_docker_retry_wait must not be less than min_docker_retry_wait", + }, + { + name: "invalid since timestamp", + cfg: &Config{ + Config: docker.Config{ + Endpoint: "unix:///var/run/docker.sock", + DockerAPIVersion: "1.25", + }, + MinDockerRetryWait: 1 * time.Second, + MaxDockerRetryWait: 30 * time.Second, + Logs: EventsConfig{ + Since: "not-a-timestamp", + }, + }, + expectedErr: "logs.since must be a Unix timestamp or RFC3339 time", + }, + { + name: "future since timestamp", + cfg: &Config{ + Config: docker.Config{ + Endpoint: "unix:///var/run/docker.sock", + DockerAPIVersion: "1.25", + }, + ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 1 * time.Second}, + MinDockerRetryWait: 1 * time.Second, + MaxDockerRetryWait: 30 * time.Second, + Logs: EventsConfig{ + Since: time.Now().Add(24 * time.Hour).Format(time.RFC3339), + }, + }, + expectedErr: "logs.since cannot be in the future", + }, + { + name: "since after until", + cfg: &Config{ + Config: docker.Config{ + Endpoint: "unix:///var/run/docker.sock", + DockerAPIVersion: "1.25", + }, + MinDockerRetryWait: 1 * time.Second, + MaxDockerRetryWait: 30 * time.Second, + Logs: EventsConfig{ + Since: "2024-01-02T00:00:00Z", + Until: "2024-01-01T00:00:00Z", + }, + }, + expectedErr: "logs.since must not be after logs.until", }, - ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 1 * time.Second}, } - assert.Equal(t, `"api_version" 1.21 must be at least 1.25`, component.ValidateConfig(cfg).Error()) - cfg = &Config{ - Config: docker.Config{ - Endpoint: "someEndpoint", - DockerAPIVersion: "1.25", - }, - ControllerConfig: scraperhelper.ControllerConfig{}, + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := component.ValidateConfig(tt.cfg) + assert.ErrorContains(t, err, tt.expectedErr) + }) } - assert.Equal(t, `"collection_interval": requires positive value`, component.ValidateConfig(cfg).Error()) } func TestApiVersionCustomError(t *testing.T) { diff --git a/receiver/dockerstatsreceiver/factory.go b/receiver/dockerstatsreceiver/factory.go index 56cf2cb1f0e5..5476e54d0fcb 100644 --- a/receiver/dockerstatsreceiver/factory.go +++ b/receiver/dockerstatsreceiver/factory.go @@ -21,6 +21,7 @@ func NewFactory() receiver.Factory { return receiver.NewFactory( metadata.Type, createDefaultConfig, + receiver.WithLogs(createLogsReceiver, metadata.LogsStability), receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability)) } @@ -36,9 +37,22 @@ func createDefaultConfig() component.Config { Timeout: scs.Timeout, }, MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), + MinDockerRetryWait: time.Second, + MaxDockerRetryWait: 10 * time.Second, + Logs: EventsConfig{}, } } +func createLogsReceiver( + _ context.Context, + params receiver.Settings, + config component.Config, + consumer consumer.Logs, +) (receiver.Logs, error) { + dockerConfig := config.(*Config) + return newLogsReceiver(params, dockerConfig, consumer), nil +} + func createMetricsReceiver( _ context.Context, params receiver.Settings, diff --git a/receiver/dockerstatsreceiver/factory_test.go b/receiver/dockerstatsreceiver/factory_test.go index 9401b38ed1b7..d28a78fb8844 100644 --- a/receiver/dockerstatsreceiver/factory_test.go +++ b/receiver/dockerstatsreceiver/factory_test.go @@ -35,4 +35,8 @@ func TestCreateReceiver(t *testing.T) { metricReceiver, err := factory.CreateMetrics(context.Background(), params, config, consumertest.NewNop()) assert.NoError(t, err, "Metric receiver creation failed") assert.NotNil(t, metricReceiver, "receiver creation failed") + + logReceiver, err := factory.CreateLogs(context.Background(), params, config, consumertest.NewNop()) + assert.NoError(t, err, "log receiver creation failed") + assert.NotNil(t, logReceiver, "receiver creation failed") } diff --git a/receiver/dockerstatsreceiver/generated_component_test.go b/receiver/dockerstatsreceiver/generated_component_test.go index 4ca9d558b309..c75deb4dc597 100644 --- a/receiver/dockerstatsreceiver/generated_component_test.go +++ b/receiver/dockerstatsreceiver/generated_component_test.go @@ -32,6 +32,13 @@ func TestComponentLifecycle(t *testing.T) { createFn func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) }{ + { + name: "logs", + createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateLogs(ctx, set, cfg, consumertest.NewNop()) + }, + }, + { name: "metrics", createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { diff --git a/receiver/dockerstatsreceiver/go.mod b/receiver/dockerstatsreceiver/go.mod index 9f160fb0cf1f..f3b892b51f35 100644 --- a/receiver/dockerstatsreceiver/go.mod +++ b/receiver/dockerstatsreceiver/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/docker go 1.22.0 require ( + github.com/cenkalti/backoff/v4 v4.3.0 github.com/docker/docker v27.4.0+incompatible github.com/google/go-cmp v0.6.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.116.0 @@ -33,7 +34,6 @@ require ( github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect - github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/containerd/log v0.1.0 // indirect github.com/containerd/platforms v0.2.1 // indirect diff --git a/receiver/dockerstatsreceiver/internal/metadata/generated_status.go b/receiver/dockerstatsreceiver/internal/metadata/generated_status.go index c4f772f51ae4..8d7438b8bc2c 100644 --- a/receiver/dockerstatsreceiver/internal/metadata/generated_status.go +++ b/receiver/dockerstatsreceiver/internal/metadata/generated_status.go @@ -12,5 +12,6 @@ var ( ) const ( + LogsStability = component.StabilityLevelDevelopment MetricsStability = component.StabilityLevelAlpha ) diff --git a/receiver/dockerstatsreceiver/logs_receiver.go b/receiver/dockerstatsreceiver/logs_receiver.go new file mode 100644 index 000000000000..2e450d7506a0 --- /dev/null +++ b/receiver/dockerstatsreceiver/logs_receiver.go @@ -0,0 +1,201 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package dockerstatsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver" + +import ( + "context" + "errors" + "fmt" + "io" + "strings" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/docker/docker/api/types/events" + "github.com/docker/docker/api/types/filters" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker" +) + +const dockerEventPrefix = "docker" + +type logsReceiver struct { + config *Config + settings receiver.Settings + + cancel context.CancelFunc + consumer consumer.Logs + eventPoller *dockerEventPoller +} + +func newLogsReceiver(set receiver.Settings, config *Config, consumer consumer.Logs) *logsReceiver { + return &logsReceiver{ + config: config, + settings: set, + consumer: consumer, + } +} + +func (r *logsReceiver) Start(ctx context.Context, _ component.Host) error { + var err error + client, err := docker.NewDockerClient(&r.config.Config, r.settings.Logger) + if err != nil { + return err + } + + if err = client.LoadContainerList(ctx); err != nil { + return err + } + + cctx, cancel := context.WithCancel(ctx) + r.cancel = cancel + r.eventPoller = newDockerEventPoller(r.config, client, r.settings.Logger, r.consumeDockerEvent) + go r.eventPoller.Start(cctx) + return nil +} + +func getDockerBackoffConfig(config *Config) *backoff.ExponentialBackOff { + b := &backoff.ExponentialBackOff{ + InitialInterval: config.MinDockerRetryWait, + MaxInterval: config.MaxDockerRetryWait, + MaxElapsedTime: 0, + Multiplier: 1.5, + RandomizationFactor: 0.5, + Clock: backoff.SystemClock, + } + b.Reset() + return b +} + +// dockerEventPoller manages the lifecycle and event stream of the Docker daemon connection. +type dockerEventPoller struct { + config *Config + client *docker.Client + logger *zap.Logger + eventHandler func(context.Context, *events.Message) error + backoff *backoff.ExponentialBackOff +} + +func newDockerEventPoller( + config *Config, + client *docker.Client, + logger *zap.Logger, + handler func(context.Context, *events.Message) error) *dockerEventPoller { + return &dockerEventPoller{ + config: config, + client: client, + logger: logger, + eventHandler: handler, + backoff: getDockerBackoffConfig(config), + } +} + +func (d *dockerEventPoller) Start(ctx context.Context) { + for { + filterArgs := filters.NewArgs() + for k, v := range d.config.Logs.Filters { + for _, elem := range v { + filterArgs.Add(k, elem) + } + } + // event stream can be interrupted by async errors (connection or other). + // client caller must retry to restart processing. retry with backoff here + // except for context cancellation. + eventChan, errChan := d.client.Events(ctx, events.ListOptions{ + Since: d.config.Logs.Since, + Until: d.config.Logs.Until, + Filters: filterArgs, + }) + + err := d.processEvents(ctx, eventChan, errChan) + if errors.Is(err, context.Canceled) { + return + } + // io.EOF is expected, no need to log but still needs retrying + if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { + d.logger.Error("Async error while processing docker events, reconnecting to docker daemon", zap.Error(err)) + } + nextBackoff := d.backoff.NextBackOff() + select { + case <-ctx.Done(): + return + case <-time.After(nextBackoff): + continue + } + } +} + +func (d *dockerEventPoller) processEvents(ctx context.Context, eventChan <-chan events.Message, errChan <-chan error) error { + processedOnce := false + for { + select { + case <-ctx.Done(): + return ctx.Err() + case event := <-eventChan: + // for the given method invocation, processing event indicates a successful daemon connection. + // backoff should be reset, no need to do this afterwards since the connection is already established. + if !processedOnce { + d.backoff.Reset() + processedOnce = true + } + if err := d.eventHandler(ctx, &event); err != nil { + d.logger.Error("Failed to process docker event", zap.Error(err)) + } + case err := <-errChan: + return err + } + } +} + +// TODO: add batching based on time/volume. for now, one event -> one logs +func (r *logsReceiver) consumeDockerEvent(ctx context.Context, event *events.Message) error { + if event.Type == "" { + return nil + } + logs := plog.NewLogs() + logRecord := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(event.Time, event.TimeNano))) + attrs := logRecord.Attributes() + action := string(event.Action) + // for cases with health_status: running etc., event.name should be + // properly namespaced as docker.container.health_status.running + if strings.Contains(action, ": ") { + action = strings.Join(strings.Split(action, ": "), ".") + } + if action != "" { + // i.e. docker.container.start + attrs.PutStr("event.name", fmt.Sprintf("%s.%s.%s", dockerEventPrefix, event.Type, action)) + } else { + attrs.PutStr("event.name", fmt.Sprintf("%s.%s", dockerEventPrefix, event.Type)) + } + if event.Scope != "" { + attrs.PutStr("event.scope", event.Scope) + } + if event.Actor.ID != "" { + attrs.PutStr("event.id", event.Actor.ID) + } + // body exactly replicates actor attributes + if len(event.Actor.Attributes) > 0 { + actorAttrs := logRecord.Body().SetEmptyMap() + for k, v := range event.Actor.Attributes { + if k != "" { + actorAttrs.PutStr(k, v) + } + } + } + return r.consumer.ConsumeLogs(ctx, logs) +} + +func (r *logsReceiver) Shutdown(_ context.Context) error { + if r.cancel != nil { + r.cancel() + } + return nil +} diff --git a/receiver/dockerstatsreceiver/logs_receiver_test.go b/receiver/dockerstatsreceiver/logs_receiver_test.go new file mode 100644 index 000000000000..2c26e354cc83 --- /dev/null +++ b/receiver/dockerstatsreceiver/logs_receiver_test.go @@ -0,0 +1,234 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package dockerstatsreceiver + +import ( + "bytes" + "context" + "math/rand" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker" +) + +var mockFolder = filepath.Join("testdata", "mock") + +func TestNewLogsReceiver(t *testing.T) { + cfg := &Config{ + Config: docker.Config{ + Endpoint: "unix:///run/some.sock", + DockerAPIVersion: defaultDockerAPIVersion, + }, + } + lr := newLogsReceiver(receivertest.NewNopSettings(), cfg, &consumertest.LogsSink{}) + assert.NotNil(t, lr) +} + +func TestErrorsInStartLogs(t *testing.T) { + unreachable := "unix:///not/a/thing.sock" + cfg := &Config{ + Config: docker.Config{ + Endpoint: unreachable, + DockerAPIVersion: defaultDockerAPIVersion, + }, + } + recv := newLogsReceiver(receivertest.NewNopSettings(), cfg, &consumertest.LogsSink{}) + assert.NotNil(t, recv) + + cfg.Endpoint = "..not/a/valid/endpoint" + err := recv.Start(context.Background(), componenttest.NewNopHost()) + assert.ErrorContains(t, err, "unable to parse docker host") + + cfg.Endpoint = unreachable + err = recv.Start(context.Background(), componenttest.NewNopHost()) + assert.ErrorContains(t, err, "context deadline exceeded") +} + +func createEventsMockServer(t *testing.T, eventsFilePaths []string) (*httptest.Server, error) { + t.Helper() + eventsPayloads := make([][]byte, len(eventsFilePaths)) + for i, eventPath := range eventsFilePaths { + events, err := os.ReadFile(filepath.Clean(eventPath)) + if err != nil { + return nil, err + } + eventsPayloads[i] = events + } + containerID := "73364842ef014441cac89fed05df19463b1230db25a31252cdf82e754f1ec581" + containerInfo := map[string]string{ + "/v1.25/containers/json": filepath.Join(mockFolder, "single_container_with_optional_resource_attributes", "containers.json"), + "/v1.25/containers/" + containerID + "/json": filepath.Join(mockFolder, "single_container_with_optional_resource_attributes", "container.json"), + "/v1.25/containers/" + containerID + "/stats": filepath.Join(mockFolder, "single_container_with_optional_resource_attributes", "stats.json"), + } + urlToFileContents := make(map[string][]byte, len(containerInfo)) + for urlPath, filePath := range containerInfo { + err := func() error { + fileContents, err := os.ReadFile(filepath.Clean(filePath)) + if err != nil { + return err + } + urlToFileContents[urlPath] = fileContents + return nil + }() + if err != nil { + return nil, err + } + } + + eventCallCount := 0 + failureCount := 0 + r := rand.New(rand.NewSource(time.Now().UnixNano())) + return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("Content-Type", "application/json") + switch req.URL.Path { + case "/v1.25/events": + if eventCallCount >= len(eventsPayloads) { + return + } + // the receiver should be resilient and reconnect on http failures. + // randomly, return 500 up to 3 times and assert the receiver retry behavior. + if failureCount < 3 && r.Float32() < 0.4 { + rw.WriteHeader(http.StatusInternalServerError) + failureCount++ + return + } + for _, event := range bytes.Split(eventsPayloads[eventCallCount], []byte{'\n'}) { + if len(bytes.TrimSpace(event)) == 0 { + continue + } + _, err := rw.Write(append(event, '\n')) + assert.NoError(t, err) + rw.(http.Flusher).Flush() + } + eventCallCount++ + // test out disconnection/reconnection capability + conn, _, err := rw.(http.Hijacker).Hijack() + assert.NoError(t, err) + err = conn.Close() + assert.NoError(t, err) + default: + data, ok := urlToFileContents[req.URL.Path] + if !ok { + rw.WriteHeader(http.StatusNotFound) + return + } + _, err := rw.Write(data) + assert.NoError(t, err) + } + })), nil +} + +type dockerLogEventTestCase struct { + name string + expectedBody map[string]any + expectedAttrs map[string]any + timestamp time.Time +} + +func TestDockerEventPolling(t *testing.T) { + // events across connections should be aggregated + testCases := []dockerLogEventTestCase{ + { + name: "container health status event", + expectedBody: map[string]any{ + "image": "alpine:latest", + "name": "test-container", + }, + expectedAttrs: map[string]any{ + "event.name": "docker.container.health_status.running", + "event.scope": "local", + "event.id": "f97ed5bca0a5a0b85bfd52c4144b96174e825c92a138bc0458f0e196f2c7c1b4", + }, + timestamp: time.Unix(1699483576, 1699483576081311000), + }, + { + name: "network create event", + expectedBody: map[string]any{ + "name": "test-network", + "type": "bridge", + "driver": "bridge", + }, + expectedAttrs: map[string]any{ + "event.name": "docker.network.create", + "event.scope": "swarm", + "event.id": "8c0b5d75f8fb4c06b31f25e9d3c2702827d7a43c82a26c1538ddd5f7b3307d05", + }, + timestamp: time.Unix(1699483577, 1699483577123456000), + }, + { + name: "volume destroy event with no attributes", + expectedAttrs: map[string]any{ + "event.name": "docker.volume.destroy", + "event.id": "def456789", + }, + timestamp: time.Unix(1699483578, 1699483578123456000), + }, + { + name: "daemon start event with empty id", + expectedBody: map[string]any{ + "name": "docker-daemon", + }, + expectedAttrs: map[string]any{ + "event.name": "docker.daemon.start", + "event.scope": "local", + }, + timestamp: time.Unix(1699483579, 1699483579123456000), + }, + } + + mockDockerEngine, err := createEventsMockServer(t, []string{filepath.Join(mockFolder, "single_container", "events.json"), + filepath.Join(mockFolder, "single_container", "events2.json")}) + require.NoError(t, err) + defer mockDockerEngine.Close() + mockLogsConsumer := &consumertest.LogsSink{} + rcv := newLogsReceiver( + receiver.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + }, + }, &Config{ + Config: docker.Config{ + Endpoint: mockDockerEngine.URL, + DockerAPIVersion: defaultDockerAPIVersion, + Timeout: time.Second, + }, + }, mockLogsConsumer) + err = rcv.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { require.NoError(t, rcv.Shutdown(context.Background())) }() + + require.Eventually(t, func() bool { + return len(mockLogsConsumer.AllLogs()) == len(testCases) + }, 5*time.Second, 100*time.Millisecond) + logs := mockLogsConsumer.AllLogs() + require.Equal(t, len(logs), len(testCases)) + + for i, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + curLogRecord := logs[i].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + if tc.expectedBody == nil { + assert.Nil(t, curLogRecord.Body().AsRaw()) + } else { + assert.Equal(t, tc.expectedBody, curLogRecord.Body().AsRaw()) + } + assert.Equal(t, tc.expectedAttrs, curLogRecord.Attributes().AsRaw()) + assert.Equal(t, pcommon.NewTimestampFromTime(tc.timestamp), curLogRecord.Timestamp()) + }) + } +} diff --git a/receiver/dockerstatsreceiver/metadata.yaml b/receiver/dockerstatsreceiver/metadata.yaml index 06db7f4591b3..3fc366e8fa4e 100644 --- a/receiver/dockerstatsreceiver/metadata.yaml +++ b/receiver/dockerstatsreceiver/metadata.yaml @@ -3,6 +3,7 @@ type: docker_stats status: class: receiver stability: + development: [logs] alpha: [metrics] distributions: [contrib] codeowners: diff --git a/receiver/dockerstatsreceiver/receiver.go b/receiver/dockerstatsreceiver/metrics_receiver.go similarity index 100% rename from receiver/dockerstatsreceiver/receiver.go rename to receiver/dockerstatsreceiver/metrics_receiver.go diff --git a/receiver/dockerstatsreceiver/receiver_test.go b/receiver/dockerstatsreceiver/metrics_receiver_test.go similarity index 99% rename from receiver/dockerstatsreceiver/receiver_test.go rename to receiver/dockerstatsreceiver/metrics_receiver_test.go index c610668aace9..c9901e8595ab 100644 --- a/receiver/dockerstatsreceiver/receiver_test.go +++ b/receiver/dockerstatsreceiver/metrics_receiver_test.go @@ -30,8 +30,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver/internal/metadata" ) -var mockFolder = filepath.Join("testdata", "mock") - var ( metricEnabled = metadata.MetricConfig{Enabled: true} allMetricsEnabled = metadata.MetricsConfig{ @@ -120,7 +118,7 @@ var ( } ) -func TestNewReceiver(t *testing.T) { +func TestNewMetricsReceiver(t *testing.T) { cfg := &Config{ ControllerConfig: scraperhelper.ControllerConfig{ CollectionInterval: 1 * time.Second, @@ -134,7 +132,7 @@ func TestNewReceiver(t *testing.T) { assert.NotNil(t, mr) } -func TestErrorsInStart(t *testing.T) { +func TestErrorsInStartMetrics(t *testing.T) { unreachable := "unix:///not/a/thing.sock" cfg := &Config{ ControllerConfig: scraperhelper.ControllerConfig{ diff --git a/receiver/dockerstatsreceiver/testdata/config.yaml b/receiver/dockerstatsreceiver/testdata/config.yaml index de3ffce662a7..566962a8bffe 100644 --- a/receiver/dockerstatsreceiver/testdata/config.yaml +++ b/receiver/dockerstatsreceiver/testdata/config.yaml @@ -18,3 +18,11 @@ docker_stats/allsettings: enabled: false container.memory.total_rss: enabled: true + min_docker_retry_wait: 1s + max_docker_retry_wait: 30s + logs: + filters: + type: [ "container", "image" ] + event: [ "start", "stop", "die" ] + since: "2024-01-01T00:00:00Z" + until: "2024-01-02T00:00:00Z" \ No newline at end of file diff --git a/receiver/dockerstatsreceiver/testdata/mock/single_container/events.json b/receiver/dockerstatsreceiver/testdata/mock/single_container/events.json new file mode 100644 index 000000000000..6a45c2d56a45 --- /dev/null +++ b/receiver/dockerstatsreceiver/testdata/mock/single_container/events.json @@ -0,0 +1,2 @@ +{"status":"start","id":"f97ed5bca0a5a0b85bfd52c4144b96174e825c92a138bc0458f0e196f2c7c1b4","from":"alpine:latest","Type":"container","Action":"health_status: running","Actor":{"ID":"f97ed5bca0a5a0b85bfd52c4144b96174e825c92a138bc0458f0e196f2c7c1b4","Attributes":{"image":"alpine:latest","name":"test-container"}},"time":1699483576,"timeNano":1699483576081311000,"scope":"local"} +{"status":"create","id":"8c0b5d75f8fb4c06b31f25e9d3c2702827d7a43c82a26c1538ddd5f7b3307d05","from":"","Type":"network","Action":"create","Actor":{"ID":"8c0b5d75f8fb4c06b31f25e9d3c2702827d7a43c82a26c1538ddd5f7b3307d05","Attributes":{"name":"test-network","type":"bridge","driver":"bridge"}},"time":1699483577,"timeNano":1699483577123456000,"scope":"swarm"} \ No newline at end of file diff --git a/receiver/dockerstatsreceiver/testdata/mock/single_container/events2.json b/receiver/dockerstatsreceiver/testdata/mock/single_container/events2.json new file mode 100644 index 000000000000..fdb5a1d37ebe --- /dev/null +++ b/receiver/dockerstatsreceiver/testdata/mock/single_container/events2.json @@ -0,0 +1,5 @@ +{"status":"destroy","id":"def456789","from":"","Type":"volume","Action":"destroy","Actor":{"ID":"def456789"},"time":1699483578,"timeNano":1699483578123456000} +{"status":"start","id":"","from":"","Type":"daemon","Action":"start","Actor":{"ID":"","Attributes":{"name":"docker-daemon"}},"time":1699483579,"timeNano":1699483579123456000,"scope":"local"} +not json (shouldn't error, just cause retries) +test retry +again \ No newline at end of file From 1e9d043971509f7889a302878e3b1e929242984b Mon Sep 17 00:00:00 2001 From: Ryan Min Date: Fri, 13 Dec 2024 15:44:31 -0500 Subject: [PATCH 2/5] PR comments --- receiver/dockerstatsreceiver/README.md | 2 +- receiver/dockerstatsreceiver/logs_receiver.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/receiver/dockerstatsreceiver/README.md b/receiver/dockerstatsreceiver/README.md index 6d7241fc4a7e..5437a9a9702f 100644 --- a/receiver/dockerstatsreceiver/README.md +++ b/receiver/dockerstatsreceiver/README.md @@ -27,7 +27,7 @@ resource usage of cpu, memory, network, and the ## Configuration -The following settings are for both logs and metrics receier: +The following settings are for both logs and metrics receiver: - `endpoint` (default = `unix:///var/run/docker.sock`): Address to reach the desired Docker daemon. - `excluded_images` (no default, all running containers monitored): A list of strings, diff --git a/receiver/dockerstatsreceiver/logs_receiver.go b/receiver/dockerstatsreceiver/logs_receiver.go index 2e450d7506a0..ad1b451ed0a1 100644 --- a/receiver/dockerstatsreceiver/logs_receiver.go +++ b/receiver/dockerstatsreceiver/logs_receiver.go @@ -98,13 +98,13 @@ func newDockerEventPoller( } func (d *dockerEventPoller) Start(ctx context.Context) { - for { - filterArgs := filters.NewArgs() - for k, v := range d.config.Logs.Filters { - for _, elem := range v { - filterArgs.Add(k, elem) - } + filterArgs := filters.NewArgs() + for k, v := range d.config.Logs.Filters { + for _, elem := range v { + filterArgs.Add(k, elem) } + } + for { // event stream can be interrupted by async errors (connection or other). // client caller must retry to restart processing. retry with backoff here // except for context cancellation. From 660b0b57061256142006f70437fb3659b3a60155 Mon Sep 17 00:00:00 2001 From: Ryan Min Date: Wed, 18 Dec 2024 13:17:19 -0500 Subject: [PATCH 3/5] add integration tests --- receiver/dockerstatsreceiver/README.md | 1 + .../dockerstatsreceiver/integration_test.go | 188 ++++++++++++++++++ receiver/dockerstatsreceiver/logs_receiver.go | 10 +- .../dockerstatsreceiver/logs_receiver_test.go | 6 +- 4 files changed, 202 insertions(+), 3 deletions(-) diff --git a/receiver/dockerstatsreceiver/README.md b/receiver/dockerstatsreceiver/README.md index 5437a9a9702f..a07973fc98c7 100644 --- a/receiver/dockerstatsreceiver/README.md +++ b/receiver/dockerstatsreceiver/README.md @@ -40,6 +40,7 @@ The following settings are for both logs and metrics receiver: `!my*container` will exclude all containers whose image name doesn't match the blob `my*container`. - `timeout` (default = `5s`): The request timeout for any docker daemon query. - `api_version` (default = `"1.25"`): The Docker client API version (must be 1.25+). Must be input as a string, not a float (e.g. `"1.40"` instead of `1.40`). [Docker API versions](https://docs.docker.com/engine/api/). +Note: If different settings are needed for logs/metrics receivers, a new receiver config should be created entirely. Only for metrics receiver: - `collection_interval` (default = `10s`): The interval at which to gather container stats. diff --git a/receiver/dockerstatsreceiver/integration_test.go b/receiver/dockerstatsreceiver/integration_test.go index 571a2cb6e07e..789f1f242851 100644 --- a/receiver/dockerstatsreceiver/integration_test.go +++ b/receiver/dockerstatsreceiver/integration_test.go @@ -17,6 +17,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" rcvr "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" @@ -57,6 +58,23 @@ func createNginxContainer(ctx context.Context, t *testing.T) testcontainers.Cont return container } +func createRedisContainer(ctx context.Context, t *testing.T) testcontainers.Container { + req := testcontainers.ContainerRequest{ + Image: "docker.io/library/redis:latest", + ExposedPorts: []string{"6379/tcp"}, + WaitingFor: wait.ForExposedPort(), + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + require.NoError(t, err) + require.NotNil(t, container) + + return container +} + func hasResourceScopeMetrics(containerID string, metrics []pmetric.Metrics) bool { for _, m := range metrics { for i := 0; i < m.ResourceMetrics().Len(); i++ { @@ -155,6 +173,176 @@ func TestExcludedImageProducesNoMetricsIntegration(t *testing.T) { assert.NoError(t, recv.Shutdown(ctx)) } +func hasDockerEvents(logs []plog.Logs, containerID string, expectedEventNames []string) bool { + seen := make(map[string]bool) + for _, l := range logs { + for i := 0; i < l.ResourceLogs().Len(); i++ { + rl := l.ResourceLogs().At(i) + for j := 0; j < rl.ScopeLogs().Len(); j++ { + sl := rl.ScopeLogs().At(j) + for k := 0; k < sl.LogRecords().Len(); k++ { + record := sl.LogRecords().At(k) + attrs := record.Attributes() + if id, ok := attrs.Get("event.id"); ok && id.AsString() == containerID { + if name, ok := attrs.Get("event.name"); ok { + seen[name.AsString()] = true + } + } + } + } + } + } + + for _, expected := range expectedEventNames { + if !seen[expected] { + return false + } + } + return true +} + +func TestContainerLifecycleEventsIntegration(t *testing.T) { + t.Parallel() + params, ctx, cancel := paramsAndContext(t) + defer cancel() + + consumer := new(consumertest.LogsSink) + f, config := factory() + recv, err := f.CreateLogs(ctx, params, config, consumer) + + require.NoError(t, err, "failed creating logs receiver") + require.NoError(t, recv.Start(ctx, &nopHost{ + reportFunc: func(event *componentstatus.Event) { + require.NoError(t, event.Err()) + }, + })) + + // no events should be received before container starts + assert.Never(t, func() bool { + return len(consumer.AllLogs()) > 0 + }, 5*time.Second, 1*time.Second, "received unexpected events") + + nginxContainer := createNginxContainer(ctx, t) + nginxID := nginxContainer.GetContainerID() + + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), nginxID, []string{ + "docker.container.create", + "docker.container.start", + }) + }, 5*time.Second, 1*time.Second, "failed to receive container create/start events") + + // Start second container (redis) and verify we get events from both + redisContainer := createRedisContainer(ctx, t) + redisID := redisContainer.GetContainerID() + + // Reset consumer to only check new events + consumer.Reset() + + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), redisID, []string{ + "docker.container.create", + "docker.container.start", + }) + }, 5*time.Second, 1*time.Second, "failed to receive redis container events") + + consumer.Reset() + require.NoError(t, nginxContainer.Terminate(ctx)) + + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), redisID, []string{ + "docker.container.die", + "docker.container.stop", + }) + }, 5*time.Second, 1*time.Second, "failed to receive container stop/die events") + + require.NoError(t, redisContainer.Terminate(ctx)) + assert.NoError(t, recv.Shutdown(ctx)) +} + +func TestFilteredContainerEventsIntegration(t *testing.T) { + t.Parallel() + params, ctx, cancel := paramsAndContext(t) + defer cancel() + + f, config := factory() + // Only receive events from redis containers + config.Logs.Filters = map[string][]string{ + "image": {"*redis*"}, + } + + consumer := new(consumertest.LogsSink) + recv, err := f.CreateLogs(ctx, params, config, consumer) + require.NoError(t, err, "failed creating logs receiver") + require.NoError(t, recv.Start(ctx, &nopHost{ + reportFunc: func(event *componentstatus.Event) { + require.NoError(t, event.Err()) + }, + })) + + nginxContainer := createNginxContainer(ctx, t) + assert.Never(t, func() bool { + return len(consumer.AllLogs()) > 0 + }, 5*time.Second, 1*time.Second, "received events for excluded container") + + redisContainer := createRedisContainer(ctx, t) + redisID := redisContainer.GetContainerID() + + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), redisID, []string{ + "docker.container.create", + "docker.container.start", + }) + }, 5*time.Second, 1*time.Second, "failed to receive redis container events") + + require.NoError(t, nginxContainer.Terminate(ctx)) + require.NoError(t, redisContainer.Terminate(ctx)) + assert.NoError(t, recv.Shutdown(ctx)) +} + +func TestContainerRestartEventsIntegration(t *testing.T) { + t.Parallel() + params, ctx, cancel := paramsAndContext(t) + defer cancel() + + consumer := new(consumertest.LogsSink) + f, config := factory() + recv, err := f.CreateLogs(ctx, params, config, consumer) + + require.NoError(t, err, "failed creating logs receiver") + require.NoError(t, recv.Start(ctx, &nopHost{ + reportFunc: func(event *componentstatus.Event) { + require.NoError(t, event.Err()) + }, + })) + + nginxContainer := createNginxContainer(ctx, t) + nginxID := nginxContainer.GetContainerID() + + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), nginxID, []string{ + "docker.container.create", + "docker.container.start", + }) + }, 5*time.Second, 1*time.Second, "failed to receive container start events") + + consumer.Reset() + stopDuration := 2 * time.Second + require.NoError(t, nginxContainer.Stop(ctx, &stopDuration)) + require.NoError(t, nginxContainer.Start(ctx)) + + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), nginxID, []string{ + "docker.container.die", + "docker.container.stop", + "docker.container.start", + }) + }, 5*time.Second, 1*time.Second, "failed to receive container restart events") + + require.NoError(t, nginxContainer.Terminate(ctx)) + assert.NoError(t, recv.Shutdown(ctx)) +} + var _ componentstatus.Reporter = (*nopHost)(nil) type nopHost struct { diff --git a/receiver/dockerstatsreceiver/logs_receiver.go b/receiver/dockerstatsreceiver/logs_receiver.go index ad1b451ed0a1..ded4ef8df108 100644 --- a/receiver/dockerstatsreceiver/logs_receiver.go +++ b/receiver/dockerstatsreceiver/logs_receiver.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "strings" + "sync" "time" "github.com/cenkalti/backoff/v4" @@ -81,13 +82,15 @@ type dockerEventPoller struct { logger *zap.Logger eventHandler func(context.Context, *events.Message) error backoff *backoff.ExponentialBackOff + sync.WaitGroup } func newDockerEventPoller( config *Config, client *docker.Client, logger *zap.Logger, - handler func(context.Context, *events.Message) error) *dockerEventPoller { + handler func(context.Context, *events.Message) error, +) *dockerEventPoller { return &dockerEventPoller{ config: config, client: client, @@ -133,6 +136,8 @@ func (d *dockerEventPoller) Start(ctx context.Context) { } func (d *dockerEventPoller) processEvents(ctx context.Context, eventChan <-chan events.Message, errChan <-chan error) error { + d.Add(1) + defer d.Done() processedOnce := false for { select { @@ -197,5 +202,8 @@ func (r *logsReceiver) Shutdown(_ context.Context) error { if r.cancel != nil { r.cancel() } + if r.eventPoller != nil { + r.eventPoller.Wait() + } return nil } diff --git a/receiver/dockerstatsreceiver/logs_receiver_test.go b/receiver/dockerstatsreceiver/logs_receiver_test.go index 2c26e354cc83..f23226dbf4b2 100644 --- a/receiver/dockerstatsreceiver/logs_receiver_test.go +++ b/receiver/dockerstatsreceiver/logs_receiver_test.go @@ -192,8 +192,10 @@ func TestDockerEventPolling(t *testing.T) { }, } - mockDockerEngine, err := createEventsMockServer(t, []string{filepath.Join(mockFolder, "single_container", "events.json"), - filepath.Join(mockFolder, "single_container", "events2.json")}) + mockDockerEngine, err := createEventsMockServer(t, []string{ + filepath.Join(mockFolder, "single_container", "events.json"), + filepath.Join(mockFolder, "single_container", "events2.json"), + }) require.NoError(t, err) defer mockDockerEngine.Close() mockLogsConsumer := &consumertest.LogsSink{} From da811e156be839fefc96298927b49ae96922b105 Mon Sep 17 00:00:00 2001 From: Ryan Min Date: Thu, 19 Dec 2024 14:53:26 -0500 Subject: [PATCH 4/5] add to codeowners --- .github/CODEOWNERS | 2 +- .../dockerstatsreceiver/integration_test.go | 20 +++++++++++++++++-- receiver/dockerstatsreceiver/logs_receiver.go | 11 ++++++---- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 5e17e88ddbe5..fd9ae2f4d91e 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -218,7 +218,7 @@ receiver/cloudfoundryreceiver/ @open-telemetry/collector-cont receiver/collectdreceiver/ @open-telemetry/collector-contrib-approvers @atoulme receiver/couchdbreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski receiver/datadogreceiver/ @open-telemetry/collector-contrib-approvers @boostchicken @gouthamve @MovieStoreGuy -receiver/dockerstatsreceiver/ @open-telemetry/collector-contrib-approvers @jamesmoessis +receiver/dockerstatsreceiver/ @open-telemetry/collector-contrib-approvers @jamesmoessis @spiffyy99 receiver/elasticsearchreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski receiver/expvarreceiver/ @open-telemetry/collector-contrib-approvers @jamesmoessis @MovieStoreGuy receiver/filelogreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski diff --git a/receiver/dockerstatsreceiver/integration_test.go b/receiver/dockerstatsreceiver/integration_test.go index 789f1f242851..918c4aaa7acf 100644 --- a/receiver/dockerstatsreceiver/integration_test.go +++ b/receiver/dockerstatsreceiver/integration_test.go @@ -250,13 +250,21 @@ func TestContainerLifecycleEventsIntegration(t *testing.T) { require.NoError(t, nginxContainer.Terminate(ctx)) assert.Eventuallyf(t, func() bool { - return hasDockerEvents(consumer.AllLogs(), redisID, []string{ + return hasDockerEvents(consumer.AllLogs(), nginxID, []string{ "docker.container.die", "docker.container.stop", }) }, 5*time.Second, 1*time.Second, "failed to receive container stop/die events") + consumer.Reset() require.NoError(t, redisContainer.Terminate(ctx)) + + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), redisID, []string{ + "docker.container.die", + "docker.container.stop", + }) + }, 5*time.Second, 1*time.Second, "failed to receive container stop/die events") assert.NoError(t, recv.Shutdown(ctx)) } @@ -268,7 +276,7 @@ func TestFilteredContainerEventsIntegration(t *testing.T) { f, config := factory() // Only receive events from redis containers config.Logs.Filters = map[string][]string{ - "image": {"*redis*"}, + "image": {"redis"}, } consumer := new(consumertest.LogsSink) @@ -295,8 +303,16 @@ func TestFilteredContainerEventsIntegration(t *testing.T) { }) }, 5*time.Second, 1*time.Second, "failed to receive redis container events") + consumer.Reset() require.NoError(t, nginxContainer.Terminate(ctx)) require.NoError(t, redisContainer.Terminate(ctx)) + assert.Eventuallyf(t, func() bool { + return hasDockerEvents(consumer.AllLogs(), redisID, []string{ + "docker.container.die", + "docker.container.stop", + }) + }, 5*time.Second, 1*time.Second, "failed to receive container stop/die events") + assert.NoError(t, recv.Shutdown(ctx)) } diff --git a/receiver/dockerstatsreceiver/logs_receiver.go b/receiver/dockerstatsreceiver/logs_receiver.go index ded4ef8df108..0b0a61d1d3ec 100644 --- a/receiver/dockerstatsreceiver/logs_receiver.go +++ b/receiver/dockerstatsreceiver/logs_receiver.go @@ -101,10 +101,13 @@ func newDockerEventPoller( } func (d *dockerEventPoller) Start(ctx context.Context) { - filterArgs := filters.NewArgs() - for k, v := range d.config.Logs.Filters { - for _, elem := range v { - filterArgs.Add(k, elem) + filterArgs := filters.Args{} + if len(d.config.Logs.Filters) > 0 { + filterArgs = filters.NewArgs() + for k, v := range d.config.Logs.Filters { + for _, elem := range v { + filterArgs.Add(k, elem) + } } } for { From 076ef3e7f6612f6be6b8d1907b6dd7d86845f799 Mon Sep 17 00:00:00 2001 From: Ryan Min Date: Thu, 19 Dec 2024 15:03:24 -0500 Subject: [PATCH 5/5] add print statement --- receiver/dockerstatsreceiver/integration_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/receiver/dockerstatsreceiver/integration_test.go b/receiver/dockerstatsreceiver/integration_test.go index 918c4aaa7acf..3a4f5461539a 100644 --- a/receiver/dockerstatsreceiver/integration_test.go +++ b/receiver/dockerstatsreceiver/integration_test.go @@ -7,6 +7,7 @@ package dockerstatsreceiver import ( "context" + "fmt" "testing" "time" @@ -219,6 +220,7 @@ func TestContainerLifecycleEventsIntegration(t *testing.T) { // no events should be received before container starts assert.Never(t, func() bool { + fmt.Printf("%v", consumer.AllLogs()) return len(consumer.AllLogs()) > 0 }, 5*time.Second, 1*time.Second, "received unexpected events")