Skip to content

Commit

Permalink
Add memory extractors for pod and node level (amazon-contributing#147)
Browse files Browse the repository at this point in the history
* Add memory extractor at pod and node level

1. Added memory extractor at pod and node level
2. Add unit tests for memory extractor
3. use cpu and memory extractor for k8s windows

* Fix adding tags to collected metrics from extractors
  • Loading branch information
KlwntSingh authored Jan 6, 2024
1 parent 1ad51be commit e44056d
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ func TestCPUStats(t *testing.T) {
extractor := NewCPUMetricExtractor(&zap.Logger{})

var cMetrics []*cExtractor.CAdvisorMetric
cMetrics = extractor.GetValue(podRawMetric, MockCPUMemInfo, containerType)
cMetrics = extractor.GetValue(podRawMetric2, MockCPUMemInfo, containerType)
if extractor.HasValue(podRawMetric) {
cMetrics = extractor.GetValue(podRawMetric, MockCPUMemInfo, containerType)
}
if extractor.HasValue(podRawMetric2) {
cMetrics = extractor.GetValue(podRawMetric2, MockCPUMemInfo, containerType)
}

cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "pod_cpu_usage_total", 3.125000, 0)
cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "pod_cpu_utilization", 0.156250, 0)
Expand All @@ -39,8 +43,12 @@ func TestCPUStats(t *testing.T) {

nodeRawMetric := ConvertNodeToRaw(&result.Node)
nodeRawMetric2 := ConvertNodeToRaw(&result2.Node)
cMetrics = extractor.GetValue(nodeRawMetric, MockCPUMemInfo, containerType)
cMetrics = extractor.GetValue(nodeRawMetric2, MockCPUMemInfo, containerType)
if extractor.HasValue(nodeRawMetric) {
cMetrics = extractor.GetValue(nodeRawMetric, MockCPUMemInfo, containerType)
}
if extractor.HasValue(nodeRawMetric2) {
cMetrics = extractor.GetValue(nodeRawMetric2, MockCPUMemInfo, containerType)
}

cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_cpu_usage_total", 51.5, 0.5)
cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_cpu_utilization", 2.5, 0.5)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package extractors // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"

import (
"time"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"

"go.uber.org/zap"
)

type MemMetricExtractor struct {
logger *zap.Logger
rateCalculator awsmetrics.MetricCalculator
}

func (m *MemMetricExtractor) HasValue(rawMetric *RawMetric) bool {
if rawMetric.MemoryStats != nil {
return true
}
return false
}

func (m *MemMetricExtractor) GetValue(rawMetric *RawMetric, mInfo cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric {
var metrics []*cExtractor.CAdvisorMetric

metric := cExtractor.NewCadvisorMetric(containerType, m.logger)
identifier := rawMetric.Id

metric.AddField(ci.MetricName(containerType, ci.MemUsage), *rawMetric.MemoryStats.UsageBytes)
metric.AddField(ci.MetricName(containerType, ci.MemRss), *rawMetric.MemoryStats.RSSBytes)
metric.AddField(ci.MetricName(containerType, ci.MemWorkingset), *rawMetric.MemoryStats.WorkingSetBytes)

multiplier := float64(time.Second)
cExtractor.AssignRateValueToField(&m.rateCalculator, metric.GetFields(), ci.MetricName(containerType, ci.MemPgfault), identifier,
float64(*rawMetric.MemoryStats.PageFaults), rawMetric.Time, multiplier)
cExtractor.AssignRateValueToField(&m.rateCalculator, metric.GetFields(), ci.MetricName(containerType, ci.MemPgmajfault), identifier,
float64(*rawMetric.MemoryStats.MajorPageFaults), rawMetric.Time, multiplier)

memoryCapacity := mInfo.GetMemoryCapacity()
if metric.GetField(ci.MetricName(containerType, ci.MemWorkingset)) != nil && memoryCapacity != 0 {
metric.AddField(ci.MetricName(containerType, ci.MemUtilization), float64(metric.GetField(ci.MetricName(containerType, ci.MemWorkingset)).(uint64))/float64(memoryCapacity)*100)
}

if containerType == ci.TypeNode {
metric.AddField(ci.MetricName(containerType, ci.MemLimit), memoryCapacity)
}

metrics = append(metrics, metric)
return metrics
}

func (m *MemMetricExtractor) Shutdown() error {
return m.rateCalculator.Shutdown()
}

func NewMemMetricExtractor(logger *zap.Logger) *MemMetricExtractor {
return &MemMetricExtractor{
logger: logger,
rateCalculator: cExtractor.NewFloat64RateCalculator(),
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package extractors

import (
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
cTestUtils "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils"

"github.com/stretchr/testify/require"
)

func TestMemStats(t *testing.T) {
MockCPUMemInfo := cTestUtils.MockCPUMemInfo{}
result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json")
result2 := testutils.LoadKubeletSummary(t, "./testdata/CurSingleKubeletSummary.json")

podRawMetric := ConvertPodToRaw(&result.Pods[0])
podRawMetric2 := ConvertPodToRaw(&result2.Pods[0])

containerType := containerinsight.TypePod
extractor := NewMemMetricExtractor(nil)

var cMetrics []*cExtractor.CAdvisorMetric
if extractor.HasValue(podRawMetric) {
cMetrics = extractor.GetValue(podRawMetric, MockCPUMemInfo, containerType)
}
if extractor.HasValue(podRawMetric2) {
cMetrics = extractor.GetValue(podRawMetric2, MockCPUMemInfo, containerType)
}

cExtractor.AssertContainsTaggedUint(t, cMetrics[0], "pod_memory_rss", 0)
cExtractor.AssertContainsTaggedUint(t, cMetrics[0], "pod_memory_usage", 0)
cExtractor.AssertContainsTaggedUint(t, cMetrics[0], "pod_memory_working_set", 209088512)

cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "pod_memory_pgfault", 0, 0)
cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "pod_memory_pgmajfault", 0, 0)
require.NoError(t, extractor.Shutdown())

// for node type
containerType = containerinsight.TypeNode
extractor = NewMemMetricExtractor(nil)

nodeRawMetric := ConvertNodeToRaw(&result.Node)
nodeRawMetric2 := ConvertNodeToRaw(&result2.Node)

if extractor.HasValue(nodeRawMetric) {
cMetrics = extractor.GetValue(nodeRawMetric, MockCPUMemInfo, containerType)
}

if extractor.HasValue(nodeRawMetric2) {
cMetrics = extractor.GetValue(nodeRawMetric2, MockCPUMemInfo, containerType)
}

cExtractor.AssertContainsTaggedUint(t, cMetrics[0], "node_memory_rss", 0)
cExtractor.AssertContainsTaggedUint(t, cMetrics[0], "node_memory_usage", 3572293632)
cExtractor.AssertContainsTaggedUint(t, cMetrics[0], "node_memory_working_set", 1026678784)
cExtractor.AssertContainsTaggedInt(t, cMetrics[0], "node_memory_limit", 1073741824)

cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_memory_pgfault", 0, 0)
cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_memory_pgmajfault", 0, 0)
cExtractor.AssertContainsTaggedFloat(t, cMetrics[0], "node_memory_utilization", 95, 0.7)

require.NoError(t, extractor.Shutdown())
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
"os"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"

"go.opentelemetry.io/collector/pdata/pmetric"
Expand All @@ -29,6 +30,8 @@ type K8sWindows struct {
hostInfo host.Info
}

var metricsExtractors = []extractors.MetricExtractor{}

func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info) (*K8sWindows, error) {
nodeName := os.Getenv("HOST_NAME")
if nodeName == "" {
Expand All @@ -39,6 +42,10 @@ func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info)
logger.Error("failed to initialize kubelet summary provider, ", zap.Error(err))
return nil, err
}

metricsExtractors = []extractors.MetricExtractor{}
metricsExtractors = append(metricsExtractors, extractors.NewCPUMetricExtractor(logger))
metricsExtractors = append(metricsExtractors, extractors.NewMemMetricExtractor(logger))
return &K8sWindows{
logger: logger,
nodeName: nodeName,
Expand Down Expand Up @@ -66,9 +73,9 @@ func (k *K8sWindows) GetMetrics() []pmetric.Metrics {
return result
}

func (c *K8sWindows) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric) []*extractors.CAdvisorMetric {
func (c *K8sWindows) decorateMetrics(cadvisormetrics []*cExtractor.CAdvisorMetric) []*cExtractor.CAdvisorMetric {
//ebsVolumeIdsUsedAsPV := c.hostInfo.ExtractEbsIDsUsedByKubernetes()
var result []*extractors.CAdvisorMetric
var result []*cExtractor.CAdvisorMetric
for _, m := range cadvisormetrics {
tags := m.GetTags()
//c.addEbsVolumeInfo(tags, ebsVolumeIdsUsedAsPV)
Expand Down Expand Up @@ -109,3 +116,7 @@ func (k *K8sWindows) Shutdown() error {
k.logger.Debug("D! called K8sWindows Shutdown")
return nil
}

func GetMetricsExtractors() []extractors.MetricExtractor {
return metricsExtractors
}
28 changes: 12 additions & 16 deletions receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil"

"go.uber.org/zap"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
)
Expand All @@ -34,6 +34,7 @@ func new(logger *zap.Logger, info host.Info) (*kubeletSummaryProvider, error) {
if err != nil {
return nil, fmt.Errorf("failed to initialize kubelet client: %w", err)
}

return &kubeletSummaryProvider{
logger: logger,
client: kclient,
Expand Down Expand Up @@ -63,30 +64,25 @@ func (k *kubeletSummaryProvider) getPodMetrics(summary *stats.Summary) ([]*cExtr

var metrics []*cExtractor.CAdvisorMetric

nodeCPUCores := k.hostInfo.GetNumCores()
for _, pod := range summary.Pods {
k.logger.Info(fmt.Sprintf("pod summary %v", pod.PodRef.Name))
metric := cExtractor.NewCadvisorMetric(ci.TypePod, k.logger)

tags := map[string]string{}

tags[ci.PodIDKey] = pod.PodRef.UID
tags[ci.K8sPodNameKey] = pod.PodRef.Name
tags[ci.K8sNamespace] = pod.PodRef.Namespace
tags[ci.Timestamp] = strconv.FormatInt(pod.CPU.Time.UnixNano(), 10)

// CPU metric
metric.AddField(ci.MetricName(ci.TypePod, ci.CPUTotal), float64(*pod.CPU.UsageCoreNanoSeconds))
metric.AddField(ci.MetricName(ci.TypePod, ci.CPUUtilization), float64(*pod.CPU.UsageCoreNanoSeconds)/float64(nodeCPUCores))

// Memory metrics
metric.AddField(ci.MetricName(ci.TypePod, ci.MemUsage), *pod.Memory.UsageBytes)
metric.AddField(ci.MetricName(ci.TypePod, ci.MemRss), *pod.Memory.RSSBytes)
metric.AddField(ci.MetricName(ci.TypePod, ci.MemWorkingset), *pod.Memory.WorkingSetBytes)
metric.AddField(ci.MetricName(ci.TypePod, ci.MemReservedCapacity), k.hostInfo.GetMemoryCapacity())
metric.AddField(ci.MetricName(ci.TypePod, ci.MemUtilization), float64(*pod.Memory.WorkingSetBytes)/float64(k.hostInfo.GetMemoryCapacity())*100)

metric.AddTags(tags)
metrics = append(metrics, metric)
rawMetric := extractors.ConvertPodToRaw(&pod)
for _, extractor := range GetMetricsExtractors() {
if extractor.HasValue(rawMetric) {
metrics = append(metrics, extractor.GetValue(rawMetric, &k.hostInfo, ci.TypePod)...)
}
}
for _, metric := range metrics {
metric.AddTags(tags)
}
}
return metrics, nil
}
Expand Down

0 comments on commit e44056d

Please sign in to comment.