Skip to content

Commit

Permalink
Add storage metrics for container and node level (amazon-contributing…
Browse files Browse the repository at this point in the history
…#151)

* Add storage metrics for container and node level

1. Add storage extractors for container and node level
2. Add metric source for Windows metric collection
3. Refactor metric source for cadvisor
4. Add os label for windows

* Address chad's and pooja's comments

* Refactor: Address review comments

* Refactor: remove extra add source func
  • Loading branch information
KlwntSingh authored Jan 18, 2024
1 parent fa622c9 commit 52bd7cc
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 11 deletions.
1 change: 1 addition & 0 deletions internal/aws/containerinsight/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
MetricType = "Type"
SourcesKey = "Sources"
Timestamp = "Timestamp"
OperatingSystem = "OperatingSystem"

// The following constants are used for metric name construction
CPUTotal = "cpu_usage_total"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,23 @@ type MemoryStat struct {
MajorPageFaults uint64
}

type FileSystemStat struct {
Time time.Time
AvailableBytes uint64
CapacityBytes uint64
UsedBytes uint64
}

// RawMetric Represent Container, Pod, Node Metric Extractors.
// Kubelet summary and HNS stats will be converted to Raw Metric for parsing by Extractors.
type RawMetric struct {
Id string
Name string
Namespace string
Time time.Time
CPUStats CPUStat
MemoryStats MemoryStat
Id string
Name string
Namespace string
Time time.Time
CPUStats CPUStat
MemoryStats MemoryStat
FileSystemStats []FileSystemStat
}

type MetricExtractor interface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@ func convertCPUStats(kubeletCPUStat stats.CPUStats) CPUStat {
return cpuStat
}

// convertFileSystemStats Convert kubelet file system stats to Raw memory stats
func convertFileSystemStats(kubeletFSstat stats.FsStats) FileSystemStat {
var fsstat FileSystemStat

fsstat.Time = kubeletFSstat.Time.Time

if kubeletFSstat.UsedBytes != nil {
fsstat.UsedBytes = *kubeletFSstat.UsedBytes
}

if kubeletFSstat.AvailableBytes != nil {
fsstat.AvailableBytes = *kubeletFSstat.AvailableBytes
}

if kubeletFSstat.CapacityBytes != nil {
fsstat.CapacityBytes = *kubeletFSstat.CapacityBytes
}

return fsstat
}

// convertMemoryStats Convert kubelet memory stats to Raw memory stats
func convertMemoryStats(kubeletMemoryStat stats.MemoryStats) MemoryStat {
var memoryStat MemoryStat
Expand Down Expand Up @@ -91,6 +112,14 @@ func ConvertContainerToRaw(containerStat stats.ContainerStats, podStat stats.Pod
rawMetic.MemoryStats = convertMemoryStats(*containerStat.Memory)
}

rawMetic.FileSystemStats = []FileSystemStat{}
if containerStat.Rootfs != nil {
rawMetic.FileSystemStats = append(rawMetic.FileSystemStats, convertFileSystemStats(*containerStat.Rootfs))
}
if containerStat.Logs != nil {
rawMetic.FileSystemStats = append(rawMetic.FileSystemStats, convertFileSystemStats(*containerStat.Logs))
}

return rawMetic
}

Expand All @@ -113,5 +142,10 @@ func ConvertNodeToRaw(nodeStat stats.NodeStats) RawMetric {
rawMetic.MemoryStats = convertMemoryStats(*nodeStat.Memory)
}

rawMetic.FileSystemStats = []FileSystemStat{}
if nodeStat.Fs != nil {
rawMetic.FileSystemStats = append(rawMetic.FileSystemStats, convertFileSystemStats(*nodeStat.Fs))
}

return rawMetic
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package extractors // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"

import (
ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"

"go.uber.org/zap"
)

type FileSystemMetricExtractor struct {
logger *zap.Logger
rateCalculator awsmetrics.MetricCalculator
}

func (f *FileSystemMetricExtractor) HasValue(rawMetric RawMetric) bool {
if !rawMetric.Time.IsZero() {
return true
}
return false
}

func (f *FileSystemMetricExtractor) GetValue(rawMetric RawMetric, _ cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric {
if containerType == ci.TypePod {
return nil
}

containerType = getFSMetricType(containerType, f.logger)
metrics := make([]*cExtractor.CAdvisorMetric, 0, len(rawMetric.FileSystemStats))

for _, v := range rawMetric.FileSystemStats {
metric := cExtractor.NewCadvisorMetric(containerType, f.logger)

metric.AddField(ci.MetricName(containerType, ci.FSUsage), v.UsedBytes)
metric.AddField(ci.MetricName(containerType, ci.FSCapacity), v.CapacityBytes)
metric.AddField(ci.MetricName(containerType, ci.FSAvailable), v.AvailableBytes)

if v.CapacityBytes != 0 {
metric.AddField(ci.MetricName(containerType, ci.FSUtilization), float64(v.UsedBytes)/float64(v.CapacityBytes)*100)
}

metrics = append(metrics, metric)
}
return metrics
}

func (f *FileSystemMetricExtractor) Shutdown() error {
return f.rateCalculator.Shutdown()
}

func NewFileSystemMetricExtractor(logger *zap.Logger) *FileSystemMetricExtractor {
return &FileSystemMetricExtractor{
logger: logger,
rateCalculator: cExtractor.NewFloat64RateCalculator(),
}
}

func getFSMetricType(containerType string, logger *zap.Logger) string {
metricType := ""
switch containerType {
case ci.TypeNode:
metricType = ci.TypeNodeFS
case ci.TypeContainer:
metricType = ci.TypeContainerFS
default:
logger.Warn("fs_extractor: fs metric extractor is parsing unexpected containerType", zap.String("containerType", containerType))
}
return metricType
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package extractors

import (
"fmt"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils"

"github.com/stretchr/testify/assert"
)

func TestFSStats(t *testing.T) {
result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json")

nodeRawMetric := ConvertNodeToRaw(result.Node)

// node type
containerType := containerinsight.TypeNode
extractor := NewFileSystemMetricExtractor(nil)

var cMetrics []*cExtractor.CAdvisorMetric
if extractor.HasValue(nodeRawMetric) {
cMetrics = extractor.GetValue(nodeRawMetric, nil, containerType)
}
fmt.Println(len(cMetrics))
expectedFields := map[string]any{
"node_filesystem_usage": uint64(34667089920),
"node_filesystem_capacity": uint64(85897244672),
"node_filesystem_available": uint64(51230154752),
"node_filesystem_utilization": float64(40.358791544917224),
}
expectedTags := map[string]string{
"Type": "NodeFS",
}
cExtractor.AssertContainsTaggedField(t, cMetrics[0], expectedFields, expectedTags)

// pod type
containerType = containerinsight.TypePod
extractor = NewFileSystemMetricExtractor(nil)
podRawMetric := ConvertPodToRaw(result.Pods[0])

if extractor.HasValue(podRawMetric) {
cMetrics = extractor.GetValue(podRawMetric, nil, containerType)
}

assert.Equal(t, len(cMetrics), 0)

// container type for eks
containerType = containerinsight.TypeContainer
extractor = NewFileSystemMetricExtractor(nil)
containerRawMetric := ConvertContainerToRaw(result.Pods[0].Containers[0], result.Pods[0])

if extractor.HasValue(containerRawMetric) {
cMetrics = extractor.GetValue(containerRawMetric, nil, containerType)
}

expectedFields = map[string]any{
"container_filesystem_available": uint64(51230154752),
"container_filesystem_capacity": uint64(85897244672),
"container_filesystem_usage": uint64(339738624),
"container_filesystem_utilization": float64(0.3955174875484043),
}
expectedTags = map[string]string{
"Type": "ContainerFS",
}
cExtractor.AssertContainsTaggedField(t, cMetrics[0], expectedFields, expectedTags)

expectedFields = map[string]any{
"container_filesystem_available": uint64(51230154752),
"container_filesystem_capacity": uint64(85897244672),
"container_filesystem_usage": uint64(919463),
"container_filesystem_utilization": float64(0.0010704219949207732),
}
expectedTags = map[string]string{
"Type": "ContainerFS",
}
cExtractor.AssertContainsTaggedField(t, cMetrics[1], expectedFields, expectedTags)
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info)
metricsExtractors = []extractors.MetricExtractor{}
metricsExtractors = append(metricsExtractors, extractors.NewCPUMetricExtractor(logger))
metricsExtractors = append(metricsExtractors, extractors.NewMemMetricExtractor(logger))
metricsExtractors = append(metricsExtractors, extractors.NewFileSystemMetricExtractor(logger))

ksp, err := kubeletsummaryprovider.New(logger, &hostInfo, metricsExtractors)
if err != nil {
Expand Down Expand Up @@ -108,6 +109,9 @@ func (c *K8sWindows) decorateMetrics(cadvisormetrics []*cExtractor.CAdvisorMetri
// add tags for EKS
tags[ci.ClusterNameKey] = c.hostInfo.GetClusterName()

// add tags for OS
tags[ci.OperatingSystem] = "windows"

out := c.k8sDecorator.Decorate(m)
if out != nil {
result = append(result, out)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ func (sp *SummaryProvider) GetMetrics() ([]*cExtractor.CAdvisorMetric, error) {
}
metrics = append(metrics, outMetrics...)

nodeMetics, err := sp.getNodeMetrics(summary)
nodeMetrics, err := sp.getNodeMetrics(summary)
if err != nil {
sp.logger.Error("failed to get node metrics using kubelet summary, ", zap.Error(err))
return nodeMetics, err
return nodeMetrics, err
}
metrics = append(metrics, nodeMetics...)
metrics = append(metrics, nodeMetrics...)

return metrics, nil
}
Expand Down Expand Up @@ -158,5 +158,6 @@ func (sp *SummaryProvider) getNodeMetrics(summary *stats.Summary) ([]*cExtractor
metrics = append(metrics, extractor.GetValue(rawMetric, sp.hostInfo, ci.TypeNode)...)
}
}

return metrics, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestGetPodMetrics(t *testing.T) {
assert.NotNil(t, podMetric.GetTag(ci.K8sPodNameKey))
assert.NotNil(t, podMetric.GetTag(ci.K8sNamespace))
assert.NotNil(t, podMetric.GetTag(ci.Timestamp))
assert.NotNil(t, podMetric.GetTag(ci.SourcesKey))

containerMetric := metrics[len(metrics)-1]
assert.Equal(t, containerMetric.GetMetricType(), ci.TypeContainer)
Expand All @@ -72,6 +73,7 @@ func TestGetPodMetrics(t *testing.T) {
assert.NotNil(t, containerMetric.GetTag(ci.Timestamp))
assert.NotNil(t, containerMetric.GetTag(ci.ContainerNamekey))
assert.NotNil(t, containerMetric.GetTag(ci.ContainerIDkey))
assert.NotNil(t, containerMetric.GetTag(ci.SourcesKey))
}

// TestGetContainerMetrics verify tags on container level metrics returned.
Expand All @@ -92,6 +94,7 @@ func TestGetContainerMetrics(t *testing.T) {
assert.NotNil(t, containerMetric.GetTag(ci.Timestamp))
assert.NotNil(t, containerMetric.GetTag(ci.ContainerNamekey))
assert.NotNil(t, containerMetric.GetTag(ci.ContainerIDkey))
assert.NotNil(t, containerMetric.GetTag(ci.SourcesKey))
}

// TestGetNodeMetrics verify tags on node level metrics.
Expand All @@ -104,6 +107,7 @@ func TestGetNodeMetrics(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, metrics)

containerMetric := metrics[1]
assert.Equal(t, containerMetric.GetMetricType(), ci.TypeNode)
nodeMetric := metrics[1]
assert.Equal(t, nodeMetric.GetMetricType(), ci.TypeNode)
assert.NotNil(t, nodeMetric.GetTag(ci.SourcesKey))
}
45 changes: 45 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/stores/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ func stringInRuneset(name, subset string) bool {
}

func TagMetricSource(metric CIMetric) {
if metric.GetTag(ci.OperatingSystem) == "windows" {
tagMetricSourceWindows(metric)
return
}
tagMetricSourceLinux(metric)
}

func tagMetricSourceLinux(metric CIMetric) {
metricType := metric.GetTag(ci.MetricType)
if metricType == "" {
return
Expand Down Expand Up @@ -132,6 +140,43 @@ func TagMetricSource(metric CIMetric) {
}
}

func tagMetricSourceWindows(metric CIMetric) {
metricType := metric.GetTag(ci.MetricType)
if metricType == "" {
return
}

var sources []string
switch metricType {
case ci.TypeNode:
sources = append(sources, []string{"kubelet", "pod", "calculated"}...)
case ci.TypeNodeFS:
sources = append(sources, []string{"kubelet", "calculated"}...)
case ci.TypeNodeNet:
sources = append(sources, []string{"kubelet", "calculated"}...)
case ci.TypeNodeDiskIO:
sources = append(sources, []string{"kubelet"}...)
case ci.TypePod:
sources = append(sources, []string{"kubelet", "pod", "calculated"}...)
case ci.TypePodNet:
sources = append(sources, []string{"kubelet", "calculated"}...)
case ci.TypeContainer:
sources = append(sources, []string{"kubelet", "pod", "calculated"}...)
case ci.TypeContainerFS:
sources = append(sources, []string{"kubelet", "calculated"}...)
case ci.TypeContainerDiskIO:
sources = append(sources, []string{"kubelet"}...)
}

if len(sources) > 0 {
sourcesInfo, err := json.Marshal(sources)
if err != nil {
return
}
metric.AddTag(ci.SourcesKey, string(sourcesInfo))
}
}

func AddKubernetesInfo(metric CIMetric, kubernetesBlob map[string]any, retainContainerNameTag bool) {
needMoveToKubernetes := map[string]string{ci.K8sPodNameKey: "pod_name", ci.PodIDKey: "pod_id"}
needCopyToKubernetes := map[string]string{ci.K8sNamespace: "namespace_name", ci.TypeService: "service_name", ci.NodeNameKey: "host"}
Expand Down
Loading

0 comments on commit 52bd7cc

Please sign in to comment.