diff --git a/.chloggen/dockerstats_event_log_receiver.yaml b/.chloggen/dockerstats_event_log_receiver.yaml
new file mode 100644
index 000000000000..da64d8014832
--- /dev/null
+++ b/.chloggen/dockerstats_event_log_receiver.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: dockerstatsreceiver
+
+# A brief description of the change.  Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add a new logs receiver to ingest Docker events as log records.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [29096]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/receiver/dockerstatsreceiver/README.md b/receiver/dockerstatsreceiver/README.md
index ccfabc263b84..6d7241fc4a7e 100644
--- a/receiver/dockerstatsreceiver/README.md
+++ b/receiver/dockerstatsreceiver/README.md
@@ -3,19 +3,23 @@
 <!-- status autogenerated section -->
 | Status        |           |
 | ------------- |-----------|
-| Stability     | [alpha]: metrics   |
+| Stability     | [development]: logs   |
+|               | [alpha]: metrics   |
 | Unsupported Platforms | darwin, windows |
 | Distributions | [contrib] |
 | Issues        | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fdockerstats%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fdockerstats) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fdockerstats%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fdockerstats) |
 | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner)    | [@jamesmoessis](https://www.github.com/jamesmoessis) |
 | Emeritus      | [@rmfitzpatrick](https://www.github.com/rmfitzpatrick) |
 
+[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
 [alpha]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#alpha
 [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
 <!-- end autogenerated section -->
 
-The Docker Stats receiver queries the local Docker daemon's container stats API for
-all desired running containers on a configured interval.  These stats are for container
+The Docker Stats receiver queries the local Docker daemon:
+- for the logs receiver, it queries the events API for Docker events and streams them into log records. These events
+can be any of those reported by the [docker events](https://docs.docker.com/reference/cli/docker/system/events/) command.
+- for the metrics receiver, it queries the container stats API for all desired running containers on a configured interval. These stats are for container
 resource usage of cpu, memory, network, and the
 [blkio controller](https://www.kernel.org/doc/Documentation/cgroup-v1/blkio-controller.txt).
 
@@ -23,27 +27,39 @@ resource usage of cpu, memory, network, and the
 
 ## Configuration
 
-The following settings are optional:
+The following settings apply to both the logs and metrics receivers:
 
 - `endpoint` (default = `unix:///var/run/docker.sock`): Address to reach the desired Docker daemon.
+- `excluded_images` (no default, all running containers monitored): A list of strings,
+    [regexes](https://golang.org/pkg/regexp/), or [globs](https://github.com/gobwas/glob) whose referent container image
+    names will not be among the queried containers. `!`-prefixed negations are possible for all item types to signify that
+    only unmatched container image names should be excluded (see the example after this list).
+  - Regexes must be placed between `/` characters: `/my?egex/`. Negations go outside the forward slashes:
+    `!/my?egex/` will exclude all containers whose image name doesn't match the compiled regex `my?egex`.
+  - Globs are non-regex items (e.g. `/items/`) containing any of the following: `*[]{}?`. Negations are supported:
+    `!my*container` will exclude all containers whose image name doesn't match the glob `my*container`.
+- `timeout` (default = `5s`): The request timeout for any docker daemon query.
+- `api_version` (default = `"1.25"`): The Docker client API version (must be 1.25+). Must be input as a string, not a float (e.g. `"1.40"` instead of `1.40`). [Docker API versions](https://docs.docker.com/engine/api/).
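+
+For example, a configuration excluding containers by image name could combine literal names, globs,
+and regexes, as sketched below (the image names are purely illustrative):
+
+```yaml
+receivers:
+  docker_stats:
+    excluded_images:
+      - internal-base-image        # literal image name
+      - "registry.example.com/*"   # glob
+      - /debug-.*/                 # regex placed between forward slashes
+    # A negation such as "!production-*" would instead exclude every image
+    # that does not match the glob production-*.
+```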
+
+The following settings apply only to the metrics receiver:
 - `collection_interval` (default = `10s`): The interval at which to gather container stats.
 - `initial_delay` (default = `1s`): defines how long this receiver waits before starting.
 - `container_labels_to_metric_labels` (no default): A map of Docker container label names whose label values to use
 as the specified metric label key.
 - `env_vars_to_metric_labels` (no default): A map of Docker container environment variables whose values to use
 as the specified metric label key.
-- `excluded_images` (no default, all running containers monitored): A list of strings,
-[regexes](https://golang.org/pkg/regexp/), or [globs](https://github.com/gobwas/glob) whose referent container image
-names will not be among the queried containers. `!`-prefixed negations are possible for all item types to signify that
-only unmatched container image names should be excluded.
-    - Regexes must be placed between `/` characters: `/my?egex/`.  Negations are to be outside the forward slashes:
-    `!/my?egex/` will exclude all containers whose name doesn't match the compiled regex `my?egex`.
-    - Globs are non-regex items (e.g. `/items/`) containing any of the following: `*[]{}?`.  Negations are supported:
-    `!my*container` will exclude all containers whose image name doesn't match the blob `my*container`.
-- `timeout` (default = `5s`): The request timeout for any docker daemon query.
-- `api_version` (default = `"1.25"`): The Docker client API version (must be 1.25+). Must be input as a string, not a float (e.g. `"1.40"` instead of `1.40`). [Docker API versions](https://docs.docker.com/engine/api/).
 - `metrics` (defaults at [./documentation.md](./documentation.md)): Enables/disables individual metrics. See [./documentation.md](./documentation.md) for full detail.
 
+The following settings apply only to the logs receiver:
+- `min_docker_retry_wait` (default = `1s`): The Docker daemon may disconnect from the receiver. This configures the initial
+  duration to wait before attempting a reconnect (with exponential backoff).
+- `max_docker_retry_wait` (default = `10s`): This configures the maximum
+  duration to wait before attempting a reconnect (with exponential backoff).
+- `logs`: Configuration for which Docker events to emit as logs. Matches the [Go Docker API](https://github.com/moby/moby/blob/master/api/types/events/events.go#L131). The shape of the emitted log records is illustrated after this list.
+  - `since`: Only emit Docker events created at or after this time. Accepts Unix timestamps or RFC3339-formatted timestamps (e.g., "2024-01-02T15:04:05Z").
+  - `until`: Only emit Docker events created at or before this time. Same format as `since`.
+  - `filters`: A map of event properties used to select which Docker events to emit. See [docker events](https://docs.docker.com/reference/cli/docker/system/events/) for more details.
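+
+Each matching Docker event is converted into a single log record: the event timestamp becomes the record
+timestamp, the attributes carry `event.name` (e.g. `docker.container.start`), `event.scope`, and `event.id`,
+and the body is a map of the event's actor attributes. As a rough illustration (all values are examples),
+a container start event produces a record along these lines:
+
+```yaml
+attributes:
+  event.name: docker.container.start
+  event.scope: local
+  event.id: <container id>
+body:
+  image: alpine:latest
+  name: test-container
+```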
+
 Example:
 
 ```yaml
@@ -68,6 +84,14 @@ receivers:
         enabled: true
       container.network.io.usage.tx_dropped:
         enabled: false
+    min_docker_retry_wait: 5s
+    max_docker_retry_wait: 30s
+    logs:
+      filters:
+        type: ["container", "image"]
+        event: ["start", "stop", "die"]
+      since: "2024-01-01T00:00:00Z"
+      until: "2024-01-02T00:00:00Z"
 ```
 
 The full list of settings exposed for this receiver are documented [here](./config.go)
diff --git a/receiver/dockerstatsreceiver/config.go b/receiver/dockerstatsreceiver/config.go
index 750560dbcf98..0ca3ded12610 100644
--- a/receiver/dockerstatsreceiver/config.go
+++ b/receiver/dockerstatsreceiver/config.go
@@ -4,6 +4,11 @@
 package dockerstatsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver"
 
 import (
+	"fmt"
+	"strconv"
+	"time"
+
+	"github.com/pkg/errors"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/confmap"
 	"go.opentelemetry.io/collector/receiver/scraperhelper"
@@ -14,9 +19,21 @@ import (
 
 var _ component.Config = (*Config)(nil)
 
+// EventsConfig contains configuration for Docker events collection
+type EventsConfig struct {
+	// Filters allows filtering which Docker events to collect
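+	// Keys and values follow the `docker events --filter` syntax, e.g. type=container, event=start.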
+	Filters map[string][]string `mapstructure:"filters"`
+	// Since shows events created since this timestamp
+	Since string `mapstructure:"since"`
+	// Until shows events created until this timestamp
+	Until string `mapstructure:"until"`
+}
+
 type Config struct {
 	docker.Config `mapstructure:",squash"`
 
+	// Metrics-specific settings
+
 	scraperhelper.ControllerConfig `mapstructure:",squash"`
 
 	// A mapping of container label names to MetricDescriptor label keys.
@@ -36,12 +53,75 @@ type Config struct {
 
 	// MetricsBuilderConfig config. Enable or disable stats by name.
 	metadata.MetricsBuilderConfig `mapstructure:",squash"`
+
+	// Logs-specific settings
+	// MinDockerRetryWait is the minimum time to wait before retrying to connect to the Docker daemon
+	MinDockerRetryWait time.Duration `mapstructure:"min_docker_retry_wait"`
+	// MaxDockerRetryWait is the maximum time to wait before retrying to connect to the Docker daemon
+	MaxDockerRetryWait time.Duration `mapstructure:"max_docker_retry_wait"`
+
+	// Logs configuration (Docker events)
+	Logs EventsConfig `mapstructure:"logs"`
+}
+
+func parseTimestamp(ts string) (time.Time, error) {
+	// Try Unix timestamp first
+	if i, err := strconv.ParseInt(ts, 10, 64); err == nil {
+		return time.Unix(i, 0), nil
+	}
+
+	// Try RFC3339
+	return time.Parse(time.RFC3339, ts)
 }
 
 func (config Config) Validate() error {
 	if err := docker.VersionIsValidAndGTE(config.DockerAPIVersion, minimumRequiredDockerAPIVersion); err != nil {
 		return err
 	}
+
+	// Validate logs-specific config
+	if config.MinDockerRetryWait <= 0 {
+		return fmt.Errorf("min_docker_retry_wait must be positive, got %v", config.MinDockerRetryWait)
+	}
+	if config.MaxDockerRetryWait <= 0 {
+		return fmt.Errorf("max_docker_retry_wait must be positive, got %v", config.MaxDockerRetryWait)
+	}
+	if config.MaxDockerRetryWait < config.MinDockerRetryWait {
+		return fmt.Errorf("max_docker_retry_wait must not be less than min_docker_retry_wait")
+	}
+
+	now := time.Now()
+	var sinceTime time.Time
+	if config.Logs.Since != "" {
+		var err error
+		sinceTime, err = parseTimestamp(config.Logs.Since)
+		if err != nil {
+			return errors.Wrapf(err, "logs.since must be a Unix timestamp or RFC3339 time")
+		}
+		if sinceTime.After(now) {
+			return fmt.Errorf("logs.since cannot be in the future")
+		}
+	}
+
+	// Parse and validate until if set
+	var untilTime time.Time
+	if config.Logs.Until != "" {
+		var err error
+		untilTime, err = parseTimestamp(config.Logs.Until)
+		if err != nil {
+			return errors.Wrapf(err, "logs.until must be a Unix timestamp or RFC3339 time")
+		}
+		// A future until time is allowed: the Docker daemon keeps streaming events
+		// until that time is reached.
+	}
+
+	// If both are set, ensure since is not after until
+	if config.Logs.Since != "" && config.Logs.Until != "" {
+		if sinceTime.After(untilTime) {
+			return fmt.Errorf("logs.since must not be after logs.until")
+		}
+	}
 	return nil
 }
 
diff --git a/receiver/dockerstatsreceiver/config_test.go b/receiver/dockerstatsreceiver/config_test.go
index 1fd7602e06df..f278856508e4 100644
--- a/receiver/dockerstatsreceiver/config_test.go
+++ b/receiver/dockerstatsreceiver/config_test.go
@@ -78,6 +78,16 @@ func TestLoadConfig(t *testing.T) {
 					}
 					return m
 				}(),
+				MinDockerRetryWait: 1 * time.Second,
+				MaxDockerRetryWait: 30 * time.Second,
+				Logs: EventsConfig{
+					Filters: map[string][]string{
+						"type":  {"container", "image"},
+						"event": {"start", "stop", "die"},
+					},
+					Since: "2024-01-01T00:00:00Z",
+					Until: "2024-01-02T00:00:00Z",
+				},
 			},
 		},
 	}
@@ -98,28 +108,134 @@ func TestLoadConfig(t *testing.T) {
 }
 
 func TestValidateErrors(t *testing.T) {
-	cfg := &Config{ControllerConfig: scraperhelper.NewDefaultControllerConfig(), Config: docker.Config{
-		DockerAPIVersion: "1.25",
-	}}
-	assert.Equal(t, "endpoint must be specified", component.ValidateConfig(cfg).Error())
-
-	cfg = &Config{
-		Config: docker.Config{
-			DockerAPIVersion: "1.21",
-			Endpoint:         "someEndpoint",
+	tests := []struct {
+		name        string
+		cfg         *Config
+		expectedErr string
+	}{
+		{
+			name: "missing endpoint",
+			cfg: &Config{
+				Config: docker.Config{
+					DockerAPIVersion: "1.25",
+				},
+				ControllerConfig: scraperhelper.NewDefaultControllerConfig(),
+			},
+			expectedErr: "endpoint must be specified",
+		},
+		{
+			name: "outdated api version",
+			cfg: &Config{
+				Config: docker.Config{
+					DockerAPIVersion: "1.21",
+					Endpoint:         "someEndpoint",
+				},
+				ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 1 * time.Second},
+			},
+			expectedErr: `"api_version" 1.21 must be at least 1.25`,
+		},
+		{
+			name: "missing collection interval",
+			cfg: &Config{
+				Config: docker.Config{
+					Endpoint:         "someEndpoint",
+					DockerAPIVersion: "1.25",
+				},
+				ControllerConfig: scraperhelper.ControllerConfig{},
+			},
+			expectedErr: `"collection_interval": requires positive value`,
+		},
+		{
+			name: "negative min retry wait",
+			cfg: &Config{
+				Config: docker.Config{
+					Endpoint:         "unix:///var/run/docker.sock",
+					DockerAPIVersion: "1.25",
+				},
+				MinDockerRetryWait: -1 * time.Second,
+				MaxDockerRetryWait: 30 * time.Second,
+			},
+			expectedErr: "min_docker_retry_wait must be positive, got -1s",
+		},
+		{
+			name: "negative max retry wait",
+			cfg: &Config{
+				Config: docker.Config{
+					Endpoint:         "unix:///var/run/docker.sock",
+					DockerAPIVersion: "1.25",
+				},
+				MinDockerRetryWait: 1 * time.Second,
+				MaxDockerRetryWait: -1 * time.Second,
+			},
+			expectedErr: "max_docker_retry_wait must be positive, got -1s",
+		},
+		{
+			name: "max less than min",
+			cfg: &Config{
+				Config: docker.Config{
+					Endpoint:         "unix:///var/run/docker.sock",
+					DockerAPIVersion: "1.25",
+				},
+				MinDockerRetryWait: 30 * time.Second,
+				MaxDockerRetryWait: 1 * time.Second,
+			},
+			expectedErr: "max_docker_retry_wait must not be less than min_docker_retry_wait",
+		},
+		{
+			name: "invalid since timestamp",
+			cfg: &Config{
+				Config: docker.Config{
+					Endpoint:         "unix:///var/run/docker.sock",
+					DockerAPIVersion: "1.25",
+				},
+				MinDockerRetryWait: 1 * time.Second,
+				MaxDockerRetryWait: 30 * time.Second,
+				Logs: EventsConfig{
+					Since: "not-a-timestamp",
+				},
+			},
+			expectedErr: "logs.since must be a Unix timestamp or RFC3339 time",
+		},
+		{
+			name: "future since timestamp",
+			cfg: &Config{
+				Config: docker.Config{
+					Endpoint:         "unix:///var/run/docker.sock",
+					DockerAPIVersion: "1.25",
+				},
+				ControllerConfig:   scraperhelper.ControllerConfig{CollectionInterval: 1 * time.Second},
+				MinDockerRetryWait: 1 * time.Second,
+				MaxDockerRetryWait: 30 * time.Second,
+				Logs: EventsConfig{
+					Since: time.Now().Add(24 * time.Hour).Format(time.RFC3339),
+				},
+			},
+			expectedErr: "logs.since cannot be in the future",
+		},
+		{
+			name: "since after until",
+			cfg: &Config{
+				Config: docker.Config{
+					Endpoint:         "unix:///var/run/docker.sock",
+					DockerAPIVersion: "1.25",
+				},
+				MinDockerRetryWait: 1 * time.Second,
+				MaxDockerRetryWait: 30 * time.Second,
+				Logs: EventsConfig{
+					Since: "2024-01-02T00:00:00Z",
+					Until: "2024-01-01T00:00:00Z",
+				},
+			},
+			expectedErr: "logs.since must not be after logs.until",
 		},
-		ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 1 * time.Second},
 	}
-	assert.Equal(t, `"api_version" 1.21 must be at least 1.25`, component.ValidateConfig(cfg).Error())
 
-	cfg = &Config{
-		Config: docker.Config{
-			Endpoint:         "someEndpoint",
-			DockerAPIVersion: "1.25",
-		},
-		ControllerConfig: scraperhelper.ControllerConfig{},
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := component.ValidateConfig(tt.cfg)
+			assert.ErrorContains(t, err, tt.expectedErr)
+		})
 	}
-	assert.Equal(t, `"collection_interval": requires positive value`, component.ValidateConfig(cfg).Error())
 }
 
 func TestApiVersionCustomError(t *testing.T) {
diff --git a/receiver/dockerstatsreceiver/factory.go b/receiver/dockerstatsreceiver/factory.go
index eeeca7159d39..762d5da33e7f 100644
--- a/receiver/dockerstatsreceiver/factory.go
+++ b/receiver/dockerstatsreceiver/factory.go
@@ -20,6 +20,7 @@ func NewFactory() receiver.Factory {
 	return receiver.NewFactory(
 		metadata.Type,
 		createDefaultConfig,
+		receiver.WithLogs(createLogsReceiver, metadata.LogsStability),
 		receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability))
 }
 
@@ -35,9 +36,22 @@ func createDefaultConfig() component.Config {
 			Timeout:          scs.Timeout,
 		},
 		MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
+		MinDockerRetryWait:   time.Second,
+		MaxDockerRetryWait:   10 * time.Second,
+		Logs:                 EventsConfig{},
 	}
 }
 
+func createLogsReceiver(
+	_ context.Context,
+	params receiver.Settings,
+	config component.Config,
+	consumer consumer.Logs,
+) (receiver.Logs, error) {
+	dockerConfig := config.(*Config)
+	return newLogsReceiver(params, dockerConfig, consumer), nil
+}
+
 func createMetricsReceiver(
 	_ context.Context,
 	params receiver.Settings,
diff --git a/receiver/dockerstatsreceiver/factory_test.go b/receiver/dockerstatsreceiver/factory_test.go
index 9401b38ed1b7..d28a78fb8844 100644
--- a/receiver/dockerstatsreceiver/factory_test.go
+++ b/receiver/dockerstatsreceiver/factory_test.go
@@ -35,4 +35,8 @@ func TestCreateReceiver(t *testing.T) {
 	metricReceiver, err := factory.CreateMetrics(context.Background(), params, config, consumertest.NewNop())
 	assert.NoError(t, err, "Metric receiver creation failed")
 	assert.NotNil(t, metricReceiver, "receiver creation failed")
+
+	logReceiver, err := factory.CreateLogs(context.Background(), params, config, consumertest.NewNop())
+	assert.NoError(t, err, "log receiver creation failed")
+	assert.NotNil(t, logReceiver, "receiver creation failed")
 }
diff --git a/receiver/dockerstatsreceiver/generated_component_test.go b/receiver/dockerstatsreceiver/generated_component_test.go
index 4ca9d558b309..c75deb4dc597 100644
--- a/receiver/dockerstatsreceiver/generated_component_test.go
+++ b/receiver/dockerstatsreceiver/generated_component_test.go
@@ -32,6 +32,13 @@ func TestComponentLifecycle(t *testing.T) {
 		createFn func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error)
 	}{
 
+		{
+			name: "logs",
+			createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) {
+				return factory.CreateLogs(ctx, set, cfg, consumertest.NewNop())
+			},
+		},
+
 		{
 			name: "metrics",
 			createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) {
diff --git a/receiver/dockerstatsreceiver/go.mod b/receiver/dockerstatsreceiver/go.mod
index dea1eb5d8c80..0452e0dbb58f 100644
--- a/receiver/dockerstatsreceiver/go.mod
+++ b/receiver/dockerstatsreceiver/go.mod
@@ -3,11 +3,13 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/docker
 go 1.22.0
 
 require (
+	github.com/cenkalti/backoff/v4 v4.3.0
 	github.com/docker/docker v27.3.1+incompatible
 	github.com/google/go-cmp v0.6.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.113.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.113.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.113.0
+	github.com/pkg/errors v0.9.1
 	github.com/stretchr/testify v1.9.0
 	github.com/testcontainers/testcontainers-go v0.34.0
 	go.opentelemetry.io/collector/component v0.113.0
@@ -31,7 +33,6 @@ require (
 	github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect
 	github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
 	github.com/Microsoft/go-winio v0.6.2 // indirect
-	github.com/cenkalti/backoff/v4 v4.3.0 // indirect
 	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 	github.com/containerd/log v0.1.0 // indirect
 	github.com/containerd/platforms v0.2.1 // indirect
@@ -69,7 +70,6 @@ require (
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.113.0 // indirect
 	github.com/opencontainers/go-digest v1.0.0 // indirect
 	github.com/opencontainers/image-spec v1.1.0 // indirect
-	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
 	github.com/rogpeppe/go-internal v1.12.0 // indirect
diff --git a/receiver/dockerstatsreceiver/internal/metadata/generated_status.go b/receiver/dockerstatsreceiver/internal/metadata/generated_status.go
index c4f772f51ae4..8d7438b8bc2c 100644
--- a/receiver/dockerstatsreceiver/internal/metadata/generated_status.go
+++ b/receiver/dockerstatsreceiver/internal/metadata/generated_status.go
@@ -12,5 +12,6 @@ var (
 )
 
 const (
+	LogsStability    = component.StabilityLevelDevelopment
 	MetricsStability = component.StabilityLevelAlpha
 )
diff --git a/receiver/dockerstatsreceiver/logs_receiver.go b/receiver/dockerstatsreceiver/logs_receiver.go
new file mode 100644
index 000000000000..2e450d7506a0
--- /dev/null
+++ b/receiver/dockerstatsreceiver/logs_receiver.go
@@ -0,0 +1,201 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package dockerstatsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver"
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"strings"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	"github.com/docker/docker/api/types/events"
+	"github.com/docker/docker/api/types/filters"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/plog"
+	"go.opentelemetry.io/collector/receiver"
+	"go.uber.org/zap"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker"
+)
+
+const dockerEventPrefix = "docker"
+
+type logsReceiver struct {
+	config   *Config
+	settings receiver.Settings
+
+	cancel      context.CancelFunc
+	consumer    consumer.Logs
+	eventPoller *dockerEventPoller
+}
+
+func newLogsReceiver(set receiver.Settings, config *Config, consumer consumer.Logs) *logsReceiver {
+	return &logsReceiver{
+		config:   config,
+		settings: set,
+		consumer: consumer,
+	}
+}
+
+func (r *logsReceiver) Start(ctx context.Context, _ component.Host) error {
+	var err error
+	client, err := docker.NewDockerClient(&r.config.Config, r.settings.Logger)
+	if err != nil {
+		return err
+	}
+
+	if err = client.LoadContainerList(ctx); err != nil {
+		return err
+	}
+
+	cctx, cancel := context.WithCancel(ctx)
+	r.cancel = cancel
+	r.eventPoller = newDockerEventPoller(r.config, client, r.settings.Logger, r.consumeDockerEvent)
+	go r.eventPoller.Start(cctx)
+	return nil
+}
+
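+// getDockerBackoffConfig builds the reconnect policy used when the Docker event
+// stream drops: waits start at min_docker_retry_wait, grow by a factor of 1.5
+// with up to 50% jitter, are capped at max_docker_retry_wait, and never give up
+// (MaxElapsedTime of 0 disables the overall retry deadline).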
+func getDockerBackoffConfig(config *Config) *backoff.ExponentialBackOff {
+	b := &backoff.ExponentialBackOff{
+		InitialInterval:     config.MinDockerRetryWait,
+		MaxInterval:         config.MaxDockerRetryWait,
+		MaxElapsedTime:      0,
+		Multiplier:          1.5,
+		RandomizationFactor: 0.5,
+		Clock:               backoff.SystemClock,
+	}
+	b.Reset()
+	return b
+}
+
+// dockerEventPoller manages the lifecycle and event stream of the Docker daemon connection.
+type dockerEventPoller struct {
+	config       *Config
+	client       *docker.Client
+	logger       *zap.Logger
+	eventHandler func(context.Context, *events.Message) error
+	backoff      *backoff.ExponentialBackOff
+}
+
+func newDockerEventPoller(
+	config *Config,
+	client *docker.Client,
+	logger *zap.Logger,
+	handler func(context.Context, *events.Message) error) *dockerEventPoller {
+	return &dockerEventPoller{
+		config:       config,
+		client:       client,
+		logger:       logger,
+		eventHandler: handler,
+		backoff:      getDockerBackoffConfig(config),
+	}
+}
+
+func (d *dockerEventPoller) Start(ctx context.Context) {
+	for {
+		filterArgs := filters.NewArgs()
+		for k, v := range d.config.Logs.Filters {
+			for _, elem := range v {
+				filterArgs.Add(k, elem)
+			}
+		}
+		// event stream can be interrupted by async errors (connection or other).
+		// client caller must retry to restart processing. retry with backoff here
+		// except for context cancellation.
+		eventChan, errChan := d.client.Events(ctx, events.ListOptions{
+			Since:   d.config.Logs.Since,
+			Until:   d.config.Logs.Until,
+			Filters: filterArgs,
+		})
+
+		err := d.processEvents(ctx, eventChan, errChan)
+		if errors.Is(err, context.Canceled) {
+			return
+		}
+		// io.EOF is expected, no need to log but still needs retrying
+		if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
+			d.logger.Error("Async error while processing docker events, reconnecting to docker daemon", zap.Error(err))
+		}
+		nextBackoff := d.backoff.NextBackOff()
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(nextBackoff):
+			continue
+		}
+	}
+}
+
+func (d *dockerEventPoller) processEvents(ctx context.Context, eventChan <-chan events.Message, errChan <-chan error) error {
+	processedOnce := false
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case event := <-eventChan:
+			// for the given method invocation, processing event indicates a successful daemon connection.
+			// backoff should be reset, no need to do this afterwards since the connection is already established.
+			if !processedOnce {
+				d.backoff.Reset()
+				processedOnce = true
+			}
+			if err := d.eventHandler(ctx, &event); err != nil {
+				d.logger.Error("Failed to process docker event", zap.Error(err))
+			}
+		case err := <-errChan:
+			return err
+		}
+	}
+}
+
+// TODO: add batching based on time/volume. for now, one event -> one logs
+func (r *logsReceiver) consumeDockerEvent(ctx context.Context, event *events.Message) error {
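+	// A zero-value message (empty Type) can be read from a closed event channel;
+	// skip it rather than emitting an empty log record.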
+	if event.Type == "" {
+		return nil
+	}
+	logs := plog.NewLogs()
+	logRecord := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
+	// event.TimeNano is the full event timestamp in nanoseconds since the Unix epoch.
+	logRecord.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, event.TimeNano)))
+	attrs := logRecord.Attributes()
+	action := string(event.Action)
+	// for cases with health_status: running etc., event.name should be
+	// properly namespaced as docker.container.health_status.running
+	if strings.Contains(action, ": ") {
+		action = strings.ReplaceAll(action, ": ", ".")
+	}
+	if action != "" {
+		// i.e. docker.container.start
+		attrs.PutStr("event.name", fmt.Sprintf("%s.%s.%s", dockerEventPrefix, event.Type, action))
+	} else {
+		attrs.PutStr("event.name", fmt.Sprintf("%s.%s", dockerEventPrefix, event.Type))
+	}
+	if event.Scope != "" {
+		attrs.PutStr("event.scope", event.Scope)
+	}
+	if event.Actor.ID != "" {
+		attrs.PutStr("event.id", event.Actor.ID)
+	}
+	// body exactly replicates actor attributes
+	if len(event.Actor.Attributes) > 0 {
+		actorAttrs := logRecord.Body().SetEmptyMap()
+		for k, v := range event.Actor.Attributes {
+			if k != "" {
+				actorAttrs.PutStr(k, v)
+			}
+		}
+	}
+	return r.consumer.ConsumeLogs(ctx, logs)
+}
+
+func (r *logsReceiver) Shutdown(_ context.Context) error {
+	if r.cancel != nil {
+		r.cancel()
+	}
+	return nil
+}
diff --git a/receiver/dockerstatsreceiver/logs_receiver_test.go b/receiver/dockerstatsreceiver/logs_receiver_test.go
new file mode 100644
index 000000000000..2c26e354cc83
--- /dev/null
+++ b/receiver/dockerstatsreceiver/logs_receiver_test.go
@@ -0,0 +1,234 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package dockerstatsreceiver
+
+import (
+	"bytes"
+	"context"
+	"math/rand"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/consumer/consumertest"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/receiver"
+	"go.opentelemetry.io/collector/receiver/receivertest"
+	"go.uber.org/zap/zaptest"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker"
+)
+
+var mockFolder = filepath.Join("testdata", "mock")
+
+func TestNewLogsReceiver(t *testing.T) {
+	cfg := &Config{
+		Config: docker.Config{
+			Endpoint:         "unix:///run/some.sock",
+			DockerAPIVersion: defaultDockerAPIVersion,
+		},
+	}
+	lr := newLogsReceiver(receivertest.NewNopSettings(), cfg, &consumertest.LogsSink{})
+	assert.NotNil(t, lr)
+}
+
+func TestErrorsInStartLogs(t *testing.T) {
+	unreachable := "unix:///not/a/thing.sock"
+	cfg := &Config{
+		Config: docker.Config{
+			Endpoint:         unreachable,
+			DockerAPIVersion: defaultDockerAPIVersion,
+		},
+	}
+	recv := newLogsReceiver(receivertest.NewNopSettings(), cfg, &consumertest.LogsSink{})
+	assert.NotNil(t, recv)
+
+	cfg.Endpoint = "..not/a/valid/endpoint"
+	err := recv.Start(context.Background(), componenttest.NewNopHost())
+	assert.ErrorContains(t, err, "unable to parse docker host")
+
+	cfg.Endpoint = unreachable
+	err = recv.Start(context.Background(), componenttest.NewNopHost())
+	assert.ErrorContains(t, err, "context deadline exceeded")
+}
+
+func createEventsMockServer(t *testing.T, eventsFilePaths []string) (*httptest.Server, error) {
+	t.Helper()
+	eventsPayloads := make([][]byte, len(eventsFilePaths))
+	for i, eventPath := range eventsFilePaths {
+		events, err := os.ReadFile(filepath.Clean(eventPath))
+		if err != nil {
+			return nil, err
+		}
+		eventsPayloads[i] = events
+	}
+	containerID := "73364842ef014441cac89fed05df19463b1230db25a31252cdf82e754f1ec581"
+	containerInfo := map[string]string{
+		"/v1.25/containers/json":                      filepath.Join(mockFolder, "single_container_with_optional_resource_attributes", "containers.json"),
+		"/v1.25/containers/" + containerID + "/json":  filepath.Join(mockFolder, "single_container_with_optional_resource_attributes", "container.json"),
+		"/v1.25/containers/" + containerID + "/stats": filepath.Join(mockFolder, "single_container_with_optional_resource_attributes", "stats.json"),
+	}
+	urlToFileContents := make(map[string][]byte, len(containerInfo))
+	for urlPath, filePath := range containerInfo {
+		err := func() error {
+			fileContents, err := os.ReadFile(filepath.Clean(filePath))
+			if err != nil {
+				return err
+			}
+			urlToFileContents[urlPath] = fileContents
+			return nil
+		}()
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	eventCallCount := 0
+	failureCount := 0
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		rw.Header().Set("Content-Type", "application/json")
+		switch req.URL.Path {
+		case "/v1.25/events":
+			if eventCallCount >= len(eventsPayloads) {
+				return
+			}
+			// the receiver should be resilient and reconnect on http failures.
+			// randomly, return 500 up to 3 times and assert the receiver retry behavior.
+			if failureCount < 3 && r.Float32() < 0.4 {
+				rw.WriteHeader(http.StatusInternalServerError)
+				failureCount++
+				return
+			}
+			for _, event := range bytes.Split(eventsPayloads[eventCallCount], []byte{'\n'}) {
+				if len(bytes.TrimSpace(event)) == 0 {
+					continue
+				}
+				_, err := rw.Write(append(event, '\n'))
+				assert.NoError(t, err)
+				rw.(http.Flusher).Flush()
+			}
+			eventCallCount++
+			// test out disconnection/reconnection capability
+			conn, _, err := rw.(http.Hijacker).Hijack()
+			assert.NoError(t, err)
+			err = conn.Close()
+			assert.NoError(t, err)
+		default:
+			data, ok := urlToFileContents[req.URL.Path]
+			if !ok {
+				rw.WriteHeader(http.StatusNotFound)
+				return
+			}
+			_, err := rw.Write(data)
+			assert.NoError(t, err)
+		}
+	})), nil
+}
+
+type dockerLogEventTestCase struct {
+	name          string
+	expectedBody  map[string]any
+	expectedAttrs map[string]any
+	timestamp     time.Time
+}
+
+func TestDockerEventPolling(t *testing.T) {
+	// events across connections should be aggregated
+	testCases := []dockerLogEventTestCase{
+		{
+			name: "container health status event",
+			expectedBody: map[string]any{
+				"image": "alpine:latest",
+				"name":  "test-container",
+			},
+			expectedAttrs: map[string]any{
+				"event.name":  "docker.container.health_status.running",
+				"event.scope": "local",
+				"event.id":    "f97ed5bca0a5a0b85bfd52c4144b96174e825c92a138bc0458f0e196f2c7c1b4",
+			},
+			timestamp: time.Unix(0, 1699483576081311000),
+		},
+		{
+			name: "network create event",
+			expectedBody: map[string]any{
+				"name":   "test-network",
+				"type":   "bridge",
+				"driver": "bridge",
+			},
+			expectedAttrs: map[string]any{
+				"event.name":  "docker.network.create",
+				"event.scope": "swarm",
+				"event.id":    "8c0b5d75f8fb4c06b31f25e9d3c2702827d7a43c82a26c1538ddd5f7b3307d05",
+			},
+			timestamp: time.Unix(0, 1699483577123456000),
+		},
+		{
+			name: "volume destroy event with no attributes",
+			expectedAttrs: map[string]any{
+				"event.name": "docker.volume.destroy",
+				"event.id":   "def456789",
+			},
+			timestamp: time.Unix(0, 1699483578123456000),
+		},
+		{
+			name: "daemon start event with empty id",
+			expectedBody: map[string]any{
+				"name": "docker-daemon",
+			},
+			expectedAttrs: map[string]any{
+				"event.name":  "docker.daemon.start",
+				"event.scope": "local",
+			},
+			timestamp: time.Unix(0, 1699483579123456000),
+		},
+	}
+
+	mockDockerEngine, err := createEventsMockServer(t, []string{filepath.Join(mockFolder, "single_container", "events.json"),
+		filepath.Join(mockFolder, "single_container", "events2.json")})
+	require.NoError(t, err)
+	defer mockDockerEngine.Close()
+	mockLogsConsumer := &consumertest.LogsSink{}
+	rcv := newLogsReceiver(
+		receiver.Settings{
+			TelemetrySettings: component.TelemetrySettings{
+				Logger: zaptest.NewLogger(t),
+			},
+		}, &Config{
+			Config: docker.Config{
+				Endpoint:         mockDockerEngine.URL,
+				DockerAPIVersion: defaultDockerAPIVersion,
+				Timeout:          time.Second,
+			},
+		}, mockLogsConsumer)
+	err = rcv.Start(context.Background(), componenttest.NewNopHost())
+	require.NoError(t, err)
+	defer func() { require.NoError(t, rcv.Shutdown(context.Background())) }()
+
+	require.Eventually(t, func() bool {
+		return len(mockLogsConsumer.AllLogs()) == len(testCases)
+	}, 5*time.Second, 100*time.Millisecond)
+	logs := mockLogsConsumer.AllLogs()
+	require.Equal(t, len(testCases), len(logs))
+
+	for i, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			curLogRecord := logs[i].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
+			if tc.expectedBody == nil {
+				assert.Nil(t, curLogRecord.Body().AsRaw())
+			} else {
+				assert.Equal(t, tc.expectedBody, curLogRecord.Body().AsRaw())
+			}
+			assert.Equal(t, tc.expectedAttrs, curLogRecord.Attributes().AsRaw())
+			assert.Equal(t, pcommon.NewTimestampFromTime(tc.timestamp), curLogRecord.Timestamp())
+		})
+	}
+}
diff --git a/receiver/dockerstatsreceiver/metadata.yaml b/receiver/dockerstatsreceiver/metadata.yaml
index 06db7f4591b3..3fc366e8fa4e 100644
--- a/receiver/dockerstatsreceiver/metadata.yaml
+++ b/receiver/dockerstatsreceiver/metadata.yaml
@@ -3,6 +3,7 @@ type: docker_stats
 status:
   class: receiver
   stability:
+    development: [logs]
     alpha: [metrics]
   distributions: [contrib]
   codeowners:
diff --git a/receiver/dockerstatsreceiver/receiver.go b/receiver/dockerstatsreceiver/metrics_receiver.go
similarity index 100%
rename from receiver/dockerstatsreceiver/receiver.go
rename to receiver/dockerstatsreceiver/metrics_receiver.go
diff --git a/receiver/dockerstatsreceiver/receiver_test.go b/receiver/dockerstatsreceiver/metrics_receiver_test.go
similarity index 99%
rename from receiver/dockerstatsreceiver/receiver_test.go
rename to receiver/dockerstatsreceiver/metrics_receiver_test.go
index c610668aace9..c9901e8595ab 100644
--- a/receiver/dockerstatsreceiver/receiver_test.go
+++ b/receiver/dockerstatsreceiver/metrics_receiver_test.go
@@ -30,8 +30,6 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver/internal/metadata"
 )
 
-var mockFolder = filepath.Join("testdata", "mock")
-
 var (
 	metricEnabled     = metadata.MetricConfig{Enabled: true}
 	allMetricsEnabled = metadata.MetricsConfig{
@@ -120,7 +118,7 @@ var (
 	}
 )
 
-func TestNewReceiver(t *testing.T) {
+func TestNewMetricsReceiver(t *testing.T) {
 	cfg := &Config{
 		ControllerConfig: scraperhelper.ControllerConfig{
 			CollectionInterval: 1 * time.Second,
@@ -134,7 +132,7 @@ func TestNewReceiver(t *testing.T) {
 	assert.NotNil(t, mr)
 }
 
-func TestErrorsInStart(t *testing.T) {
+func TestErrorsInStartMetrics(t *testing.T) {
 	unreachable := "unix:///not/a/thing.sock"
 	cfg := &Config{
 		ControllerConfig: scraperhelper.ControllerConfig{
diff --git a/receiver/dockerstatsreceiver/testdata/config.yaml b/receiver/dockerstatsreceiver/testdata/config.yaml
index de3ffce662a7..566962a8bffe 100644
--- a/receiver/dockerstatsreceiver/testdata/config.yaml
+++ b/receiver/dockerstatsreceiver/testdata/config.yaml
@@ -18,3 +18,11 @@ docker_stats/allsettings:
       enabled: false
     container.memory.total_rss:
       enabled: true
+  min_docker_retry_wait: 1s
+  max_docker_retry_wait: 30s
+  logs:
+    filters:
+      type: [ "container", "image" ]
+      event: [ "start", "stop", "die" ]
+    since: "2024-01-01T00:00:00Z"
+    until: "2024-01-02T00:00:00Z"
\ No newline at end of file
diff --git a/receiver/dockerstatsreceiver/testdata/mock/single_container/events.json b/receiver/dockerstatsreceiver/testdata/mock/single_container/events.json
new file mode 100644
index 000000000000..6a45c2d56a45
--- /dev/null
+++ b/receiver/dockerstatsreceiver/testdata/mock/single_container/events.json
@@ -0,0 +1,2 @@
+{"status":"start","id":"f97ed5bca0a5a0b85bfd52c4144b96174e825c92a138bc0458f0e196f2c7c1b4","from":"alpine:latest","Type":"container","Action":"health_status: running","Actor":{"ID":"f97ed5bca0a5a0b85bfd52c4144b96174e825c92a138bc0458f0e196f2c7c1b4","Attributes":{"image":"alpine:latest","name":"test-container"}},"time":1699483576,"timeNano":1699483576081311000,"scope":"local"}
+{"status":"create","id":"8c0b5d75f8fb4c06b31f25e9d3c2702827d7a43c82a26c1538ddd5f7b3307d05","from":"","Type":"network","Action":"create","Actor":{"ID":"8c0b5d75f8fb4c06b31f25e9d3c2702827d7a43c82a26c1538ddd5f7b3307d05","Attributes":{"name":"test-network","type":"bridge","driver":"bridge"}},"time":1699483577,"timeNano":1699483577123456000,"scope":"swarm"}
\ No newline at end of file
diff --git a/receiver/dockerstatsreceiver/testdata/mock/single_container/events2.json b/receiver/dockerstatsreceiver/testdata/mock/single_container/events2.json
new file mode 100644
index 000000000000..fdb5a1d37ebe
--- /dev/null
+++ b/receiver/dockerstatsreceiver/testdata/mock/single_container/events2.json
@@ -0,0 +1,5 @@
+{"status":"destroy","id":"def456789","from":"","Type":"volume","Action":"destroy","Actor":{"ID":"def456789"},"time":1699483578,"timeNano":1699483578123456000}
+{"status":"start","id":"","from":"","Type":"daemon","Action":"start","Actor":{"ID":"","Attributes":{"name":"docker-daemon"}},"time":1699483579,"timeNano":1699483579123456000,"scope":"local"}
+not json (shouldn't error, just cause retries)
+test retry
+again
\ No newline at end of file