Skip to content

Commit

Permalink
CPU extractors with unit tests (amazon-contributing#146)
Browse files Browse the repository at this point in the history
* Add CPU extractors from kubelet summary API

1. Make cadvisor helper func's public to be used in k8swindows extractor
2. Add CPU extractor and add utilization fields
3. Add unit test for CPU extractor.
4. Add unit test data for kubelet summary API
5. Add helper func to convert Pod and Node summary stats to RawMetric

* Refactor code

1. Changed cExtractor to cextractors
2. Add nil checks to avoid panic during pointer deferences

* Refactored code

1. Added missing HasValue func in extractors
2. Replaced cextractor with cExtractor
3. Corrected extractorhelper name with missing characters
  • Loading branch information
KlwntSingh authored Dec 29, 2023
1 parent 4edef80 commit 1ad51be
Show file tree
Hide file tree
Showing 16 changed files with 706 additions and 37 deletions.
2 changes: 1 addition & 1 deletion receiver/awscontainerinsightreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ require (
k8s.io/apimachinery v0.28.3
k8s.io/client-go v0.28.3
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.100.1
k8s.io/kubelet v0.28.3
)

Expand Down Expand Up @@ -216,6 +215,7 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ func (c *CPUMetricExtractor) GetValue(info *cInfo.ContainerInfo, mInfo CPUMemInf
metric := NewCadvisorMetric(containerType, c.logger)
metric.cgroupPath = info.Name
multiplier := float64(decimalToMillicores)
assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUTotal), info.Name, float64(curStats.Cpu.Usage.Total), curStats.Timestamp, multiplier)
assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUUser), info.Name, float64(curStats.Cpu.Usage.User), curStats.Timestamp, multiplier)
assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUSystem), info.Name, float64(curStats.Cpu.Usage.System), curStats.Timestamp, multiplier)
AssignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUTotal), info.Name, float64(curStats.Cpu.Usage.Total), curStats.Timestamp, multiplier)
AssignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUUser), info.Name, float64(curStats.Cpu.Usage.User), curStats.Timestamp, multiplier)
AssignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUSystem), info.Name, float64(curStats.Cpu.Usage.System), curStats.Timestamp, multiplier)

numCores := mInfo.GetNumCores()
if metric.fields[ci.MetricName(containerType, ci.CPUTotal)] != nil && numCores != 0 {
Expand All @@ -60,6 +60,6 @@ func (c *CPUMetricExtractor) Shutdown() error {
func NewCPUMetricExtractor(logger *zap.Logger) *CPUMetricExtractor {
return &CPUMetricExtractor{
logger: logger,
rateCalculator: newFloat64RateCalculator(),
rateCalculator: NewFloat64RateCalculator(),
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (d *DiskIOMetricExtractor) extractIoMetrics(curStatsSet []cInfo.PerDiskStat
for _, key := range expectedKey {
if curVal, curOk := cur.Stats[key]; curOk {
mname := ci.MetricName(containerType, ioMetricName(namePrefix, key))
assignRateValueToField(&d.rateCalculator, metric.fields, mname, infoName, float64(curVal), curTime, float64(time.Second))
AssignRateValueToField(&d.rateCalculator, metric.fields, mname, infoName, float64(curVal), curTime, float64(time.Second))
}
}
if len(metric.fields) > 0 {
Expand Down Expand Up @@ -75,7 +75,7 @@ func devName(dStats cInfo.PerDiskStats) string {
func NewDiskIOMetricExtractor(logger *zap.Logger) *DiskIOMetricExtractor {
return &DiskIOMetricExtractor{
logger: logger,
rateCalculator: newFloat64RateCalculator(),
rateCalculator: NewFloat64RateCalculator(),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (c *CAdvisorMetric) Merge(src *CAdvisorMetric) {
}
}

func newFloat64RateCalculator() awsmetrics.MetricCalculator {
func NewFloat64RateCalculator() awsmetrics.MetricCalculator {
return awsmetrics.NewMetricCalculator(func(prev *awsmetrics.MetricValue, val any, timestamp time.Time) (any, bool) {
if prev != nil {
deltaNs := timestamp.Sub(prev.Timestamp)
Expand All @@ -127,7 +127,7 @@ func newFloat64RateCalculator() awsmetrics.MetricCalculator {
})
}

func assignRateValueToField(rateCalculator *awsmetrics.MetricCalculator, fields map[string]any, metricName string,
func AssignRateValueToField(rateCalculator *awsmetrics.MetricCalculator, fields map[string]any, metricName string,
cinfoName string, curVal any, curTime time.Time, multiplier float64) {
mKey := awsmetrics.NewKey(cinfoName+metricName, nil)
if val, ok := rateCalculator.Calculate(mKey, curVal, curTime); ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ func (m *MemMetricExtractor) GetValue(info *cinfo.ContainerInfo, mInfo CPUMemInf
metric.fields[ci.MetricName(containerType, ci.MemWorkingset)] = curStats.Memory.WorkingSet

multiplier := float64(time.Second)
assignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemPgfault), info.Name,
AssignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemPgfault), info.Name,
float64(curStats.Memory.ContainerData.Pgfault), curStats.Timestamp, multiplier)
assignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemPgmajfault), info.Name,
AssignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemPgmajfault), info.Name,
float64(curStats.Memory.ContainerData.Pgmajfault), curStats.Timestamp, multiplier)
assignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemHierarchicalPgfault), info.Name,
AssignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemHierarchicalPgfault), info.Name,
float64(curStats.Memory.HierarchicalData.Pgfault), curStats.Timestamp, multiplier)
assignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemHierarchicalPgmajfault), info.Name,
AssignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemHierarchicalPgmajfault), info.Name,
float64(curStats.Memory.HierarchicalData.Pgmajfault), curStats.Timestamp, multiplier)
memoryFailuresTotal := curStats.Memory.ContainerData.Pgfault + curStats.Memory.ContainerData.Pgmajfault
assignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemFailuresTotal), info.Name,
AssignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemFailuresTotal), info.Name,
float64(memoryFailuresTotal), curStats.Timestamp, multiplier)

memoryCapacity := mInfo.GetMemoryCapacity()
Expand All @@ -74,6 +74,6 @@ func (m *MemMetricExtractor) Shutdown() error {
func NewMemMetricExtractor(logger *zap.Logger) *MemMetricExtractor {
return &MemMetricExtractor{
logger: logger,
rateCalculator: newFloat64RateCalculator(),
rateCalculator: NewFloat64RateCalculator(),
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ func (n *NetMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMemInfoPro

infoName := info.Name + containerType + cur.Name // used to identify the network interface
multiplier := float64(time.Second)
assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxBytes, infoName, float64(cur.RxBytes), curStats.Timestamp, multiplier)
assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxPackets, infoName, float64(cur.RxPackets), curStats.Timestamp, multiplier)
assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxDropped, infoName, float64(cur.RxDropped), curStats.Timestamp, multiplier)
assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxErrors, infoName, float64(cur.RxErrors), curStats.Timestamp, multiplier)
assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxBytes, infoName, float64(cur.TxBytes), curStats.Timestamp, multiplier)
assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxPackets, infoName, float64(cur.TxPackets), curStats.Timestamp, multiplier)
assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxDropped, infoName, float64(cur.TxDropped), curStats.Timestamp, multiplier)
assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxErrors, infoName, float64(cur.TxErrors), curStats.Timestamp, multiplier)
AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxBytes, infoName, float64(cur.RxBytes), curStats.Timestamp, multiplier)
AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxPackets, infoName, float64(cur.RxPackets), curStats.Timestamp, multiplier)
AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxDropped, infoName, float64(cur.RxDropped), curStats.Timestamp, multiplier)
AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxErrors, infoName, float64(cur.RxErrors), curStats.Timestamp, multiplier)
AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxBytes, infoName, float64(cur.TxBytes), curStats.Timestamp, multiplier)
AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxPackets, infoName, float64(cur.TxPackets), curStats.Timestamp, multiplier)
AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxDropped, infoName, float64(cur.TxDropped), curStats.Timestamp, multiplier)
AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxErrors, infoName, float64(cur.TxErrors), curStats.Timestamp, multiplier)

if netIfceMetric[ci.NetRxBytes] != nil && netIfceMetric[ci.NetTxBytes] != nil {
netIfceMetric[ci.NetTotalBytes] = netIfceMetric[ci.NetRxBytes].(float64) + netIfceMetric[ci.NetTxBytes].(float64)
Expand Down Expand Up @@ -98,7 +98,7 @@ func (n *NetMetricExtractor) Shutdown() error {
func NewNetMetricExtractor(logger *zap.Logger) *NetMetricExtractor {
return &NetMetricExtractor{
logger: logger,
rateCalculator: newFloat64RateCalculator(),
rateCalculator: NewFloat64RateCalculator(),
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package extractors

import (
ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"

"go.uber.org/zap"
)

const (
decimalToMillicores = 1000
)

type CPUMetricExtractor struct {
logger *zap.Logger
rateCalculator awsmetrics.MetricCalculator
}

func (c *CPUMetricExtractor) HasValue(rawMetric *RawMetric) bool {
if rawMetric.CPUStats != nil {
return true
}
return false
}

func (c *CPUMetricExtractor) GetValue(rawMetric *RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric {
var metrics []*cExtractor.CAdvisorMetric

metric := cExtractor.NewCadvisorMetric(containerType, c.logger)

multiplier := float64(decimalToMillicores)
identifier := rawMetric.Id
cExtractor.AssignRateValueToField(&c.rateCalculator, metric.GetFields(), ci.MetricName(containerType, ci.CPUTotal), identifier, float64(*rawMetric.CPUStats.UsageCoreNanoSeconds), rawMetric.Time, multiplier)

numCores := mInfo.GetNumCores()
if metric.GetField(ci.MetricName(containerType, ci.CPUTotal)) != nil && numCores != 0 {
metric.AddField(ci.MetricName(containerType, ci.CPUUtilization), metric.GetField(ci.MetricName(containerType, ci.CPUTotal)).(float64)/float64(numCores*decimalToMillicores)*100)
}

if containerType == ci.TypeNode {
metric.AddField(ci.MetricName(containerType, ci.CPULimit), numCores*decimalToMillicores)
}

metrics = append(metrics, metric)
return metrics
}

func (c *CPUMetricExtractor) Shutdown() error {
return c.rateCalculator.Shutdown()
}

func NewCPUMetricExtractor(logger *zap.Logger) *CPUMetricExtractor {
return &CPUMetricExtractor{
logger: logger,
rateCalculator: cExtractor.NewFloat64RateCalculator(),
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package extractors

import (
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
cTestUtils "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestCPUStats(t *testing.T) {
MockCPUMemInfo := cTestUtils.MockCPUMemInfo{}

result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json")
result2 := testutils.LoadKubeletSummary(t, "./testdata/CurSingleKubeletSummary.json")

podRawMetric := ConvertPodToRaw(&result.Pods[0])
podRawMetric2 := ConvertPodToRaw(&result2.Pods[0])

// test container type
containerType := containerinsight.TypePod
extractor := NewCPUMetricExtractor(&zap.Logger{})

var cMetrics []*cExtractor.CAdvisorMetric
cMetrics = extractor.GetValue(podRawMetric, MockCPUMemInfo, containerType)
cMetrics = extractor.GetValue(podRawMetric2, MockCPUMemInfo, containerType)

cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "pod_cpu_usage_total", 3.125000, 0)
cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "pod_cpu_utilization", 0.156250, 0)
require.NoError(t, extractor.Shutdown())

// test node type
containerType = containerinsight.TypeNode
extractor = NewCPUMetricExtractor(nil)

nodeRawMetric := ConvertNodeToRaw(&result.Node)
nodeRawMetric2 := ConvertNodeToRaw(&result2.Node)
cMetrics = extractor.GetValue(nodeRawMetric, MockCPUMemInfo, containerType)
cMetrics = extractor.GetValue(nodeRawMetric2, MockCPUMemInfo, containerType)

cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_cpu_usage_total", 51.5, 0.5)
cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_cpu_utilization", 2.5, 0.5)
cExtractor.AssertContainsTaggedInt(t, cMetrics[0], "node_cpu_limit", 2000)

require.NoError(t, extractor.Shutdown())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package extractors

import (
"time"

cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
)

// RawMetric Represent Container, Pod, Node Metric Extractors.
// Kubelet summary and HNS stats will be converted to Raw Metric for parsing by Extractors.
type RawMetric struct {
Id string
Name string
Namespace string
Time time.Time
CPUStats *stats.CPUStats
MemoryStats *stats.MemoryStats
}

type MetricExtractor interface {
HasValue(summary *RawMetric) bool
GetValue(summary *RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric
Shutdown() error
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package extractors

import (
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
)

// ConvertPodToRaw Converts Kubelet Pod stats to RawMetric.
func ConvertPodToRaw(podStat *stats.PodStats) *RawMetric {
var rawMetic *RawMetric
rawMetic = &RawMetric{}
rawMetic.Id = podStat.PodRef.UID
rawMetic.Name = podStat.PodRef.Name
rawMetic.Namespace = podStat.PodRef.Namespace

if podStat.CPU != nil {
rawMetic.Time = podStat.CPU.Time.Time
rawMetic.CPUStats = podStat.CPU
}

if podStat.Memory != nil {
rawMetic.MemoryStats = podStat.Memory
}
return rawMetic
}

// ConvertNodeToRaw Converts Kubelet Node stats to RawMetric.
func ConvertNodeToRaw(nodeStat *stats.NodeStats) *RawMetric {
var rawMetic *RawMetric
rawMetic = &RawMetric{}
rawMetic.Id = nodeStat.NodeName
rawMetic.Name = nodeStat.NodeName

if nodeStat.CPU != nil {
rawMetic.Time = nodeStat.CPU.Time.Time
rawMetic.CPUStats = nodeStat.CPU
}

if nodeStat.Memory != nil {
rawMetic.MemoryStats = nodeStat.Memory
}

return rawMetic
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package extractors

import (
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils"

"github.com/stretchr/testify/assert"
)

func TestConvertPodToRaw(t *testing.T) {

result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json")

podRawMetric := ConvertPodToRaw(&result.Pods[0])

assert.Equal(t, podRawMetric.Id, "01bfbe59-2925-4ad5-a8d3-a1b23e3ddd74")
assert.Equal(t, podRawMetric.Name, "windows-server-iis-ltsc2019-58d94b5844-6v2pg")
assert.Equal(t, podRawMetric.Namespace, "amazon-cloudwatch")
parsedtime, _ := time.Parse(time.RFC3339, "2023-12-21T15:19:59Z")
assert.Equal(t, podRawMetric.Time, parsedtime.Local())
assert.Equal(t, *podRawMetric.CPUStats.UsageCoreNanoSeconds, uint64(289625000000))
assert.Equal(t, *podRawMetric.CPUStats.UsageNanoCores, uint64(0))
}

func TestConvertNodeToRaw(t *testing.T) {

result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json")

nodeRawMetric := ConvertNodeToRaw(&result.Node)

assert.Equal(t, nodeRawMetric.Id, "ip-192-168-44-84.us-west-2.compute.internal")
assert.Equal(t, nodeRawMetric.Name, "ip-192-168-44-84.us-west-2.compute.internal")
assert.Equal(t, nodeRawMetric.Namespace, "")
parsedtime, _ := time.Parse(time.RFC3339, "2023-12-21T15:19:58Z")
assert.Equal(t, nodeRawMetric.Time, parsedtime.Local())
assert.Equal(t, *nodeRawMetric.CPUStats.UsageCoreNanoSeconds, uint64(38907680000000))
assert.Equal(t, *nodeRawMetric.CPUStats.UsageNanoCores, uint64(20000000))
}
Loading

0 comments on commit 1ad51be

Please sign in to comment.