Skip to content

Commit

Permalink
Define structs for CPU and Memory stats (amazon-contributing#149)
Browse files Browse the repository at this point in the history
* Define structs for CPU and Memory and stats

1. Create new structs to represent CPU and memory stats in RawMetric. This
removes RawMetric dependency on Kubelet CPU and memory stats.
2. Refactor existing RawMetric struct to use new CPU and memory stats.
3. Add more unit tests for extractorhelpers

* Refactor: Remove parameters passing by reference in extractors

* Remove extra comments in error
  • Loading branch information
KlwntSingh authored Jan 10, 2024
1 parent e44056d commit dfc7c15
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ type CPUMetricExtractor struct {
rateCalculator awsmetrics.MetricCalculator
}

func (c *CPUMetricExtractor) HasValue(rawMetric *RawMetric) bool {
if rawMetric.CPUStats != nil {
func (c *CPUMetricExtractor) HasValue(rawMetric RawMetric) bool {
if !rawMetric.Time.IsZero() {
return true
}
return false
}

func (c *CPUMetricExtractor) GetValue(rawMetric *RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric {
func (c *CPUMetricExtractor) GetValue(rawMetric RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric {
var metrics []*cExtractor.CAdvisorMetric

metric := cExtractor.NewCadvisorMetric(containerType, c.logger)

multiplier := float64(decimalToMillicores)
identifier := rawMetric.Id
cExtractor.AssignRateValueToField(&c.rateCalculator, metric.GetFields(), ci.MetricName(containerType, ci.CPUTotal), identifier, float64(*rawMetric.CPUStats.UsageCoreNanoSeconds), rawMetric.Time, multiplier)
cExtractor.AssignRateValueToField(&c.rateCalculator, metric.GetFields(), ci.MetricName(containerType, ci.CPUTotal), identifier, float64(rawMetric.CPUStats.UsageCoreNanoSeconds), rawMetric.Time, multiplier)

numCores := mInfo.GetNumCores()
if metric.GetField(ci.MetricName(containerType, ci.CPUTotal)) != nil && numCores != 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func TestCPUStats(t *testing.T) {
result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json")
result2 := testutils.LoadKubeletSummary(t, "./testdata/CurSingleKubeletSummary.json")

podRawMetric := ConvertPodToRaw(&result.Pods[0])
podRawMetric2 := ConvertPodToRaw(&result2.Pods[0])
podRawMetric := ConvertPodToRaw(result.Pods[0])
podRawMetric2 := ConvertPodToRaw(result2.Pods[0])

// test container type
containerType := containerinsight.TypePod
Expand All @@ -41,8 +41,8 @@ func TestCPUStats(t *testing.T) {
containerType = containerinsight.TypeNode
extractor = NewCPUMetricExtractor(nil)

nodeRawMetric := ConvertNodeToRaw(&result.Node)
nodeRawMetric2 := ConvertNodeToRaw(&result2.Node)
nodeRawMetric := ConvertNodeToRaw(result.Node)
nodeRawMetric2 := ConvertNodeToRaw(result2.Node)
if extractor.HasValue(nodeRawMetric) {
cMetrics = extractor.GetValue(nodeRawMetric, MockCPUMemInfo, containerType)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,35 @@ import (
"time"

cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
)

// CPUStat for Pod, Container and Node.
type CPUStat struct {
Time time.Time
UsageNanoCores uint64
UsageCoreNanoSeconds uint64
}

// MemoryStat for Pod, Container and Node
type MemoryStat struct {
Time time.Time
AvailableBytes uint64
UsageBytes uint64
WorkingSetBytes uint64
RSSBytes uint64
PageFaults uint64
MajorPageFaults uint64
}

// RawMetric Represent Container, Pod, Node Metric Extractors.
// Kubelet summary and HNS stats will be converted to Raw Metric for parsing by Extractors.
type RawMetric struct {
Id string
Name string
Namespace string
Time time.Time
CPUStats *stats.CPUStats
MemoryStats *stats.MemoryStats
CPUStats CPUStat
MemoryStats MemoryStat
}

type MetricExtractor interface {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,116 @@
package extractors

import (
"fmt"

stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
)

// convertCPUStats Convert kubelet CPU stats to Raw CPU stats
func convertCPUStats(kubeletCPUStat stats.CPUStats) CPUStat {
var cpuStat CPUStat

cpuStat.Time = kubeletCPUStat.Time.Time

if kubeletCPUStat.UsageCoreNanoSeconds != nil {
cpuStat.UsageCoreNanoSeconds = *kubeletCPUStat.UsageCoreNanoSeconds
}
if kubeletCPUStat.UsageNanoCores != nil {
cpuStat.UsageNanoCores = *kubeletCPUStat.UsageNanoCores
}
return cpuStat
}

// convertMemoryStats Convert kubelet memory stats to Raw memory stats
func convertMemoryStats(kubeletMemoryStat stats.MemoryStats) MemoryStat {
var memoryStat MemoryStat

memoryStat.Time = kubeletMemoryStat.Time.Time

if kubeletMemoryStat.UsageBytes != nil {
memoryStat.UsageBytes = *kubeletMemoryStat.UsageBytes
}
if kubeletMemoryStat.AvailableBytes != nil {
memoryStat.AvailableBytes = *kubeletMemoryStat.AvailableBytes
}
if kubeletMemoryStat.WorkingSetBytes != nil {
memoryStat.WorkingSetBytes = *kubeletMemoryStat.WorkingSetBytes
}
if kubeletMemoryStat.RSSBytes != nil {
memoryStat.RSSBytes = *kubeletMemoryStat.RSSBytes
}
if kubeletMemoryStat.PageFaults != nil {
memoryStat.PageFaults = *kubeletMemoryStat.PageFaults
}
if kubeletMemoryStat.MajorPageFaults != nil {
memoryStat.MajorPageFaults = *kubeletMemoryStat.MajorPageFaults
}
return memoryStat
}

// ConvertPodToRaw Converts Kubelet Pod stats to RawMetric.
func ConvertPodToRaw(podStat *stats.PodStats) *RawMetric {
var rawMetic *RawMetric
rawMetic = &RawMetric{}
func ConvertPodToRaw(podStat stats.PodStats) RawMetric {
var rawMetic RawMetric

rawMetic.Id = podStat.PodRef.UID
rawMetic.Name = podStat.PodRef.Name
rawMetic.Namespace = podStat.PodRef.Namespace

if podStat.CPU != nil {
rawMetic.Time = podStat.CPU.Time.Time
rawMetic.CPUStats = podStat.CPU
rawMetic.CPUStats = convertCPUStats(*podStat.CPU)
}

if podStat.Memory != nil {
rawMetic.MemoryStats = podStat.Memory
if rawMetic.Time.IsZero() {
rawMetic.Time = podStat.Memory.Time.Time
}
rawMetic.MemoryStats = convertMemoryStats(*podStat.Memory)
}

return rawMetic
}

// ConvertContainerToRaw Converts Kubelet Container stats per Pod to RawMetric.
func ConvertContainerToRaw(containerStat stats.ContainerStats, podStat stats.PodStats) RawMetric {
var rawMetic RawMetric

rawMetic.Id = fmt.Sprintf("%s-%s", podStat.PodRef.UID, containerStat.Name)
rawMetic.Name = containerStat.Name
rawMetic.Namespace = podStat.PodRef.Namespace

if containerStat.CPU != nil {
rawMetic.Time = containerStat.CPU.Time.Time
rawMetic.CPUStats = convertCPUStats(*containerStat.CPU)
}

if containerStat.Memory != nil {
if rawMetic.Time.IsZero() {
rawMetic.Time = containerStat.Memory.Time.Time
}
rawMetic.MemoryStats = convertMemoryStats(*containerStat.Memory)
}

return rawMetic
}

// ConvertNodeToRaw Converts Kubelet Node stats to RawMetric.
func ConvertNodeToRaw(nodeStat *stats.NodeStats) *RawMetric {
var rawMetic *RawMetric
rawMetic = &RawMetric{}
func ConvertNodeToRaw(nodeStat stats.NodeStats) RawMetric {
var rawMetic RawMetric

rawMetic.Id = nodeStat.NodeName
rawMetic.Name = nodeStat.NodeName

if nodeStat.CPU != nil {
rawMetic.Time = nodeStat.CPU.Time.Time
rawMetic.CPUStats = nodeStat.CPU
rawMetic.CPUStats = convertCPUStats(*nodeStat.CPU)
}

if nodeStat.Memory != nil {
rawMetic.MemoryStats = nodeStat.Memory
if rawMetic.Time.IsZero() {
rawMetic.Time = nodeStat.Memory.Time.Time
}
rawMetic.MemoryStats = convertMemoryStats(*nodeStat.Memory)
}

return rawMetic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,63 @@ func TestConvertPodToRaw(t *testing.T) {

result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json")

podRawMetric := ConvertPodToRaw(&result.Pods[0])
podRawMetric := ConvertPodToRaw(result.Pods[0])

assert.Equal(t, podRawMetric.Id, "01bfbe59-2925-4ad5-a8d3-a1b23e3ddd74")
assert.Equal(t, podRawMetric.Name, "windows-server-iis-ltsc2019-58d94b5844-6v2pg")
assert.Equal(t, podRawMetric.Namespace, "amazon-cloudwatch")
parsedtime, _ := time.Parse(time.RFC3339, "2023-12-21T15:19:59Z")
assert.Equal(t, podRawMetric.Time, parsedtime.Local())
assert.Equal(t, *podRawMetric.CPUStats.UsageCoreNanoSeconds, uint64(289625000000))
assert.Equal(t, *podRawMetric.CPUStats.UsageNanoCores, uint64(0))
assert.Equal(t, podRawMetric.CPUStats.UsageCoreNanoSeconds, uint64(289625000000))
assert.Equal(t, podRawMetric.CPUStats.UsageNanoCores, uint64(0))

assert.Equal(t, podRawMetric.MemoryStats.UsageBytes, uint64(0))
assert.Equal(t, podRawMetric.MemoryStats.AvailableBytes, uint64(0))
assert.Equal(t, podRawMetric.MemoryStats.WorkingSetBytes, uint64(208949248))
assert.Equal(t, podRawMetric.MemoryStats.RSSBytes, uint64(0))
assert.Equal(t, podRawMetric.MemoryStats.PageFaults, uint64(0))
assert.Equal(t, podRawMetric.MemoryStats.MajorPageFaults, uint64(0))
}

func TestConvertContainerToRaw(t *testing.T) {
result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json")

containerRawMetric := ConvertContainerToRaw(result.Pods[0].Containers[0], result.Pods[0])

assert.Equal(t, containerRawMetric.Id, "01bfbe59-2925-4ad5-a8d3-a1b23e3ddd74-windows-server-iis-ltsc2019")
assert.Equal(t, containerRawMetric.Name, "windows-server-iis-ltsc2019")
assert.Equal(t, containerRawMetric.Namespace, "amazon-cloudwatch")
parsedtime, _ := time.Parse(time.RFC3339, "2023-12-21T15:19:59Z")
assert.Equal(t, containerRawMetric.Time, parsedtime.Local())
assert.Equal(t, containerRawMetric.CPUStats.UsageCoreNanoSeconds, uint64(289625000000))
assert.Equal(t, containerRawMetric.CPUStats.UsageNanoCores, uint64(0))

assert.Equal(t, containerRawMetric.MemoryStats.UsageBytes, uint64(0))
assert.Equal(t, containerRawMetric.MemoryStats.AvailableBytes, uint64(0))
assert.Equal(t, containerRawMetric.MemoryStats.WorkingSetBytes, uint64(208949248))
assert.Equal(t, containerRawMetric.MemoryStats.RSSBytes, uint64(0))
assert.Equal(t, containerRawMetric.MemoryStats.PageFaults, uint64(0))
assert.Equal(t, containerRawMetric.MemoryStats.MajorPageFaults, uint64(0))
}

func TestConvertNodeToRaw(t *testing.T) {

result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json")

nodeRawMetric := ConvertNodeToRaw(&result.Node)
nodeRawMetric := ConvertNodeToRaw(result.Node)

assert.Equal(t, nodeRawMetric.Id, "ip-192-168-44-84.us-west-2.compute.internal")
assert.Equal(t, nodeRawMetric.Name, "ip-192-168-44-84.us-west-2.compute.internal")
assert.Equal(t, nodeRawMetric.Namespace, "")
parsedtime, _ := time.Parse(time.RFC3339, "2023-12-21T15:19:58Z")
assert.Equal(t, nodeRawMetric.Time, parsedtime.Local())
assert.Equal(t, *nodeRawMetric.CPUStats.UsageCoreNanoSeconds, uint64(38907680000000))
assert.Equal(t, *nodeRawMetric.CPUStats.UsageNanoCores, uint64(20000000))
assert.Equal(t, nodeRawMetric.CPUStats.UsageCoreNanoSeconds, uint64(38907680000000))
assert.Equal(t, nodeRawMetric.CPUStats.UsageNanoCores, uint64(20000000))

assert.Equal(t, nodeRawMetric.MemoryStats.UsageBytes, uint64(3583389696))
assert.Equal(t, nodeRawMetric.MemoryStats.AvailableBytes, uint64(7234662400))
assert.Equal(t, nodeRawMetric.MemoryStats.WorkingSetBytes, uint64(1040203776))
assert.Equal(t, nodeRawMetric.MemoryStats.RSSBytes, uint64(0))
assert.Equal(t, nodeRawMetric.MemoryStats.PageFaults, uint64(0))
assert.Equal(t, nodeRawMetric.MemoryStats.MajorPageFaults, uint64(0))
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,27 @@ type MemMetricExtractor struct {
rateCalculator awsmetrics.MetricCalculator
}

func (m *MemMetricExtractor) HasValue(rawMetric *RawMetric) bool {
if rawMetric.MemoryStats != nil {
func (m *MemMetricExtractor) HasValue(rawMetric RawMetric) bool {
if !rawMetric.Time.IsZero() {
return true
}
return false
}

func (m *MemMetricExtractor) GetValue(rawMetric *RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric {
func (m *MemMetricExtractor) GetValue(rawMetric RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric {
var metrics []*cExtractor.CAdvisorMetric

metric := cExtractor.NewCadvisorMetric(containerType, m.logger)
identifier := rawMetric.Id

metric.AddField(ci.MetricName(containerType, ci.MemUsage), *rawMetric.MemoryStats.UsageBytes)
metric.AddField(ci.MetricName(containerType, ci.MemRss), *rawMetric.MemoryStats.RSSBytes)
metric.AddField(ci.MetricName(containerType, ci.MemWorkingset), *rawMetric.MemoryStats.WorkingSetBytes)
metric.AddField(ci.MetricName(containerType, ci.MemUsage), rawMetric.MemoryStats.UsageBytes)
metric.AddField(ci.MetricName(containerType, ci.MemRss), rawMetric.MemoryStats.RSSBytes)
metric.AddField(ci.MetricName(containerType, ci.MemWorkingset), rawMetric.MemoryStats.WorkingSetBytes)

multiplier := float64(time.Second)
cExtractor.AssignRateValueToField(&m.rateCalculator, metric.GetFields(), ci.MetricName(containerType, ci.MemPgfault), identifier,
float64(*rawMetric.MemoryStats.PageFaults), rawMetric.Time, multiplier)
float64(rawMetric.MemoryStats.PageFaults), rawMetric.Time, multiplier)
cExtractor.AssignRateValueToField(&m.rateCalculator, metric.GetFields(), ci.MetricName(containerType, ci.MemPgmajfault), identifier,
float64(*rawMetric.MemoryStats.MajorPageFaults), rawMetric.Time, multiplier)
float64(rawMetric.MemoryStats.MajorPageFaults), rawMetric.Time, multiplier)

memoryCapacity := mInfo.GetMemoryCapacity()
if metric.GetField(ci.MetricName(containerType, ci.MemWorkingset)) != nil && memoryCapacity != 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ func TestMemStats(t *testing.T) {
result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json")
result2 := testutils.LoadKubeletSummary(t, "./testdata/CurSingleKubeletSummary.json")

podRawMetric := ConvertPodToRaw(&result.Pods[0])
podRawMetric2 := ConvertPodToRaw(&result2.Pods[0])
podRawMetric := ConvertPodToRaw(result.Pods[0])
podRawMetric2 := ConvertPodToRaw(result2.Pods[0])

containerType := containerinsight.TypePod
extractor := NewMemMetricExtractor(nil)
Expand All @@ -45,8 +45,8 @@ func TestMemStats(t *testing.T) {
containerType = containerinsight.TypeNode
extractor = NewMemMetricExtractor(nil)

nodeRawMetric := ConvertNodeToRaw(&result.Node)
nodeRawMetric2 := ConvertNodeToRaw(&result2.Node)
nodeRawMetric := ConvertNodeToRaw(result.Node)
nodeRawMetric2 := ConvertNodeToRaw(result2.Node)

if extractor.HasValue(nodeRawMetric) {
cMetrics = extractor.GetValue(nodeRawMetric, MockCPUMemInfo, containerType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ func (k *kubeletSummaryProvider) getPodMetrics(summary *stats.Summary) ([]*cExtr
tags[ci.PodIDKey] = pod.PodRef.UID
tags[ci.K8sPodNameKey] = pod.PodRef.Name
tags[ci.K8sNamespace] = pod.PodRef.Namespace
tags[ci.Timestamp] = strconv.FormatInt(pod.CPU.Time.UnixNano(), 10)

rawMetric := extractors.ConvertPodToRaw(&pod)
rawMetric := extractors.ConvertPodToRaw(pod)
tags[ci.Timestamp] = strconv.FormatInt(rawMetric.Time.UnixNano(), 10)
for _, extractor := range GetMetricsExtractors() {
if extractor.HasValue(rawMetric) {
metrics = append(metrics, extractor.GetValue(rawMetric, &k.hostInfo, ci.TypePod)...)
Expand Down

0 comments on commit dfc7c15

Please sign in to comment.