Skip to content

Commit

Permalink
feat: enable excluding collection from pods
Browse files Browse the repository at this point in the history
Signed-off-by: Bence Csati <[email protected]>
  • Loading branch information
csatib02 committed Jan 7, 2025
1 parent 59d42e4 commit 7636f52
Show file tree
Hide file tree
Showing 8 changed files with 374 additions and 8 deletions.
284 changes: 284 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
connectors:
count/output_metrics:
logs:
telemetry_controller_output_log_count:
attributes:
- key: tenant
- key: subscription
- key: exporter
description: The number of logs sent out from each exporter.
resource_attributes:
- key: k8s.namespace.name
- key: k8s.node.name
- key: k8s.container.name
- key: k8s.pod.name
- key: k8s.pod.labels.app.kubernetes.io/name
- key: k8s.pod.labels.app
count/tenant_metrics:
logs:
telemetry_controller_tenant_log_count:
attributes:
- key: tenant
description: The number of logs from each tenant pipeline.
resource_attributes:
- key: k8s.namespace.name
- key: k8s.node.name
- key: k8s.container.name
- key: k8s.pod.name
- key: k8s.pod.labels.app.kubernetes.io/name
- key: k8s.pod.labels.app
routing/subscription_example-tenant-ns_subscription-sample-1_outputs:
table:
- condition: "true"
pipelines:
- logs/output_example-tenant-ns_subscription-sample-1_collector_otlp-test-output
routing/subscription_example-tenant-ns_subscription-sample-2_outputs:
table:
- condition: "true"
pipelines:
- logs/output_example-tenant-ns_subscription-sample-2_collector_otlp-test-output-2
routing/tenant_example-tenant_subscriptions:
table:
- condition: "true"
pipelines:
- logs/tenant_example-tenant_subscription_example-tenant-ns_subscription-sample-1
- logs/tenant_example-tenant_subscription_example-tenant-ns_subscription-sample-2
exporters:
otlp/collector_otlp-test-output:
endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317
retry_on_failure:
enabled: true
max_elapsed_time: 0
sending_queue:
enabled: true
queue_size: 100
tls:
insecure: true
otlp/collector_otlp-test-output-2:
endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317
retry_on_failure:
enabled: true
max_elapsed_time: 0
sending_queue:
enabled: true
queue_size: 100
tls:
insecure: true
prometheus/message_metrics_exporter:
endpoint: :9999
extensions: {}
processors:
attributes/exporter_name_otlp-test-output:
actions:
- action: insert
key: exporter
value: otlp/collector_otlp-test-output
attributes/exporter_name_otlp-test-output-2:
actions:
- action: insert
key: exporter
value: otlp/collector_otlp-test-output-2
attributes/metricattributes:
actions:
- action: insert
from_attribute: k8s.pod.labels.app
key: app
- action: insert
from_attribute: k8s.node.name
key: host
- action: insert
from_attribute: k8s.namespace.name
key: namespace
- action: insert
from_attribute: k8s.container.name
key: container
- action: insert
from_attribute: k8s.pod.name
key: pod
attributes/subscription_subscription-sample-1:
actions:
- action: insert
key: subscription
value: subscription-sample-1
attributes/subscription_subscription-sample-2:
actions:
- action: insert
key: subscription
value: subscription-sample-2
attributes/tenant_example-tenant:
actions:
- action: insert
key: tenant
value: example-tenant
deltatocumulative: {}
filter/exclude:
error_mode: ignore
logs:
log_record:
- resource.attributes["exclude"] == "true"
k8sattributes:
auth_type: serviceAccount
extract:
annotations:
- from: pod
key: telemetry.kube-logging.dev/exclude
key_regex: null
tag_name: exclude
labels:
- from: pod
key: null
key_regex: .*
tag_name: all_labels
metadata:
- k8s.pod.name
- k8s.pod.uid
- k8s.deployment.name
- k8s.namespace.name
- k8s.node.name
- k8s.pod.start_time
passthrough: false
pod_association:
- sources:
- from: resource_attribute
name: k8s.namespace.name
- from: resource_attribute
name: k8s.pod.name
memory_limiter:
check_interval: 1s
limit_percentage: 75
spike_limit_mib: 25
receivers:
filelog/example-tenant:
exclude:
- /var/log/pods/*/otc-container/*.log
include:
- /var/log/pods/example-tenant-ns_*/*/*.log
include_file_name: false
include_file_path: true
operators:
- id: get-format
routes:
- expr: body matches "^\\{"
output: parser-docker
- expr: body matches "^[^ Z]+Z"
output: parser-containerd
type: router
- id: parser-containerd
output: extract_metadata_from_filepath
regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
timestamp:
layout: '%Y-%m-%dT%H:%M:%S.%LZ'
parse_from: attributes.time
type: regex_parser
- id: parser-docker
output: extract_metadata_from_filepath
timestamp:
layout: '%Y-%m-%dT%H:%M:%S.%LZ'
parse_from: attributes.time
type: json_parser
- cache:
size: 128
id: extract_metadata_from_filepath
parse_from: attributes["log.file.path"]
regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9-]+)\/(?P<container_name>[^\/]+)\/(?P<restart_count>\d+)\.log$
type: regex_parser
- from: attributes.log
to: body
type: move
- from: attributes.stream
to: attributes["log.iostream"]
type: move
- from: attributes.container_name
to: resource["k8s.container.name"]
type: move
- from: attributes.namespace
to: resource["k8s.namespace.name"]
type: move
- from: attributes.pod_name
to: resource["k8s.pod.name"]
type: move
- from: attributes.restart_count
to: resource["k8s.container.restart_count"]
type: move
- from: attributes.uid
to: resource["k8s.pod.uid"]
type: move
retry_on_failure:
enabled: true
max_elapsed_time: 0
start_at: end
service:
pipelines:
logs/output_example-tenant-ns_subscription-sample-1_collector_otlp-test-output:
exporters:
- otlp/collector_otlp-test-output
- count/output_metrics
processors:
- memory_limiter
- attributes/exporter_name_otlp-test-output
receivers:
- routing/subscription_example-tenant-ns_subscription-sample-1_outputs
logs/output_example-tenant-ns_subscription-sample-2_collector_otlp-test-output-2:
exporters:
- otlp/collector_otlp-test-output-2
- count/output_metrics
processors:
- memory_limiter
- attributes/exporter_name_otlp-test-output-2
receivers:
- routing/subscription_example-tenant-ns_subscription-sample-2_outputs
logs/tenant_example-tenant:
exporters:
- routing/tenant_example-tenant_subscriptions
- count/tenant_metrics
processors:
- memory_limiter
- k8sattributes
- attributes/tenant_example-tenant
- filter/exclude
receivers:
- filelog/example-tenant
logs/tenant_example-tenant_subscription_example-tenant-ns_subscription-sample-1:
exporters:
- routing/subscription_example-tenant-ns_subscription-sample-1_outputs
processors:
- memory_limiter
- attributes/subscription_subscription-sample-1
receivers:
- routing/tenant_example-tenant_subscriptions
logs/tenant_example-tenant_subscription_example-tenant-ns_subscription-sample-2:
exporters:
- routing/subscription_example-tenant-ns_subscription-sample-2_outputs
processors:
- memory_limiter
- attributes/subscription_subscription-sample-2
receivers:
- routing/tenant_example-tenant_subscriptions
metrics/output:
exporters:
- prometheus/message_metrics_exporter
processors:
- memory_limiter
- deltatocumulative
- attributes/metricattributes
receivers:
- count/output_metrics
metrics/tenant:
exporters:
- prometheus/message_metrics_exporter
processors:
- memory_limiter
- deltatocumulative
- attributes/metricattributes
receivers:
- count/tenant_metrics
telemetry:
metrics:
address: 0.0.0.0:8888
level: detailed
readers:
- pull:
exporter:
prometheus:
host: ""
port: 8888
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,22 @@ processors:
send_batch_size: 512
timeout: 5s
deltatocumulative: {}
filter/exclude:
error_mode: ignore
logs:
log_record:
- resource.attributes["exclude-tc"] == "true"
- resource.attributes["exclude-fluentbit"] == "true"
k8sattributes:
auth_type: serviceAccount
extract:
annotations:
- from: pod
key: telemetry.kube-logging.dev/exclude
tag_name: exclude-tc
- from: pod
key: fluentbit.io/exclude
tag_name: exclude-fluentbit
labels:
- from: pod
key_regex: .*
Expand Down Expand Up @@ -223,6 +236,7 @@ service:
- memory_limiter
- k8sattributes
- attributes/tenant_example-tenant-a
- filter/exclude
receivers:
- filelog/example-tenant-a
logs/tenant_example-tenant-a_subscription_example-tenant-a-ns_subscription-example-1:
Expand All @@ -249,6 +263,7 @@ service:
- memory_limiter
- k8sattributes
- attributes/tenant_example-tenant-b
- filter/exclude
receivers:
- filelog/example-tenant-b
logs/tenant_example-tenant-b_subscription_example-tenant-b-ns_subscription-example-3:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (cfgInput *OtelColConfigInput) generateProcessors() map[string]any {
processors := make(map[string]any)
processors["k8sattributes"] = processor.GenerateDefaultKubernetesProcessor()
processors["memory_limiter"] = processor.GenerateProcessorMemoryLimiter(cfgInput.MemoryLimiter)
processors["filter/exclude"] = processor.GenerateFilterProcessor()
maps.Copy(processors, processor.GenerateMetricsProcessors())

for _, tenant := range cfgInput.Tenants {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,12 +664,12 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) {
expectedPipelines: map[string]*otelv1beta1.Pipeline{
"logs/tenant_tenant1": {
Receivers: []string{"filelog/tenant1"},
Processors: []string{"k8sattributes", "attributes/tenant_tenant1"},
Processors: []string{"k8sattributes", "attributes/tenant_tenant1", "filter/exclude"},
Exporters: []string{"count/tenant_metrics", "routing/bridge_bridge1", "routing/bridge_bridge2"},
},
"logs/tenant_tenant2": {
Receivers: []string{"routing/bridge_bridge1"},
Processors: []string{"k8sattributes", "attributes/tenant_tenant2"},
Processors: []string{"k8sattributes", "attributes/tenant_tenant2", "filter/exclude"},
Exporters: []string{"routing/tenant_tenant2_subscriptions", "count/tenant_metrics"},
},
"logs/tenant_tenant2_subscription_ns2_sub2": {
Expand All @@ -679,7 +679,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) {
},
"logs/tenant_tenant3": {
Receivers: []string{"routing/bridge_bridge2"},
Processors: []string{"k8sattributes", "attributes/tenant_tenant3"},
Processors: []string{"k8sattributes", "attributes/tenant_tenant3", "filter/exclude"},
Exporters: []string{"routing/tenant_tenant3_subscriptions", "count/tenant_metrics"},
},
"logs/tenant_tenant3_subscription_ns3_sub3": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright © 2024 Kube logging authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processor

import "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components"

func GenerateFilterProcessor() map[string]any {
logRecords := []string{
"resource.attributes[\"exclude-tc\"] == \"true\"",
"resource.attributes[\"exclude-fluentbit\"] == \"true\"",
}

filterProcessor := map[string]any{
"error_mode": components.ErrorModeIgnore,
"logs": map[string]any{
"log_record": logRecords,
},
}

return filterProcessor
}
Loading

0 comments on commit 7636f52

Please sign in to comment.