Skip to content

Commit

Permalink
Add network metrics (amazon-contributing#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
KlwntSingh authored Jan 23, 2024
1 parent 52bd7cc commit 324926f
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ type FileSystemStat struct {
UsedBytes uint64
}

type NetworkStat struct {
Time time.Time
Name string
RxBytes uint64
RxErrors uint64
TxBytes uint64
TxErrors uint64
}

// RawMetric Represent Container, Pod, Node Metric Extractors.
// Kubelet summary and HNS stats will be converted to Raw Metric for parsing by Extractors.
type RawMetric struct {
Expand All @@ -41,6 +50,7 @@ type RawMetric struct {
CPUStats CPUStat
MemoryStats MemoryStat
FileSystemStats []FileSystemStat
NetworkStats []NetworkStat
}

type MetricExtractor interface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,33 @@ func convertMemoryStats(kubeletMemoryStat stats.MemoryStats) MemoryStat {
return memoryStat
}

// convertNetworkStats Convert kubelet network system stats to Raw memory stats
func convertNetworkStats(kubeletNetworkStat stats.NetworkStats, kubeletIntfStat stats.InterfaceStats) NetworkStat {
var networkstat NetworkStat

networkstat.Time = kubeletNetworkStat.Time.Time

networkstat.Name = kubeletIntfStat.Name

if kubeletIntfStat.TxBytes != nil {
networkstat.TxBytes = *kubeletIntfStat.TxBytes
}

if kubeletIntfStat.TxErrors != nil {
networkstat.TxErrors = *kubeletIntfStat.TxErrors
}

if kubeletIntfStat.RxBytes != nil {
networkstat.RxBytes = *kubeletIntfStat.RxBytes
}

if kubeletIntfStat.RxErrors != nil {
networkstat.RxErrors = *kubeletIntfStat.RxErrors
}

return networkstat
}

// ConvertPodToRaw Converts Kubelet Pod stats to RawMetric.
func ConvertPodToRaw(podStat stats.PodStats) RawMetric {
var rawMetic RawMetric
Expand All @@ -89,6 +116,12 @@ func ConvertPodToRaw(podStat stats.PodStats) RawMetric {
rawMetic.MemoryStats = convertMemoryStats(*podStat.Memory)
}

if podStat.Network != nil {
for _, intfStats := range podStat.Network.Interfaces {
rawMetic.NetworkStats = append(rawMetic.NetworkStats, convertNetworkStats(*podStat.Network, intfStats))
}
}

return rawMetic
}

Expand Down Expand Up @@ -147,5 +180,11 @@ func ConvertNodeToRaw(nodeStat stats.NodeStats) RawMetric {
rawMetic.FileSystemStats = append(rawMetic.FileSystemStats, convertFileSystemStats(*nodeStat.Fs))
}

if nodeStat.Network != nil {
for _, intfStats := range nodeStat.Network.Interfaces {
rawMetic.NetworkStats = append(rawMetic.NetworkStats, convertNetworkStats(*nodeStat.Network, intfStats))
}
}

return rawMetic
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package extractors // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"

import (
"time"

"go.uber.org/zap"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
)

type NetMetricExtractor struct {
logger *zap.Logger
rateCalculator awsmetrics.MetricCalculator
}

func (n *NetMetricExtractor) HasValue(rawMetric RawMetric) bool {
if !rawMetric.Time.IsZero() {
return true
}
return false
}

func (n *NetMetricExtractor) GetValue(rawMetric RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric {
var metrics []*cExtractor.CAdvisorMetric

if containerType == ci.TypeContainer {
return nil
}

netIfceMetrics := make([]map[string]any, len(rawMetric.NetworkStats))

for i, intf := range rawMetric.NetworkStats {
netIfceMetric := make(map[string]any)

identifier := rawMetric.Id + containerType + intf.Name
multiplier := float64(time.Second)

cExtractor.AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxBytes, identifier, float64(intf.RxBytes), rawMetric.Time, multiplier)
cExtractor.AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxErrors, identifier, float64(intf.RxErrors), rawMetric.Time, multiplier)
cExtractor.AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxBytes, identifier, float64(intf.TxBytes), rawMetric.Time, multiplier)
cExtractor.AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxErrors, identifier, float64(intf.TxErrors), rawMetric.Time, multiplier)

if netIfceMetric[ci.NetRxBytes] != nil && netIfceMetric[ci.NetTxBytes] != nil {
netIfceMetric[ci.NetTotalBytes] = netIfceMetric[ci.NetRxBytes].(float64) + netIfceMetric[ci.NetTxBytes].(float64)
}

netIfceMetrics[i] = netIfceMetric
}

aggregatedFields := ci.SumFields(netIfceMetrics)
if len(aggregatedFields) > 0 {
metric := cExtractor.NewCadvisorMetric(containerType, n.logger)
for k, v := range aggregatedFields {
metric.AddField(ci.MetricName(containerType, k), v)
}
metrics = append(metrics, metric)
}

return metrics
}

func (n *NetMetricExtractor) Shutdown() error {
return n.rateCalculator.Shutdown()
}

func NewNetMetricExtractor(logger *zap.Logger) *NetMetricExtractor {
return &NetMetricExtractor{
logger: logger,
rateCalculator: cExtractor.NewFloat64RateCalculator(),
}
}

func getNetMetricType(containerType string, logger *zap.Logger) string {
metricType := ""
switch containerType {
case ci.TypeNode:
metricType = ci.TypeNodeNet
case ci.TypePod:
metricType = ci.TypePodNet
default:
logger.Warn("net_extractor: net metric extractor is parsing unexpected containerType", zap.String("containerType", containerType))
}
return metricType
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package extractors

import (
"testing"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNetStats(t *testing.T) {

result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json")
result2 := testutils.LoadKubeletSummary(t, "./testdata/CurSingleKubeletSummary.json")

nodeRawMetric := ConvertNodeToRaw(result.Node)
nodeRawMetric2 := ConvertNodeToRaw(result2.Node)

containerType := ci.TypeNode
extractor := NewNetMetricExtractor(nil)
var cMetrics []*cExtractor.CAdvisorMetric
if extractor.HasValue(nodeRawMetric) {
cMetrics = extractor.GetValue(nodeRawMetric, nil, containerType)
}
if extractor.HasValue(nodeRawMetric2) {
cMetrics = extractor.GetValue(nodeRawMetric2, nil, containerType)
}

expectedFields := []map[string]any{
{
"node_network_rx_bytes": float64(5768.366666666667),
"node_network_rx_errors": float64(0),
"node_network_total_bytes": float64(10259),
"node_network_tx_bytes": float64(4490.633333333333),
"node_network_tx_errors": float64(0),
},
}

expectedTags := []map[string]string{
{
"Type": "Node",
},
}

assert.Equal(t, len(cMetrics), 1)
for i := range expectedFields {
cExtractor.AssertContainsTaggedField(t, cMetrics[i], expectedFields[i], expectedTags[i])
}
require.NoError(t, extractor.Shutdown())

// pod type metrics
podRawMetric := ConvertPodToRaw(result.Pods[0])
podRawMetric2 := ConvertPodToRaw(result2.Pods[0])

containerType = ci.TypePod
extractor = NewNetMetricExtractor(nil)

if extractor.HasValue(podRawMetric) {
cMetrics = extractor.GetValue(podRawMetric, nil, containerType)
}
if extractor.HasValue(podRawMetric2) {
cMetrics = extractor.GetValue(podRawMetric2, nil, containerType)
}

expectedFields = []map[string]any{
{
"pod_network_rx_bytes": float64(1735.9333333333334),
"pod_network_rx_errors": float64(0),
"pod_network_total_bytes": float64(1903.75),
"pod_network_tx_bytes": float64(167.81666666666666),
"pod_network_tx_errors": float64(0),
},
}

expectedTags = []map[string]string{
{
"Type": "Pod",
},
}

assert.Equal(t, len(cMetrics), 1)
for i := range expectedFields {
cExtractor.AssertContainsTaggedField(t, cMetrics[i], expectedFields[i], expectedTags[i])
}
require.NoError(t, extractor.Shutdown())
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,6 @@
"txBytes": 0,
"txErrors": 0
},
{
"name": "Intel[R] 82599 Virtual Function",
"rxBytes": 0,
"rxErrors": 0,
"txBytes": 0,
"txErrors": 0
},
{
"name": "Microsoft IP-HTTPS Platform Interface",
"rxBytes": 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type K8sWindows struct {
k8sDecorator stores.K8sDecorator
kubeletSummaryProvider *kubeletsummaryprovider.SummaryProvider
hostInfo host.Info
version string
}

var metricsExtractors = []extractors.MetricExtractor{}
Expand All @@ -43,6 +44,7 @@ func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info)
metricsExtractors = append(metricsExtractors, extractors.NewCPUMetricExtractor(logger))
metricsExtractors = append(metricsExtractors, extractors.NewMemMetricExtractor(logger))
metricsExtractors = append(metricsExtractors, extractors.NewFileSystemMetricExtractor(logger))
metricsExtractors = append(metricsExtractors, extractors.NewNetMetricExtractor(logger))

ksp, err := kubeletsummaryprovider.New(logger, &hostInfo, metricsExtractors)
if err != nil {
Expand All @@ -56,6 +58,7 @@ func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info)
k8sDecorator: *decorator,
kubeletSummaryProvider: ksp,
hostInfo: hostInfo,
version: "0",
}, nil
}

Expand All @@ -78,41 +81,41 @@ func (k *K8sWindows) GetMetrics() []pmetric.Metrics {
return result
}

func (c *K8sWindows) decorateMetrics(cadvisormetrics []*cExtractor.CAdvisorMetric) []*cExtractor.CAdvisorMetric {
func (k *K8sWindows) decorateMetrics(cadvisormetrics []*cExtractor.CAdvisorMetric) []*cExtractor.CAdvisorMetric {
//ebsVolumeIdsUsedAsPV := c.hostInfo.ExtractEbsIDsUsedByKubernetes()
var result []*cExtractor.CAdvisorMetric
for _, m := range cadvisormetrics {
tags := m.GetTags()
//c.addEbsVolumeInfo(tags, ebsVolumeIdsUsedAsPV)

// add version
//tags[ci.Version] = c.version
tags[ci.Version] = k.version

// add nodeName for node, pod and container
metricType := tags[ci.MetricType]
if c.nodeName != "" && (ci.IsNode(metricType) || ci.IsInstance(metricType) ||
if k.nodeName != "" && (ci.IsNode(metricType) || ci.IsInstance(metricType) ||
ci.IsPod(metricType) || ci.IsContainer(metricType)) {
tags[ci.NodeNameKey] = c.nodeName
tags[ci.NodeNameKey] = k.nodeName
}

// add instance id and type
if instanceID := c.hostInfo.GetInstanceID(); instanceID != "" {
if instanceID := k.hostInfo.GetInstanceID(); instanceID != "" {
tags[ci.InstanceID] = instanceID
}
if instanceType := c.hostInfo.GetInstanceType(); instanceType != "" {
if instanceType := k.hostInfo.GetInstanceType(); instanceType != "" {
tags[ci.InstanceType] = instanceType
}

// add scaling group name
tags[ci.AutoScalingGroupNameKey] = c.hostInfo.GetAutoScalingGroupName()
tags[ci.AutoScalingGroupNameKey] = k.hostInfo.GetAutoScalingGroupName()

// add tags for EKS
tags[ci.ClusterNameKey] = c.hostInfo.GetClusterName()
tags[ci.ClusterNameKey] = k.hostInfo.GetClusterName()

// add tags for OS
tags[ci.OperatingSystem] = "windows"

out := c.k8sDecorator.Decorate(m)
out := k.k8sDecorator.Decorate(m)
if out != nil {
result = append(result, out)
}
Expand Down

0 comments on commit 324926f

Please sign in to comment.