From 1ad51be57f6bf1b987586bfa8024fc959bd2a622 Mon Sep 17 00:00:00 2001 From: Kulwant Singh Date: Fri, 29 Dec 2023 17:06:06 +0000 Subject: [PATCH] CPU extractors with unit tests (#146) * Add CPU extractors from kubelet summary API 1. Make cadvisor helper func's public to be used in k8swindows extractor 2. Add CPU extractor and add utilization fields 3. Add unit test for CPU extractor. 4. Add unit test data for kubelet summary API 5. Add helper func to convert Pod and Node summary stats to RawMetric * Refactor code 1. Changed cExtractor to cextractors 2. Add nil checks to avoid panic during pointer deferences * Refactored code 1. Added missing HasValue func in extractors 2. Replaced cextractor with cExtractor 3. Corrected extractorhelper name with missing characters --- receiver/awscontainerinsightreceiver/go.mod | 2 +- .../cadvisor/extractors/cpu_extractor.go | 8 +- .../cadvisor/extractors/diskio_extractor.go | 4 +- .../internal/cadvisor/extractors/extractor.go | 4 +- ...or_helpers_test.go => extractorhelpers.go} | 0 .../cadvisor/extractors/mem_extractor.go | 12 +- .../cadvisor/extractors/net_extractor.go | 18 +- .../k8swindows/extractors/cpu_extractor.go | 58 +++++ .../extractors/cpu_extractor_test.go | 50 ++++ .../k8swindows/extractors/extractor.go | 25 ++ .../k8swindows/extractors/extractorhelpers.go | 43 ++++ .../extractors/extractorhelpers_test.go | 40 ++++ .../testdata/CurSingleKubeletSummary.json | 213 ++++++++++++++++++ .../testdata/PreSingleKubeletSummary.json | 213 ++++++++++++++++++ .../internal/k8swindows/kubelet.go | 31 +-- .../internal/k8swindows/testutils/helpers.go | 22 ++ 16 files changed, 706 insertions(+), 37 deletions(-) rename receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/{extractor_helpers_test.go => extractorhelpers.go} (100%) create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/cpu_extractor.go create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/cpu_extractor_test.go create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers.go create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers_test.go create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/testdata/CurSingleKubeletSummary.json create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/testdata/PreSingleKubeletSummary.json create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/testutils/helpers.go diff --git a/receiver/awscontainerinsightreceiver/go.mod b/receiver/awscontainerinsightreceiver/go.mod index 8104c541d1e5..f1b75b124c2f 100644 --- a/receiver/awscontainerinsightreceiver/go.mod +++ b/receiver/awscontainerinsightreceiver/go.mod @@ -31,7 +31,6 @@ require ( k8s.io/apimachinery v0.28.3 k8s.io/client-go v0.28.3 k8s.io/klog v1.0.0 - k8s.io/klog/v2 v2.100.1 k8s.io/kubelet v0.28.3 ) @@ -216,6 +215,7 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/klog/v2 v2.100.1 // indirect k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go index 08369aa2be90..4d4ba6115734 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go @@ -36,9 +36,9 @@ func (c *CPUMetricExtractor) GetValue(info *cInfo.ContainerInfo, mInfo CPUMemInf metric := NewCadvisorMetric(containerType, c.logger) metric.cgroupPath = info.Name multiplier := float64(decimalToMillicores) - assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUTotal), info.Name, float64(curStats.Cpu.Usage.Total), curStats.Timestamp, multiplier) - assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUUser), info.Name, float64(curStats.Cpu.Usage.User), curStats.Timestamp, multiplier) - assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUSystem), info.Name, float64(curStats.Cpu.Usage.System), curStats.Timestamp, multiplier) + AssignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUTotal), info.Name, float64(curStats.Cpu.Usage.Total), curStats.Timestamp, multiplier) + AssignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUUser), info.Name, float64(curStats.Cpu.Usage.User), curStats.Timestamp, multiplier) + AssignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUSystem), info.Name, float64(curStats.Cpu.Usage.System), curStats.Timestamp, multiplier) numCores := mInfo.GetNumCores() if metric.fields[ci.MetricName(containerType, ci.CPUTotal)] != nil && numCores != 0 { @@ -60,6 +60,6 @@ func (c *CPUMetricExtractor) Shutdown() error { func NewCPUMetricExtractor(logger *zap.Logger) *CPUMetricExtractor { return &CPUMetricExtractor{ logger: logger, - rateCalculator: newFloat64RateCalculator(), + rateCalculator: NewFloat64RateCalculator(), } } diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go index f3f3d5888c4c..c7a6210acec2 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go @@ -46,7 +46,7 @@ func (d *DiskIOMetricExtractor) extractIoMetrics(curStatsSet []cInfo.PerDiskStat for _, key := range expectedKey { if curVal, curOk := cur.Stats[key]; curOk { mname := ci.MetricName(containerType, ioMetricName(namePrefix, key)) - assignRateValueToField(&d.rateCalculator, metric.fields, mname, infoName, float64(curVal), curTime, float64(time.Second)) + AssignRateValueToField(&d.rateCalculator, metric.fields, mname, infoName, float64(curVal), curTime, float64(time.Second)) } } if len(metric.fields) > 0 { @@ -75,7 +75,7 @@ func devName(dStats cInfo.PerDiskStats) string { func NewDiskIOMetricExtractor(logger *zap.Logger) *DiskIOMetricExtractor { return &DiskIOMetricExtractor{ logger: logger, - rateCalculator: newFloat64RateCalculator(), + rateCalculator: NewFloat64RateCalculator(), } } diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go index 457d5a3dbf46..8e0e47d2b575 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go @@ -114,7 +114,7 @@ func (c *CAdvisorMetric) Merge(src *CAdvisorMetric) { } } -func newFloat64RateCalculator() awsmetrics.MetricCalculator { +func NewFloat64RateCalculator() awsmetrics.MetricCalculator { return awsmetrics.NewMetricCalculator(func(prev *awsmetrics.MetricValue, val any, timestamp time.Time) (any, bool) { if prev != nil { deltaNs := timestamp.Sub(prev.Timestamp) @@ -127,7 +127,7 @@ func newFloat64RateCalculator() awsmetrics.MetricCalculator { }) } -func assignRateValueToField(rateCalculator *awsmetrics.MetricCalculator, fields map[string]any, metricName string, +func AssignRateValueToField(rateCalculator *awsmetrics.MetricCalculator, fields map[string]any, metricName string, cinfoName string, curVal any, curTime time.Time, multiplier float64) { mKey := awsmetrics.NewKey(cinfoName+metricName, nil) if val, ok := rateCalculator.Calculate(mKey, curVal, curTime); ok { diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor_helpers_test.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractorhelpers.go similarity index 100% rename from receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor_helpers_test.go rename to receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractorhelpers.go diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go index 310061282750..bda4e8dce905 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go @@ -42,16 +42,16 @@ func (m *MemMetricExtractor) GetValue(info *cinfo.ContainerInfo, mInfo CPUMemInf metric.fields[ci.MetricName(containerType, ci.MemWorkingset)] = curStats.Memory.WorkingSet multiplier := float64(time.Second) - assignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemPgfault), info.Name, + AssignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemPgfault), info.Name, float64(curStats.Memory.ContainerData.Pgfault), curStats.Timestamp, multiplier) - assignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemPgmajfault), info.Name, + AssignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemPgmajfault), info.Name, float64(curStats.Memory.ContainerData.Pgmajfault), curStats.Timestamp, multiplier) - assignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemHierarchicalPgfault), info.Name, + AssignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemHierarchicalPgfault), info.Name, float64(curStats.Memory.HierarchicalData.Pgfault), curStats.Timestamp, multiplier) - assignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemHierarchicalPgmajfault), info.Name, + AssignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemHierarchicalPgmajfault), info.Name, float64(curStats.Memory.HierarchicalData.Pgmajfault), curStats.Timestamp, multiplier) memoryFailuresTotal := curStats.Memory.ContainerData.Pgfault + curStats.Memory.ContainerData.Pgmajfault - assignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemFailuresTotal), info.Name, + AssignRateValueToField(&m.rateCalculator, metric.fields, ci.MetricName(containerType, ci.MemFailuresTotal), info.Name, float64(memoryFailuresTotal), curStats.Timestamp, multiplier) memoryCapacity := mInfo.GetMemoryCapacity() @@ -74,6 +74,6 @@ func (m *MemMetricExtractor) Shutdown() error { func NewMemMetricExtractor(logger *zap.Logger) *MemMetricExtractor { return &MemMetricExtractor{ logger: logger, - rateCalculator: newFloat64RateCalculator(), + rateCalculator: NewFloat64RateCalculator(), } } diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go index d8bdb23fcfe9..b2211cfe8a24 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go @@ -55,14 +55,14 @@ func (n *NetMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMemInfoPro infoName := info.Name + containerType + cur.Name // used to identify the network interface multiplier := float64(time.Second) - assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxBytes, infoName, float64(cur.RxBytes), curStats.Timestamp, multiplier) - assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxPackets, infoName, float64(cur.RxPackets), curStats.Timestamp, multiplier) - assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxDropped, infoName, float64(cur.RxDropped), curStats.Timestamp, multiplier) - assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxErrors, infoName, float64(cur.RxErrors), curStats.Timestamp, multiplier) - assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxBytes, infoName, float64(cur.TxBytes), curStats.Timestamp, multiplier) - assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxPackets, infoName, float64(cur.TxPackets), curStats.Timestamp, multiplier) - assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxDropped, infoName, float64(cur.TxDropped), curStats.Timestamp, multiplier) - assignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxErrors, infoName, float64(cur.TxErrors), curStats.Timestamp, multiplier) + AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxBytes, infoName, float64(cur.RxBytes), curStats.Timestamp, multiplier) + AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxPackets, infoName, float64(cur.RxPackets), curStats.Timestamp, multiplier) + AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxDropped, infoName, float64(cur.RxDropped), curStats.Timestamp, multiplier) + AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetRxErrors, infoName, float64(cur.RxErrors), curStats.Timestamp, multiplier) + AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxBytes, infoName, float64(cur.TxBytes), curStats.Timestamp, multiplier) + AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxPackets, infoName, float64(cur.TxPackets), curStats.Timestamp, multiplier) + AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxDropped, infoName, float64(cur.TxDropped), curStats.Timestamp, multiplier) + AssignRateValueToField(&n.rateCalculator, netIfceMetric, ci.NetTxErrors, infoName, float64(cur.TxErrors), curStats.Timestamp, multiplier) if netIfceMetric[ci.NetRxBytes] != nil && netIfceMetric[ci.NetTxBytes] != nil { netIfceMetric[ci.NetTotalBytes] = netIfceMetric[ci.NetRxBytes].(float64) + netIfceMetric[ci.NetTxBytes].(float64) @@ -98,7 +98,7 @@ func (n *NetMetricExtractor) Shutdown() error { func NewNetMetricExtractor(logger *zap.Logger) *NetMetricExtractor { return &NetMetricExtractor{ logger: logger, - rateCalculator: newFloat64RateCalculator(), + rateCalculator: NewFloat64RateCalculator(), } } diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/cpu_extractor.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/cpu_extractor.go new file mode 100644 index 000000000000..105c5a50a266 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/cpu_extractor.go @@ -0,0 +1,58 @@ +package extractors + +import ( + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" + cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + + "go.uber.org/zap" +) + +const ( + decimalToMillicores = 1000 +) + +type CPUMetricExtractor struct { + logger *zap.Logger + rateCalculator awsmetrics.MetricCalculator +} + +func (c *CPUMetricExtractor) HasValue(rawMetric *RawMetric) bool { + if rawMetric.CPUStats != nil { + return true + } + return false +} + +func (c *CPUMetricExtractor) GetValue(rawMetric *RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric { + var metrics []*cExtractor.CAdvisorMetric + + metric := cExtractor.NewCadvisorMetric(containerType, c.logger) + + multiplier := float64(decimalToMillicores) + identifier := rawMetric.Id + cExtractor.AssignRateValueToField(&c.rateCalculator, metric.GetFields(), ci.MetricName(containerType, ci.CPUTotal), identifier, float64(*rawMetric.CPUStats.UsageCoreNanoSeconds), rawMetric.Time, multiplier) + + numCores := mInfo.GetNumCores() + if metric.GetField(ci.MetricName(containerType, ci.CPUTotal)) != nil && numCores != 0 { + metric.AddField(ci.MetricName(containerType, ci.CPUUtilization), metric.GetField(ci.MetricName(containerType, ci.CPUTotal)).(float64)/float64(numCores*decimalToMillicores)*100) + } + + if containerType == ci.TypeNode { + metric.AddField(ci.MetricName(containerType, ci.CPULimit), numCores*decimalToMillicores) + } + + metrics = append(metrics, metric) + return metrics +} + +func (c *CPUMetricExtractor) Shutdown() error { + return c.rateCalculator.Shutdown() +} + +func NewCPUMetricExtractor(logger *zap.Logger) *CPUMetricExtractor { + return &CPUMetricExtractor{ + logger: logger, + rateCalculator: cExtractor.NewFloat64RateCalculator(), + } +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/cpu_extractor_test.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/cpu_extractor_test.go new file mode 100644 index 000000000000..60a910f90d45 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/cpu_extractor_test.go @@ -0,0 +1,50 @@ +package extractors + +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + cTestUtils "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestCPUStats(t *testing.T) { + MockCPUMemInfo := cTestUtils.MockCPUMemInfo{} + + result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json") + result2 := testutils.LoadKubeletSummary(t, "./testdata/CurSingleKubeletSummary.json") + + podRawMetric := ConvertPodToRaw(&result.Pods[0]) + podRawMetric2 := ConvertPodToRaw(&result2.Pods[0]) + + // test container type + containerType := containerinsight.TypePod + extractor := NewCPUMetricExtractor(&zap.Logger{}) + + var cMetrics []*cExtractor.CAdvisorMetric + cMetrics = extractor.GetValue(podRawMetric, MockCPUMemInfo, containerType) + cMetrics = extractor.GetValue(podRawMetric2, MockCPUMemInfo, containerType) + + cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "pod_cpu_usage_total", 3.125000, 0) + cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "pod_cpu_utilization", 0.156250, 0) + require.NoError(t, extractor.Shutdown()) + + // test node type + containerType = containerinsight.TypeNode + extractor = NewCPUMetricExtractor(nil) + + nodeRawMetric := ConvertNodeToRaw(&result.Node) + nodeRawMetric2 := ConvertNodeToRaw(&result2.Node) + cMetrics = extractor.GetValue(nodeRawMetric, MockCPUMemInfo, containerType) + cMetrics = extractor.GetValue(nodeRawMetric2, MockCPUMemInfo, containerType) + + cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_cpu_usage_total", 51.5, 0.5) + cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_cpu_utilization", 2.5, 0.5) + cExtractor.AssertContainsTaggedInt(t, cMetrics[0], "node_cpu_limit", 2000) + + require.NoError(t, extractor.Shutdown()) +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go new file mode 100644 index 000000000000..315b0ae81d93 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go @@ -0,0 +1,25 @@ +package extractors + +import ( + "time" + + cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" +) + +// RawMetric Represent Container, Pod, Node Metric Extractors. +// Kubelet summary and HNS stats will be converted to Raw Metric for parsing by Extractors. +type RawMetric struct { + Id string + Name string + Namespace string + Time time.Time + CPUStats *stats.CPUStats + MemoryStats *stats.MemoryStats +} + +type MetricExtractor interface { + HasValue(summary *RawMetric) bool + GetValue(summary *RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric + Shutdown() error +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers.go new file mode 100644 index 000000000000..928d02d0b8c9 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers.go @@ -0,0 +1,43 @@ +package extractors + +import ( + stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" +) + +// ConvertPodToRaw Converts Kubelet Pod stats to RawMetric. +func ConvertPodToRaw(podStat *stats.PodStats) *RawMetric { + var rawMetic *RawMetric + rawMetic = &RawMetric{} + rawMetic.Id = podStat.PodRef.UID + rawMetic.Name = podStat.PodRef.Name + rawMetic.Namespace = podStat.PodRef.Namespace + + if podStat.CPU != nil { + rawMetic.Time = podStat.CPU.Time.Time + rawMetic.CPUStats = podStat.CPU + } + + if podStat.Memory != nil { + rawMetic.MemoryStats = podStat.Memory + } + return rawMetic +} + +// ConvertNodeToRaw Converts Kubelet Node stats to RawMetric. +func ConvertNodeToRaw(nodeStat *stats.NodeStats) *RawMetric { + var rawMetic *RawMetric + rawMetic = &RawMetric{} + rawMetic.Id = nodeStat.NodeName + rawMetic.Name = nodeStat.NodeName + + if nodeStat.CPU != nil { + rawMetic.Time = nodeStat.CPU.Time.Time + rawMetic.CPUStats = nodeStat.CPU + } + + if nodeStat.Memory != nil { + rawMetic.MemoryStats = nodeStat.Memory + } + + return rawMetic +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers_test.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers_test.go new file mode 100644 index 000000000000..df715efc3427 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers_test.go @@ -0,0 +1,40 @@ +package extractors + +import ( + "testing" + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils" + + "github.com/stretchr/testify/assert" +) + +func TestConvertPodToRaw(t *testing.T) { + + result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json") + + podRawMetric := ConvertPodToRaw(&result.Pods[0]) + + assert.Equal(t, podRawMetric.Id, "01bfbe59-2925-4ad5-a8d3-a1b23e3ddd74") + assert.Equal(t, podRawMetric.Name, "windows-server-iis-ltsc2019-58d94b5844-6v2pg") + assert.Equal(t, podRawMetric.Namespace, "amazon-cloudwatch") + parsedtime, _ := time.Parse(time.RFC3339, "2023-12-21T15:19:59Z") + assert.Equal(t, podRawMetric.Time, parsedtime.Local()) + assert.Equal(t, *podRawMetric.CPUStats.UsageCoreNanoSeconds, uint64(289625000000)) + assert.Equal(t, *podRawMetric.CPUStats.UsageNanoCores, uint64(0)) +} + +func TestConvertNodeToRaw(t *testing.T) { + + result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json") + + nodeRawMetric := ConvertNodeToRaw(&result.Node) + + assert.Equal(t, nodeRawMetric.Id, "ip-192-168-44-84.us-west-2.compute.internal") + assert.Equal(t, nodeRawMetric.Name, "ip-192-168-44-84.us-west-2.compute.internal") + assert.Equal(t, nodeRawMetric.Namespace, "") + parsedtime, _ := time.Parse(time.RFC3339, "2023-12-21T15:19:58Z") + assert.Equal(t, nodeRawMetric.Time, parsedtime.Local()) + assert.Equal(t, *nodeRawMetric.CPUStats.UsageCoreNanoSeconds, uint64(38907680000000)) + assert.Equal(t, *nodeRawMetric.CPUStats.UsageNanoCores, uint64(20000000)) +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/testdata/CurSingleKubeletSummary.json b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/testdata/CurSingleKubeletSummary.json new file mode 100644 index 000000000000..650af9ca94d0 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/testdata/CurSingleKubeletSummary.json @@ -0,0 +1,213 @@ +{ + "node": { + "nodeName": "ip-192-168-44-84.us-west-2.compute.internal", + "systemContainers": [ + { + "name": "pods", + "startTime": "2023-12-21T15:20:59Z", + "cpu": { + "time": "2023-12-21T15:20:59Z", + "usageCoreNanoSeconds": 313109375000 + }, + "memory": { + "time": "2023-12-21T15:20:59Z", + "workingSetBytes": 316674048 + } + } + ], + "startTime": "2023-12-11T23:48:04Z", + "cpu": { + "time": "2023-12-21T15:20:58Z", + "usageNanoCores": 70000000, + "usageCoreNanoSeconds": 38910780000000 + }, + "memory": { + "time": "2023-12-21T15:20:58Z", + "availableBytes": 7248187392, + "usageBytes": 3572293632, + "workingSetBytes": 1026678784, + "rssBytes": 0, + "pageFaults": 0, + "majorPageFaults": 0 + }, + "network": { + "time": "2023-12-21T15:20:58Z", + "name": "", + "interfaces": [ + { + "name": "Amazon Elastic Network Adapter", + "rxBytes": 5620532878, + "rxErrors": 0, + "txBytes": 781180227, + "txErrors": 0 + }, + { + "name": "Hyper-V Virtual Ethernet Adapter", + "rxBytes": 5574303646, + "rxErrors": 0, + "txBytes": 773064167, + "txErrors": 0 + }, + { + "name": "Teredo Tunneling Pseudo-Interface", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + }, + { + "name": "AWS PV Network Device", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + }, + { + "name": "Intel[R] 82599 Virtual Function", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + }, + { + "name": "Microsoft IP-HTTPS Platform Interface", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + }, + { + "name": "Hyper-V Virtual Switch Extension Adapter", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + }, + { + "name": "Microsoft Kernel Debug Network Adapter", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + }, + { + "name": "6to4 Adapter", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + } + ] + }, + "fs": { + "time": "2023-12-21T15:20:58Z", + "availableBytes": 51230019584, + "capacityBytes": 85897244672, + "usedBytes": 34667225088 + }, + "runtime": { + "imageFs": { + "time": "2023-12-21T15:20:56Z", + "availableBytes": 51230019584, + "capacityBytes": 85897244672, + "usedBytes": 16901889814, + "inodesUsed": 0 + } + } + }, + "pods": [ + { + "podRef": { + "name": "windows-server-iis-ltsc2019-58d94b5844-6v2pg", + "namespace": "amazon-cloudwatch", + "uid": "01bfbe59-2925-4ad5-a8d3-a1b23e3ddd74" + }, + "startTime": "2023-12-20T16:52:55Z", + "containers": [ + { + "name": "windows-server-iis-ltsc2019", + "startTime": "2023-12-20T16:52:58Z", + "cpu": { + "time": "2023-12-21T15:20:59Z", + "usageNanoCores": 0, + "usageCoreNanoSeconds": 289812500000 + }, + "memory": { + "time": "2023-12-21T15:20:59Z", + "workingSetBytes": 209088512 + }, + "rootfs": { + "time": "2023-12-21T15:20:56Z", + "availableBytes": 51230019584, + "capacityBytes": 85897244672, + "usedBytes": 339738624, + "inodesUsed": 0 + }, + "logs": { + "time": "2023-12-21T15:20:59Z", + "availableBytes": 51230019584, + "capacityBytes": 85897244672, + "usedBytes": 920146, + "inodesUsed": 0 + } + } + ], + "cpu": { + "time": "2023-12-21T15:20:59Z", + "usageNanoCores": 0, + "usageCoreNanoSeconds": 289812500000 + }, + "memory": { + "time": "2023-12-21T15:20:59Z", + "availableBytes": 0, + "usageBytes": 0, + "workingSetBytes": 209088512, + "rssBytes": 0, + "pageFaults": 0, + "majorPageFaults": 0 + }, + "network": { + "time": "2023-12-21T15:20:59Z", + "name": "cid-ccecfbf3-5ecf-4d93-9229-3f2f0df1b6b5", + "rxBytes": 8891342, + "txBytes": 3433553, + "interfaces": [ + { + "name": "cid-ccecfbf3-5ecf-4d93-9229-3f2f0df1b6b5", + "rxBytes": 8891342, + "txBytes": 3433553 + } + ] + }, + "volume": [ + { + "time": "2023-12-21T15:20:56Z", + "availableBytes": 51230019584, + "capacityBytes": 85897244672, + "usedBytes": 4431, + "inodesFree": 0, + "inodes": 0, + "inodesUsed": 0, + "name": "cwagentconfig" + }, + { + "time": "2023-12-21T15:20:56Z", + "availableBytes": 51230019584, + "capacityBytes": 85897244672, + "usedBytes": 7448, + "inodesFree": 0, + "inodes": 0, + "inodesUsed": 0, + "name": "kube-api-access-8gtnx" + } + ], + "ephemeral-storage": { + "time": "2023-12-21T15:20:59Z", + "availableBytes": 51230019584, + "capacityBytes": 85897244672, + "usedBytes": 340663443, + "inodesUsed": 0 + } + } + ] +} \ No newline at end of file diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/testdata/PreSingleKubeletSummary.json b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/testdata/PreSingleKubeletSummary.json new file mode 100644 index 000000000000..6ef061851cf5 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/testdata/PreSingleKubeletSummary.json @@ -0,0 +1,213 @@ +{ + "node": { + "nodeName": "ip-192-168-44-84.us-west-2.compute.internal", + "systemContainers": [ + { + "name": "pods", + "startTime": "2023-12-21T15:19:59Z", + "cpu": { + "time": "2023-12-21T15:19:59Z", + "usageCoreNanoSeconds": 312859375000 + }, + "memory": { + "time": "2023-12-21T15:19:59Z", + "workingSetBytes": 316026880 + } + } + ], + "startTime": "2023-12-11T23:48:04Z", + "cpu": { + "time": "2023-12-21T15:19:58Z", + "usageNanoCores": 20000000, + "usageCoreNanoSeconds": 38907680000000 + }, + "memory": { + "time": "2023-12-21T15:19:58Z", + "availableBytes": 7234662400, + "usageBytes": 3583389696, + "workingSetBytes": 1040203776, + "rssBytes": 0, + "pageFaults": 0, + "majorPageFaults": 0 + }, + "network": { + "time": "2023-12-21T15:19:58Z", + "name": "", + "interfaces": [ + { + "name": "Hyper-V Virtual Switch Extension Adapter", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + }, + { + "name": "Microsoft Kernel Debug Network Adapter", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + }, + { + "name": "6to4 Adapter", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + }, + { + "name": "Intel[R]82599VirtualFunction", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + }, + { + "name": "Microsoft IP-HTTPS Platform Interface", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + }, + { + "name": "Hyper-V Virtual Ethernet Adapter", + "rxBytes": 5574166022, + "rxErrors": 0, + "txBytes": 772914904, + "txErrors": 0 + }, + { + "name": "Teredo Tunneling Pseudo-Interface", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + }, + { + "name": "AWS PV Network Device", + "rxBytes": 0, + "rxErrors": 0, + "txBytes": 0, + "txErrors": 0 + }, + { + "name": "Amazon Elastic Network Adapter", + "rxBytes": 5620324400, + "rxErrors": 0, + "txBytes": 781060052, + "txErrors": 0 + } + ] + }, + "fs": { + "time": "2023-12-21T15:19:58Z", + "availableBytes": 51230154752, + "capacityBytes": 85897244672, + "usedBytes": 34667089920 + }, + "runtime": { + "imageFs": { + "time": "2023-12-21T15:19:56Z", + "availableBytes": 51230154752, + "capacityBytes": 85897244672, + "usedBytes": 16901889814, + "inodesUsed": 0 + } + } + }, + "pods": [ + { + "podRef": { + "name": "windows-server-iis-ltsc2019-58d94b5844-6v2pg", + "namespace": "amazon-cloudwatch", + "uid": "01bfbe59-2925-4ad5-a8d3-a1b23e3ddd74" + }, + "startTime": "2023-12-20T16:52:55Z", + "containers": [ + { + "name": "windows-server-iis-ltsc2019", + "startTime": "2023-12-20T16:52:58Z", + "cpu": { + "time": "2023-12-21T15:19:59Z", + "usageNanoCores": 0, + "usageCoreNanoSeconds": 289625000000 + }, + "memory": { + "time": "2023-12-21T15:19:59Z", + "workingSetBytes": 208949248 + }, + "rootfs": { + "time": "2023-12-21T15:19:56Z", + "availableBytes": 51230154752, + "capacityBytes": 85897244672, + "usedBytes": 339738624, + "inodesUsed": 0 + }, + "logs": { + "time": "2023-12-21T15:19:59Z", + "availableBytes": 51230154752, + "capacityBytes": 85897244672, + "usedBytes": 919463, + "inodesUsed": 0 + } + } + ], + "cpu": { + "time": "2023-12-21T15:19:59Z", + "usageNanoCores": 0, + "usageCoreNanoSeconds": 289625000000 + }, + "memory": { + "time": "2023-12-21T15:19:59Z", + "availableBytes": 0, + "usageBytes": 0, + "workingSetBytes": 208949248, + "rssBytes": 0, + "pageFaults": 0, + "majorPageFaults": 0 + }, + "network": { + "time": "2023-12-21T15:19:59Z", + "name": "cid-ccecfbf3-5ecf-4d93-9229-3f2f0df1b6b5", + "rxBytes": 8787186, + "txBytes": 3423484, + "interfaces": [ + { + "name": "cid-ccecfbf3-5ecf-4d93-9229-3f2f0df1b6b5", + "rxBytes": 8787186, + "txBytes": 3423484 + } + ] + }, + "volume": [ + { + "time": "2023-12-21T15:19:04Z", + "availableBytes": 51230248960, + "capacityBytes": 85897244672, + "usedBytes": 4431, + "inodesFree": 0, + "inodes": 0, + "inodesUsed": 0, + "name": "cwagentconfig" + }, + { + "time": "2023-12-21T15:19:04Z", + "availableBytes": 51230248960, + "capacityBytes": 85897244672, + "usedBytes": 7448, + "inodesFree": 0, + "inodes": 0, + "inodesUsed": 0, + "name": "kube-api-access-8gtnx" + } + ], + "ephemeral-storage": { + "time": "2023-12-21T15:19:59Z", + "availableBytes": 51230154752, + "capacityBytes": 85897244672, + "usedBytes": 340662760, + "inodesUsed": 0 + } + } + ] +} \ No newline at end of file diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go index cc956e5b208c..685360ec00ba 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go @@ -9,9 +9,10 @@ package k8swindows // import "github.com/open-telemetry/opentelemetry-collector- import ( "fmt" "os" + "strconv" ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil" @@ -40,7 +41,7 @@ func new(logger *zap.Logger, info host.Info) (*kubeletSummaryProvider, error) { }, nil } -func (k *kubeletSummaryProvider) getMetrics() ([]*extractors.CAdvisorMetric, error) { +func (k *kubeletSummaryProvider) getMetrics() ([]*cExtractor.CAdvisorMetric, error) { summary, err := k.client.Summary(k.logger) if err != nil { k.logger.Error("kubelet summary API failed, ", zap.Error(err)) @@ -50,29 +51,31 @@ func (k *kubeletSummaryProvider) getMetrics() ([]*extractors.CAdvisorMetric, err return k.getPodMetrics(summary) } -func (k *kubeletSummaryProvider) getContainerMetrics(summary *stats.Summary) ([]*extractors.CAdvisorMetric, error) { - var metrics []*extractors.CAdvisorMetric +func (k *kubeletSummaryProvider) getContainerMetrics(summary *stats.Summary) ([]*cExtractor.CAdvisorMetric, error) { + var metrics []*cExtractor.CAdvisorMetric // todo: implement CPU, memory metrics from containers return metrics, nil } -func (k *kubeletSummaryProvider) getPodMetrics(summary *stats.Summary) ([]*extractors.CAdvisorMetric, error) { +func (k *kubeletSummaryProvider) getPodMetrics(summary *stats.Summary) ([]*cExtractor.CAdvisorMetric, error) { // todo: This is not complete implementation of pod level metric collection since network level metrics are pending // May need to add some more pod level labels for store decorators to work properly - var metrics []*extractors.CAdvisorMetric + var metrics []*cExtractor.CAdvisorMetric nodeCPUCores := k.hostInfo.GetNumCores() for _, pod := range summary.Pods { k.logger.Info(fmt.Sprintf("pod summary %v", pod.PodRef.Name)) - metric := extractors.NewCadvisorMetric(ci.TypePod, k.logger) + metric := cExtractor.NewCadvisorMetric(ci.TypePod, k.logger) + tags := map[string]string{} - metric.AddField(ci.PodIDKey, pod.PodRef.UID) - metric.AddField(ci.K8sPodNameKey, pod.PodRef.Name) - metric.AddField(ci.K8sNamespace, pod.PodRef.Namespace) + tags[ci.PodIDKey] = pod.PodRef.UID + tags[ci.K8sPodNameKey] = pod.PodRef.Name + tags[ci.K8sNamespace] = pod.PodRef.Namespace + tags[ci.Timestamp] = strconv.FormatInt(pod.CPU.Time.UnixNano(), 10) // CPU metric - metric.AddField(ci.MetricName(ci.TypePod, ci.CPUTotal), *pod.CPU.UsageCoreNanoSeconds) + metric.AddField(ci.MetricName(ci.TypePod, ci.CPUTotal), float64(*pod.CPU.UsageCoreNanoSeconds)) metric.AddField(ci.MetricName(ci.TypePod, ci.CPUUtilization), float64(*pod.CPU.UsageCoreNanoSeconds)/float64(nodeCPUCores)) // Memory metrics @@ -81,13 +84,15 @@ func (k *kubeletSummaryProvider) getPodMetrics(summary *stats.Summary) ([]*extra metric.AddField(ci.MetricName(ci.TypePod, ci.MemWorkingset), *pod.Memory.WorkingSetBytes) metric.AddField(ci.MetricName(ci.TypePod, ci.MemReservedCapacity), k.hostInfo.GetMemoryCapacity()) metric.AddField(ci.MetricName(ci.TypePod, ci.MemUtilization), float64(*pod.Memory.WorkingSetBytes)/float64(k.hostInfo.GetMemoryCapacity())*100) + + metric.AddTags(tags) metrics = append(metrics, metric) } return metrics, nil } -func (k *kubeletSummaryProvider) getNodeMetrics() ([]*extractors.CAdvisorMetric, error) { - var metrics []*extractors.CAdvisorMetric +func (k *kubeletSummaryProvider) getNodeMetrics() ([]*cExtractor.CAdvisorMetric, error) { + var metrics []*cExtractor.CAdvisorMetric //todo: Implement CPU, memory and network metrics at node return metrics, nil } diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils/helpers.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils/helpers.go new file mode 100644 index 000000000000..66f828c91ba5 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils/helpers.go @@ -0,0 +1,22 @@ +package testutils + +import ( + "encoding/json" + "os" + "testing" + + stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" + + "github.com/stretchr/testify/assert" +) + +func LoadKubeletSummary(t *testing.T, file string) *stats.Summary { + info, err := os.ReadFile(file) + assert.Nil(t, err, "Fail to read sample kubelet summary response file content") + + var kSummary stats.Summary + err = json.Unmarshal(info, &kSummary) + assert.Nil(t, err, "Fail to parse json string") + + return &kSummary +}