Skip to content

Commit

Permalink
[dockerstatsreceiver] add new log receiver to ingest docker events
Browse files Browse the repository at this point in the history
  • Loading branch information
spiffyy99 committed Nov 14, 2024
1 parent c06be6d commit 91c22b8
Show file tree
Hide file tree
Showing 17 changed files with 758 additions and 37 deletions.
27 changes: 27 additions & 0 deletions .chloggen/dockerstats_event_log_receiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: dockerstatsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add new log receiver to ingest docker events.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29096]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
52 changes: 38 additions & 14 deletions receiver/dockerstatsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,63 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [alpha]: metrics |
| Stability | [development]: logs |
| | [alpha]: metrics |
| Unsupported Platforms | darwin, windows |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fdockerstats%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fdockerstats) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fdockerstats%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fdockerstats) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@jamesmoessis](https://www.github.com/jamesmoessis) |
| Emeritus | [@rmfitzpatrick](https://www.github.com/rmfitzpatrick) |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
[alpha]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#alpha
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

The Docker Stats receiver queries the local Docker daemon's container stats API for
all desired running containers on a configured interval. These stats are for container
The Docker Stats receiver queries the local Docker daemon:
- for the logs receiver, queries the events API for Docker events and streams them into log records. These events
can be any in the [docker events](https://docs.docker.com/reference/cli/docker/system/events/) command.
- for the metrics receiver, queries the container stats API for all desired running containers on a configured interval. These stats are for container
resource usage of cpu, memory, network, and the
[blkio controller](https://www.kernel.org/doc/Documentation/cgroup-v1/blkio-controller.txt).

> :information_source: Requires Docker API version 1.22+ and only Linux is supported.
## Configuration

The following settings are optional:
The following settings are for both logs and metrics receier:

- `endpoint` (default = `unix:///var/run/docker.sock`): Address to reach the desired Docker daemon.
- `excluded_images` (no default, all running containers monitored): A list of strings,
[regexes](https://golang.org/pkg/regexp/), or [globs](https://github.com/gobwas/glob) whose referent container image
names will not be among the queried containers. `!`-prefixed negations are possible for all item types to signify that
only unmatched container image names should be excluded.
- Regexes must be placed between `/` characters: `/my?egex/`. Negations are to be outside the forward slashes:
`!/my?egex/` will exclude all containers whose name doesn't match the compiled regex `my?egex`.
- Globs are non-regex items (e.g. `/items/`) containing any of the following: `*[]{}?`. Negations are supported:
`!my*container` will exclude all containers whose image name doesn't match the blob `my*container`.
- `timeout` (default = `5s`): The request timeout for any docker daemon query.
- `api_version` (default = `"1.25"`): The Docker client API version (must be 1.25+). Must be input as a string, not a float (e.g. `"1.40"` instead of `1.40`). [Docker API versions](https://docs.docker.com/engine/api/).

Only for metrics receiver:
- `collection_interval` (default = `10s`): The interval at which to gather container stats.
- `initial_delay` (default = `1s`): defines how long this receiver waits before starting.
- `container_labels_to_metric_labels` (no default): A map of Docker container label names whose label values to use
as the specified metric label key.
- `env_vars_to_metric_labels` (no default): A map of Docker container environment variables whose values to use
as the specified metric label key.
- `excluded_images` (no default, all running containers monitored): A list of strings,
[regexes](https://golang.org/pkg/regexp/), or [globs](https://github.com/gobwas/glob) whose referent container image
names will not be among the queried containers. `!`-prefixed negations are possible for all item types to signify that
only unmatched container image names should be excluded.
- Regexes must be placed between `/` characters: `/my?egex/`. Negations are to be outside the forward slashes:
`!/my?egex/` will exclude all containers whose name doesn't match the compiled regex `my?egex`.
- Globs are non-regex items (e.g. `/items/`) containing any of the following: `*[]{}?`. Negations are supported:
`!my*container` will exclude all containers whose image name doesn't match the blob `my*container`.
- `timeout` (default = `5s`): The request timeout for any docker daemon query.
- `api_version` (default = `"1.25"`): The Docker client API version (must be 1.25+). Must be input as a string, not a float (e.g. `"1.40"` instead of `1.40`). [Docker API versions](https://docs.docker.com/engine/api/).
- `metrics` (defaults at [./documentation.md](./documentation.md)): Enables/disables individual metrics. See [./documentation.md](./documentation.md) for full detail.

Only for logs receiver:
- `min_docker_retry_wait` (default = `1s`): The docker daemon may disconnect from the receiver. This configures the starting
duration to wait before attempting a reconnect (with exponential backoff).
- `max_docker_retry_wait` (default = `10s`): This configures the maximum
duration to wait before attempting a reconnect (with exponential backoff).
- `logs`: Configuration for which docker events to emit as logs. matches the [go docker api](https://github.com/moby/moby/blob/master/api/types/events/events.go#L131).
- `since`: The earliest time docker events should be emitted at. Accepts Unix timestamps or RFC3339 formatted timestamps (e.g., "2024-01-02T15:04:05Z").
- `until`: The latest time docker events should be emitted at. Same format as above.
- `filters`: a map of which docker events to emit based on properties. See [docker events](https://docs.docker.com/reference/cli/docker/system/events/) for more details.

Example:

```yaml
Expand All @@ -68,6 +84,14 @@ receivers:
enabled: true
container.network.io.usage.tx_dropped:
enabled: false
min_docker_retry_wait: 5s
max_docker_retry_wait: 30s
logs:
filters:
type: ["container", "image"]
event: ["start", "stop", "die"]
since: "2024-01-01T00:00:00Z"
until: "2024-01-02T00:00:00Z"
```
The full list of settings exposed for this receiver are documented [here](./config.go)
Expand Down
79 changes: 79 additions & 0 deletions receiver/dockerstatsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
package dockerstatsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver"

import (
"fmt"
"strconv"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/receiver/scraperhelper"
Expand All @@ -14,9 +18,21 @@ import (

var _ component.Config = (*Config)(nil)

// EventsConfig contains configuration for Docker events collection
type EventsConfig struct {
// Filters allows filtering which Docker events to collect
Filters map[string][]string `mapstructure:"filters"`
// Since shows events created since this timestamp
Since string `mapstructure:"since"`
// Until shows events created until this timestamp
Until string `mapstructure:"until"`
}

type Config struct {
docker.Config `mapstructure:",squash"`

// Metrics-specific settings

scraperhelper.ControllerConfig `mapstructure:",squash"`

// A mapping of container label names to MetricDescriptor label keys.
Expand All @@ -36,12 +52,75 @@ type Config struct {

// MetricsBuilderConfig config. Enable or disable stats by name.
metadata.MetricsBuilderConfig `mapstructure:",squash"`

// Logs-specific settings
// MinDockerRetryWait is the minimum time to wait before retrying to connect to the Docker daemon
MinDockerRetryWait time.Duration `mapstructure:"min_docker_retry_wait"`
// MaxDockerRetryWait is the maximum time to wait before retrying to connect to the Docker daemon
MaxDockerRetryWait time.Duration `mapstructure:"max_docker_retry_wait"`

// Logs configuration (Docker events)
Logs EventsConfig `mapstructure:"logs"`
}

func parseTimestamp(ts string) (time.Time, error) {
// Try Unix timestamp first
if i, err := strconv.ParseInt(ts, 10, 64); err == nil {
return time.Unix(i, 0), nil
}

// Try RFC3339
return time.Parse(time.RFC3339, ts)
}

func (config Config) Validate() error {
if err := docker.VersionIsValidAndGTE(config.DockerAPIVersion, minimumRequiredDockerAPIVersion); err != nil {
return err
}

// Validate logs-specific config
if config.MinDockerRetryWait <= 0 {
return fmt.Errorf("min_docker_retry_wait must be positive, got %v", config.MinDockerRetryWait)
}
if config.MaxDockerRetryWait <= 0 {
return fmt.Errorf("max_docker_retry_wait must be positive, got %v", config.MaxDockerRetryWait)
}
if config.MaxDockerRetryWait < config.MinDockerRetryWait {
return fmt.Errorf("max_docker_retry_wait must not be less than min_docker_retry_wait")
}

now := time.Now()
var sinceTime time.Time
if config.Logs.Since != "" {
var err error
sinceTime, err = parseTimestamp(config.Logs.Since)
if err != nil {
return fmt.Errorf("logs.since must be a Unix timestamp or RFC3339 time: %w", err)
}
if sinceTime.After(now) {
return fmt.Errorf("logs.since cannot be in the future")
}
}

// Parse and validate until if set
var untilTime time.Time
if config.Logs.Until != "" {
var err error
untilTime, err = parseTimestamp(config.Logs.Until)
if err != nil {
return fmt.Errorf("logs.until must be a Unix timestamp or RFC3339 time: %w", err)
}
if untilTime.After(now) {
config.Logs.Until = "" // Clear future until time
}
}

// If both are set, ensure since is not after until
if config.Logs.Since != "" && config.Logs.Until != "" {
if sinceTime.After(untilTime) {
return fmt.Errorf("logs.since must not be after logs.until")
}
}
return nil
}

Expand Down
152 changes: 134 additions & 18 deletions receiver/dockerstatsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ func TestLoadConfig(t *testing.T) {
}
return m
}(),
MinDockerRetryWait: 1 * time.Second,
MaxDockerRetryWait: 30 * time.Second,
Logs: EventsConfig{
Filters: map[string][]string{
"type": {"container", "image"},
"event": {"start", "stop", "die"},
},
Since: "2024-01-01T00:00:00Z",
Until: "2024-01-02T00:00:00Z",
},
},
},
}
Expand All @@ -98,28 +108,134 @@ func TestLoadConfig(t *testing.T) {
}

func TestValidateErrors(t *testing.T) {
cfg := &Config{ControllerConfig: scraperhelper.NewDefaultControllerConfig(), Config: docker.Config{
DockerAPIVersion: "1.25",
}}
assert.Equal(t, "endpoint must be specified", component.ValidateConfig(cfg).Error())

cfg = &Config{
Config: docker.Config{
DockerAPIVersion: "1.21",
Endpoint: "someEndpoint",
tests := []struct {
name string
cfg *Config
expectedErr string
}{
{
name: "missing endpoint",
cfg: &Config{
Config: docker.Config{
DockerAPIVersion: "1.25",
},
ControllerConfig: scraperhelper.NewDefaultControllerConfig(),
},
expectedErr: "endpoint must be specified",
},
{
name: "outdated api version",
cfg: &Config{
Config: docker.Config{
DockerAPIVersion: "1.21",
Endpoint: "someEndpoint",
},
ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 1 * time.Second},
},
expectedErr: `"api_version" 1.21 must be at least 1.25`,
},
{
name: "missing collection interval",
cfg: &Config{
Config: docker.Config{
Endpoint: "someEndpoint",
DockerAPIVersion: "1.25",
},
ControllerConfig: scraperhelper.ControllerConfig{},
},
expectedErr: `"collection_interval": requires positive value`,
},
{
name: "negative min retry wait",
cfg: &Config{
Config: docker.Config{
Endpoint: "unix:///var/run/docker.sock",
DockerAPIVersion: "1.25",
},
MinDockerRetryWait: -1 * time.Second,
MaxDockerRetryWait: 30 * time.Second,
},
expectedErr: "min_docker_retry_wait must be positive, got -1s",
},
{
name: "negative max retry wait",
cfg: &Config{
Config: docker.Config{
Endpoint: "unix:///var/run/docker.sock",
DockerAPIVersion: "1.25",
},
MinDockerRetryWait: 1 * time.Second,
MaxDockerRetryWait: -1 * time.Second,
},
expectedErr: "max_docker_retry_wait must be positive, got -1s",
},
{
name: "max less than min",
cfg: &Config{
Config: docker.Config{
Endpoint: "unix:///var/run/docker.sock",
DockerAPIVersion: "1.25",
},
MinDockerRetryWait: 30 * time.Second,
MaxDockerRetryWait: 1 * time.Second,
},
expectedErr: "max_docker_retry_wait must not be less than min_docker_retry_wait",
},
{
name: "invalid since timestamp",
cfg: &Config{
Config: docker.Config{
Endpoint: "unix:///var/run/docker.sock",
DockerAPIVersion: "1.25",
},
MinDockerRetryWait: 1 * time.Second,
MaxDockerRetryWait: 30 * time.Second,
Logs: EventsConfig{
Since: "not-a-timestamp",
},
},
expectedErr: "logs.since must be a Unix timestamp or RFC3339 time",
},
{
name: "future since timestamp",
cfg: &Config{
Config: docker.Config{
Endpoint: "unix:///var/run/docker.sock",
DockerAPIVersion: "1.25",
},
ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 1 * time.Second},
MinDockerRetryWait: 1 * time.Second,
MaxDockerRetryWait: 30 * time.Second,
Logs: EventsConfig{
Since: time.Now().Add(24 * time.Hour).Format(time.RFC3339),
},
},
expectedErr: "logs.since cannot be in the future",
},
{
name: "since after until",
cfg: &Config{
Config: docker.Config{
Endpoint: "unix:///var/run/docker.sock",
DockerAPIVersion: "1.25",
},
MinDockerRetryWait: 1 * time.Second,
MaxDockerRetryWait: 30 * time.Second,
Logs: EventsConfig{
Since: "2024-01-02T00:00:00Z",
Until: "2024-01-01T00:00:00Z",
},
},
expectedErr: "logs.since must not be after logs.until",
},
ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 1 * time.Second},
}
assert.Equal(t, `"api_version" 1.21 must be at least 1.25`, component.ValidateConfig(cfg).Error())

cfg = &Config{
Config: docker.Config{
Endpoint: "someEndpoint",
DockerAPIVersion: "1.25",
},
ControllerConfig: scraperhelper.ControllerConfig{},
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := component.ValidateConfig(tt.cfg)
assert.ErrorContains(t, err, tt.expectedErr)
})
}
assert.Equal(t, `"collection_interval": requires positive value`, component.ValidateConfig(cfg).Error())
}

func TestApiVersionCustomError(t *testing.T) {
Expand Down
Loading

0 comments on commit 91c22b8

Please sign in to comment.