From 324926f6b81c515af3dce862442e76bdf7dcf30d Mon Sep 17 00:00:00 2001 From: Kulwant Singh Date: Tue, 23 Jan 2024 00:36:44 +0000 Subject: [PATCH] Add network metrics (#152) --- .../k8swindows/extractors/extractor.go | 10 ++ .../k8swindows/extractors/extractorhelpers.go | 39 ++++++++ .../k8swindows/extractors/net_extractor.go | 89 ++++++++++++++++++ .../extractors/net_extractor_test.go | 92 +++++++++++++++++++ .../testdata/CurSingleKubeletSummary.json | 7 -- .../internal/k8swindows/k8swindows.go | 21 +++-- 6 files changed, 242 insertions(+), 16 deletions(-) create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/net_extractor.go create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/net_extractor_test.go diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go index 13e412f9e946..8512892d9702 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go @@ -31,6 +31,15 @@ type FileSystemStat struct { UsedBytes uint64 } +type NetworkStat struct { + Time time.Time + Name string + RxBytes uint64 + RxErrors uint64 + TxBytes uint64 + TxErrors uint64 +} + // RawMetric Represent Container, Pod, Node Metric Extractors. // Kubelet summary and HNS stats will be converted to Raw Metric for parsing by Extractors. type RawMetric struct { @@ -41,6 +50,7 @@ type RawMetric struct { CPUStats CPUStat MemoryStats MemoryStat FileSystemStats []FileSystemStat + NetworkStats []NetworkStat } type MetricExtractor interface { diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers.go index d6beaaa932b9..8047a1b35310 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers.go @@ -69,6 +69,33 @@ func convertMemoryStats(kubeletMemoryStat stats.MemoryStats) MemoryStat { return memoryStat } +// convertNetworkStats Convert kubelet network system stats to Raw memory stats +func convertNetworkStats(kubeletNetworkStat stats.NetworkStats, kubeletIntfStat stats.InterfaceStats) NetworkStat { + var networkstat NetworkStat + + networkstat.Time = kubeletNetworkStat.Time.Time + + networkstat.Name = kubeletIntfStat.Name + + if kubeletIntfStat.TxBytes != nil { + networkstat.TxBytes = *kubeletIntfStat.TxBytes + } + + if kubeletIntfStat.TxErrors != nil { + networkstat.TxErrors = *kubeletIntfStat.TxErrors + } + + if kubeletIntfStat.RxBytes != nil { + networkstat.RxBytes = *kubeletIntfStat.RxBytes + } + + if kubeletIntfStat.RxErrors != nil { + networkstat.RxErrors = *kubeletIntfStat.RxErrors + } + + return networkstat +} + // ConvertPodToRaw Converts Kubelet Pod stats to RawMetric. func ConvertPodToRaw(podStat stats.PodStats) RawMetric { var rawMetic RawMetric @@ -89,6 +116,12 @@ func ConvertPodToRaw(podStat stats.PodStats) RawMetric { rawMetic.MemoryStats = convertMemoryStats(*podStat.Memory) } + if podStat.Network != nil { + for _, intfStats := range podStat.Network.Interfaces { + rawMetic.NetworkStats = append(rawMetic.NetworkStats, convertNetworkStats(*podStat.Network, intfStats)) + } + } + return rawMetic } @@ -147,5 +180,11 @@ func ConvertNodeToRaw(nodeStat stats.NodeStats) RawMetric { rawMetic.FileSystemStats = append(rawMetic.FileSystemStats, convertFileSystemStats(*nodeStat.Fs)) } + if nodeStat.Network != nil { + for _, intfStats := range nodeStat.Network.Interfaces { + rawMetic.NetworkStats = append(rawMetic.NetworkStats, convertNetworkStats(*nodeStat.Network, intfStats)) + } + } + return rawMetic } diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/net_extractor.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/net_extractor.go new file mode 100644 index 000000000000..4fe48580a50f --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/net_extractor.go @@ -0,0 +1,89 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package extractors // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + +import ( + "time" + + "go.uber.org/zap" + + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" + cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" +) + +type NetMetricExtractor struct { + logger *zap.Logger + rateCalculator awsmetrics.MetricCalculator +} + +func (n *NetMetricExtractor) HasValue(rawMetric RawMetric) bool { + if !rawMetric.Time.IsZero() { + return true + } + return false +} + +func (n *NetMetricExtractor) GetValue(rawMetric RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric { + var metrics []*cExtractor.CAdvisorMetric + + if containerType == ci.TypeContainer { + return nil + } + + netIfceMetrics := make([]map[string]any, len(rawMetric.NetworkStats)) + + for i, intf := range rawMetric.NetworkStats { + netIfceMetric := make(map[string]any) + + identifier := rawMetric.Id + containerType + intf.Name + multiplier := float64(time.Second) + + cExtractor.AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxBytes, identifier, float64(intf.RxBytes), rawMetric.Time, multiplier) + cExtractor.AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxErrors, identifier, float64(intf.RxErrors), rawMetric.Time, multiplier) + cExtractor.AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxBytes, identifier, float64(intf.TxBytes), rawMetric.Time, multiplier) + cExtractor.AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxErrors, identifier, float64(intf.TxErrors), rawMetric.Time, multiplier) + + if netIfceMetric[ci.NetRxBytes] != nil && netIfceMetric[ci.NetTxBytes] != nil { + netIfceMetric[ci.NetTotalBytes] = netIfceMetric[ci.NetRxBytes].(float64) + netIfceMetric[ci.NetTxBytes].(float64) + } + + netIfceMetrics[i] = netIfceMetric + } + + aggregatedFields := ci.SumFields(netIfceMetrics) + if len(aggregatedFields) > 0 { + metric := cExtractor.NewCadvisorMetric(containerType, n.logger) + for k, v := range aggregatedFields { + metric.AddField(ci.MetricName(containerType, k), v) + } + metrics = append(metrics, metric) + } + + return metrics +} + +func (n *NetMetricExtractor) Shutdown() error { + return n.rateCalculator.Shutdown() +} + +func NewNetMetricExtractor(logger *zap.Logger) *NetMetricExtractor { + return &NetMetricExtractor{ + logger: logger, + rateCalculator: cExtractor.NewFloat64RateCalculator(), + } +} + +func getNetMetricType(containerType string, logger *zap.Logger) string { + metricType := "" + switch containerType { + case ci.TypeNode: + metricType = ci.TypeNodeNet + case ci.TypePod: + metricType = ci.TypePodNet + default: + logger.Warn("net_extractor: net metric extractor is parsing unexpected containerType", zap.String("containerType", containerType)) + } + return metricType +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/net_extractor_test.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/net_extractor_test.go new file mode 100644 index 000000000000..0d9298b89926 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/net_extractor_test.go @@ -0,0 +1,92 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package extractors + +import ( + "testing" + + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNetStats(t *testing.T) { + + result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json") + result2 := testutils.LoadKubeletSummary(t, "./testdata/CurSingleKubeletSummary.json") + + nodeRawMetric := ConvertNodeToRaw(result.Node) + nodeRawMetric2 := ConvertNodeToRaw(result2.Node) + + containerType := ci.TypeNode + extractor := NewNetMetricExtractor(nil) + var cMetrics []*cExtractor.CAdvisorMetric + if extractor.HasValue(nodeRawMetric) { + cMetrics = extractor.GetValue(nodeRawMetric, nil, containerType) + } + if extractor.HasValue(nodeRawMetric2) { + cMetrics = extractor.GetValue(nodeRawMetric2, nil, containerType) + } + + expectedFields := []map[string]any{ + { + "node_network_rx_bytes": float64(5768.366666666667), + "node_network_rx_errors": float64(0), + "node_network_total_bytes": float64(10259), + "node_network_tx_bytes": float64(4490.633333333333), + "node_network_tx_errors": float64(0), + }, + } + + expectedTags := []map[string]string{ + { + "Type": "Node", + }, + } + + assert.Equal(t, len(cMetrics), 1) + for i := range expectedFields { + cExtractor.AssertContainsTaggedField(t, cMetrics[i], expectedFields[i], expectedTags[i]) + } + require.NoError(t, extractor.Shutdown()) + + // pod type metrics + podRawMetric := ConvertPodToRaw(result.Pods[0]) + podRawMetric2 := ConvertPodToRaw(result2.Pods[0]) + + containerType = ci.TypePod + extractor = NewNetMetricExtractor(nil) + + if extractor.HasValue(podRawMetric) { + cMetrics = extractor.GetValue(podRawMetric, nil, containerType) + } + if extractor.HasValue(podRawMetric2) { + cMetrics = extractor.GetValue(podRawMetric2, nil, containerType) + } + + expectedFields = []map[string]any{ + { + "pod_network_rx_bytes": float64(1735.9333333333334), + "pod_network_rx_errors": float64(0), + "pod_network_total_bytes": float64(1903.75), + "pod_network_tx_bytes": float64(167.81666666666666), + "pod_network_tx_errors": float64(0), + }, + } + + expectedTags = []map[string]string{ + { + "Type": "Pod", + }, + } + + assert.Equal(t, len(cMetrics), 1) + for i := range expectedFields { + cExtractor.AssertContainsTaggedField(t, cMetrics[i], expectedFields[i], expectedTags[i]) + } + require.NoError(t, extractor.Shutdown()) +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/testdata/CurSingleKubeletSummary.json b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/testdata/CurSingleKubeletSummary.json index 650af9ca94d0..e177d22789c0 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/testdata/CurSingleKubeletSummary.json +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/testdata/CurSingleKubeletSummary.json @@ -62,13 +62,6 @@ "txBytes": 0, "txErrors": 0 }, - { - "name": "Intel[R] 82599 Virtual Function", - "rxBytes": 0, - "rxErrors": 0, - "txBytes": 0, - "txErrors": 0 - }, { "name": "Microsoft IP-HTTPS Platform Interface", "rxBytes": 0, diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go index 3f7fd0ce7cf1..5f21f83b5e76 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go @@ -29,6 +29,7 @@ type K8sWindows struct { k8sDecorator stores.K8sDecorator kubeletSummaryProvider *kubeletsummaryprovider.SummaryProvider hostInfo host.Info + version string } var metricsExtractors = []extractors.MetricExtractor{} @@ -43,6 +44,7 @@ func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info) metricsExtractors = append(metricsExtractors, extractors.NewCPUMetricExtractor(logger)) metricsExtractors = append(metricsExtractors, extractors.NewMemMetricExtractor(logger)) metricsExtractors = append(metricsExtractors, extractors.NewFileSystemMetricExtractor(logger)) + metricsExtractors = append(metricsExtractors, extractors.NewNetMetricExtractor(logger)) ksp, err := kubeletsummaryprovider.New(logger, &hostInfo, metricsExtractors) if err != nil { @@ -56,6 +58,7 @@ func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info) k8sDecorator: *decorator, kubeletSummaryProvider: ksp, hostInfo: hostInfo, + version: "0", }, nil } @@ -78,7 +81,7 @@ func (k *K8sWindows) GetMetrics() []pmetric.Metrics { return result } -func (c *K8sWindows) decorateMetrics(cadvisormetrics []*cExtractor.CAdvisorMetric) []*cExtractor.CAdvisorMetric { +func (k *K8sWindows) decorateMetrics(cadvisormetrics []*cExtractor.CAdvisorMetric) []*cExtractor.CAdvisorMetric { //ebsVolumeIdsUsedAsPV := c.hostInfo.ExtractEbsIDsUsedByKubernetes() var result []*cExtractor.CAdvisorMetric for _, m := range cadvisormetrics { @@ -86,33 +89,33 @@ func (c *K8sWindows) decorateMetrics(cadvisormetrics []*cExtractor.CAdvisorMetri //c.addEbsVolumeInfo(tags, ebsVolumeIdsUsedAsPV) // add version - //tags[ci.Version] = c.version + tags[ci.Version] = k.version // add nodeName for node, pod and container metricType := tags[ci.MetricType] - if c.nodeName != "" && (ci.IsNode(metricType) || ci.IsInstance(metricType) || + if k.nodeName != "" && (ci.IsNode(metricType) || ci.IsInstance(metricType) || ci.IsPod(metricType) || ci.IsContainer(metricType)) { - tags[ci.NodeNameKey] = c.nodeName + tags[ci.NodeNameKey] = k.nodeName } // add instance id and type - if instanceID := c.hostInfo.GetInstanceID(); instanceID != "" { + if instanceID := k.hostInfo.GetInstanceID(); instanceID != "" { tags[ci.InstanceID] = instanceID } - if instanceType := c.hostInfo.GetInstanceType(); instanceType != "" { + if instanceType := k.hostInfo.GetInstanceType(); instanceType != "" { tags[ci.InstanceType] = instanceType } // add scaling group name - tags[ci.AutoScalingGroupNameKey] = c.hostInfo.GetAutoScalingGroupName() + tags[ci.AutoScalingGroupNameKey] = k.hostInfo.GetAutoScalingGroupName() // add tags for EKS - tags[ci.ClusterNameKey] = c.hostInfo.GetClusterName() + tags[ci.ClusterNameKey] = k.hostInfo.GetClusterName() // add tags for OS tags[ci.OperatingSystem] = "windows" - out := c.k8sDecorator.Decorate(m) + out := k.k8sDecorator.Decorate(m) if out != nil { result = append(result, out) }