From 1148e38427d36764f147fb7a1fcd3a629c766209 Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Wed, 8 Jan 2025 17:09:25 +0100 Subject: [PATCH] feat(confgen): add bytes connector Signed-off-by: Szilard Parrag --- .../telemetry/otel_conf_gen/otel_conf_gen.go | 8 ++-- .../components/connector/bytes_connector.go | 45 +++++++++++++++++++ .../controller/telemetry/pipeline/pipeline.go | 6 +++ 3 files changed, 56 insertions(+), 3 deletions(-) create mode 100644 internal/controller/telemetry/pipeline/components/connector/bytes_connector.go diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go index 66bd737..c372773 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go @@ -162,6 +162,7 @@ func (cfgInput *OtelColConfigInput) generateReceivers() map[string]any { func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any { connectors := make(map[string]any) maps.Copy(connectors, connector.GenerateCountConnectors()) + maps.Copy(connectors, connector.GenerateBytesConnectors()) for _, tenant := range cfgInput.Tenants { // Generate routing connector for the tenant's subscription if it has any @@ -189,6 +190,7 @@ func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any { func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1beta1.Pipeline { const outputCountConnectorName = "count/output_metrics" + const outputBytesConnectorName = "bytes/exporter" var namedPipelines = make(map[string]*otelv1beta1.Pipeline) tenants := []string{} @@ -235,15 +237,15 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b var exporters []string if output.Output.Spec.OTLPGRPC != nil { - exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName} + exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName} } if output.Output.Spec.OTLPHTTP != nil { - exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName} + exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName} } if output.Output.Spec.Fluentforward != nil { - exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName} + exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName} } if cfgInput.Debug { exporters = append(exporters, "debug") diff --git a/internal/controller/telemetry/pipeline/components/connector/bytes_connector.go b/internal/controller/telemetry/pipeline/components/connector/bytes_connector.go new file mode 100644 index 0000000..6ff1b0a --- /dev/null +++ b/internal/controller/telemetry/pipeline/components/connector/bytes_connector.go @@ -0,0 +1,45 @@ +// Copyright © 2024 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connector + +import "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/utils" + +type BytesConnectorAttributes struct { + Key *string `json:"key,omitempty"` + DefaultValue *string `json:"default_value,omitempty"` +} + +type BytesConnector struct { + Description string `json:"description,omitempty"` + Attributes []BytesConnectorAttributes `json:"attributes,omitempty"` +} + +func GenerateBytesConnectors() map[string]any { + bytesConnectors := make(map[string]any) + + bytesConnectors["bytes/exporter"] = map[string]any{ + "logs": map[string]BytesConnector{ + "otelcol_exporter_sent_log_records_bytes": { + Description: "Bytes of log records successfully sent to destination", + Attributes: []BytesConnectorAttributes{{ + Key: utils.ToPtr("exporter"), + }}, + }, + }, + } + + return bytesConnectors + +} diff --git a/internal/controller/telemetry/pipeline/pipeline.go b/internal/controller/telemetry/pipeline/pipeline.go index c8366d8..8dbd963 100644 --- a/internal/controller/telemetry/pipeline/pipeline.go +++ b/internal/controller/telemetry/pipeline/pipeline.go @@ -63,6 +63,12 @@ func GenerateMetricsPipelines() map[string]*otelv1beta1.Pipeline { Exporters: []string{"prometheus/message_metrics_exporter"}, } + metricsPipelines["metrics/output_bytes"] = &otelv1beta1.Pipeline{ + Receivers: []string{"bytes/exporter"}, + Processors: []string{"deltatocumulative", "attributes/metricattributes"}, + Exporters: []string{"prometheus/message_metrics_exporter"}, + } + return metricsPipelines }