From 52bd7cc20c1a13851002a232bbb6ad23db9f3c9b Mon Sep 17 00:00:00 2001 From: Kulwant Singh Date: Thu, 18 Jan 2024 17:12:56 +0000 Subject: [PATCH] Add storage metrics for container and node level (#151) * Add storage metrics for container and node level 1. Add storage extractors for container and node level 2. Add metric source for Windows metric collection 3. Refactor metric source for cadvisor 4. Add os label for windows * Address chad's and pooja's comments * Refactor: Address review comments * Refactor: remove extra add source func --- internal/aws/containerinsight/const.go | 1 + .../k8swindows/extractors/extractor.go | 20 +++-- .../k8swindows/extractors/extractorhelpers.go | 34 ++++++++ .../k8swindows/extractors/fs_extractor.go | 72 ++++++++++++++++ .../extractors/fs_extractor_test.go | 83 +++++++++++++++++++ .../internal/k8swindows/k8swindows.go | 4 + .../internal/k8swindows/kubelet/kubelet.go | 7 +- .../k8swindows/kubelet/kubelet_test.go | 8 +- .../internal/stores/utils.go | 45 ++++++++++ .../internal/stores/utils_test.go | 40 +++++++++ 10 files changed, 303 insertions(+), 11 deletions(-) create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/fs_extractor.go create mode 100644 receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/fs_extractor_test.go diff --git a/internal/aws/containerinsight/const.go b/internal/aws/containerinsight/const.go index 38a94360bd8b..0e46f9a31455 100644 --- a/internal/aws/containerinsight/const.go +++ b/internal/aws/containerinsight/const.go @@ -26,6 +26,7 @@ const ( MetricType = "Type" SourcesKey = "Sources" Timestamp = "Timestamp" + OperatingSystem = "OperatingSystem" // The following constants are used for metric name construction CPUTotal = "cpu_usage_total" diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go index 55c4f2bac3aa..13e412f9e946 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractor.go @@ -24,15 +24,23 @@ type MemoryStat struct { MajorPageFaults uint64 } +type FileSystemStat struct { + Time time.Time + AvailableBytes uint64 + CapacityBytes uint64 + UsedBytes uint64 +} + // RawMetric Represent Container, Pod, Node Metric Extractors. // Kubelet summary and HNS stats will be converted to Raw Metric for parsing by Extractors. type RawMetric struct { - Id string - Name string - Namespace string - Time time.Time - CPUStats CPUStat - MemoryStats MemoryStat + Id string + Name string + Namespace string + Time time.Time + CPUStats CPUStat + MemoryStats MemoryStat + FileSystemStats []FileSystemStat } type MetricExtractor interface { diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers.go index 5f18b8101740..d6beaaa932b9 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/extractorhelpers.go @@ -21,6 +21,27 @@ func convertCPUStats(kubeletCPUStat stats.CPUStats) CPUStat { return cpuStat } +// convertFileSystemStats Convert kubelet file system stats to Raw memory stats +func convertFileSystemStats(kubeletFSstat stats.FsStats) FileSystemStat { + var fsstat FileSystemStat + + fsstat.Time = kubeletFSstat.Time.Time + + if kubeletFSstat.UsedBytes != nil { + fsstat.UsedBytes = *kubeletFSstat.UsedBytes + } + + if kubeletFSstat.AvailableBytes != nil { + fsstat.AvailableBytes = *kubeletFSstat.AvailableBytes + } + + if kubeletFSstat.CapacityBytes != nil { + fsstat.CapacityBytes = *kubeletFSstat.CapacityBytes + } + + return fsstat +} + // convertMemoryStats Convert kubelet memory stats to Raw memory stats func convertMemoryStats(kubeletMemoryStat stats.MemoryStats) MemoryStat { var memoryStat MemoryStat @@ -91,6 +112,14 @@ func ConvertContainerToRaw(containerStat stats.ContainerStats, podStat stats.Pod rawMetic.MemoryStats = convertMemoryStats(*containerStat.Memory) } + rawMetic.FileSystemStats = []FileSystemStat{} + if containerStat.Rootfs != nil { + rawMetic.FileSystemStats = append(rawMetic.FileSystemStats, convertFileSystemStats(*containerStat.Rootfs)) + } + if containerStat.Logs != nil { + rawMetic.FileSystemStats = append(rawMetic.FileSystemStats, convertFileSystemStats(*containerStat.Logs)) + } + return rawMetic } @@ -113,5 +142,10 @@ func ConvertNodeToRaw(nodeStat stats.NodeStats) RawMetric { rawMetic.MemoryStats = convertMemoryStats(*nodeStat.Memory) } + rawMetic.FileSystemStats = []FileSystemStat{} + if nodeStat.Fs != nil { + rawMetic.FileSystemStats = append(rawMetic.FileSystemStats, convertFileSystemStats(*nodeStat.Fs)) + } + return rawMetic } diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/fs_extractor.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/fs_extractor.go new file mode 100644 index 000000000000..2723af9f480e --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/fs_extractor.go @@ -0,0 +1,72 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package extractors // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + +import ( + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" + cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + + "go.uber.org/zap" +) + +type FileSystemMetricExtractor struct { + logger *zap.Logger + rateCalculator awsmetrics.MetricCalculator +} + +func (f *FileSystemMetricExtractor) HasValue(rawMetric RawMetric) bool { + if !rawMetric.Time.IsZero() { + return true + } + return false +} + +func (f *FileSystemMetricExtractor) GetValue(rawMetric RawMetric, _ cExtractor.CPUMemInfoProvider, containerType string) []*cExtractor.CAdvisorMetric { + if containerType == ci.TypePod { + return nil + } + + containerType = getFSMetricType(containerType, f.logger) + metrics := make([]*cExtractor.CAdvisorMetric, 0, len(rawMetric.FileSystemStats)) + + for _, v := range rawMetric.FileSystemStats { + metric := cExtractor.NewCadvisorMetric(containerType, f.logger) + + metric.AddField(ci.MetricName(containerType, ci.FSUsage), v.UsedBytes) + metric.AddField(ci.MetricName(containerType, ci.FSCapacity), v.CapacityBytes) + metric.AddField(ci.MetricName(containerType, ci.FSAvailable), v.AvailableBytes) + + if v.CapacityBytes != 0 { + metric.AddField(ci.MetricName(containerType, ci.FSUtilization), float64(v.UsedBytes)/float64(v.CapacityBytes)*100) + } + + metrics = append(metrics, metric) + } + return metrics +} + +func (f *FileSystemMetricExtractor) Shutdown() error { + return f.rateCalculator.Shutdown() +} + +func NewFileSystemMetricExtractor(logger *zap.Logger) *FileSystemMetricExtractor { + return &FileSystemMetricExtractor{ + logger: logger, + rateCalculator: cExtractor.NewFloat64RateCalculator(), + } +} + +func getFSMetricType(containerType string, logger *zap.Logger) string { + metricType := "" + switch containerType { + case ci.TypeNode: + metricType = ci.TypeNodeFS + case ci.TypeContainer: + metricType = ci.TypeContainerFS + default: + logger.Warn("fs_extractor: fs metric extractor is parsing unexpected containerType", zap.String("containerType", containerType)) + } + return metricType +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/fs_extractor_test.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/fs_extractor_test.go new file mode 100644 index 000000000000..60088a7b63e6 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/extractors/fs_extractor_test.go @@ -0,0 +1,83 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package extractors + +import ( + "fmt" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + cExtractor "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows/testutils" + + "github.com/stretchr/testify/assert" +) + +func TestFSStats(t *testing.T) { + result := testutils.LoadKubeletSummary(t, "./testdata/PreSingleKubeletSummary.json") + + nodeRawMetric := ConvertNodeToRaw(result.Node) + + // node type + containerType := containerinsight.TypeNode + extractor := NewFileSystemMetricExtractor(nil) + + var cMetrics []*cExtractor.CAdvisorMetric + if extractor.HasValue(nodeRawMetric) { + cMetrics = extractor.GetValue(nodeRawMetric, nil, containerType) + } + fmt.Println(len(cMetrics)) + expectedFields := map[string]any{ + "node_filesystem_usage": uint64(34667089920), + "node_filesystem_capacity": uint64(85897244672), + "node_filesystem_available": uint64(51230154752), + "node_filesystem_utilization": float64(40.358791544917224), + } + expectedTags := map[string]string{ + "Type": "NodeFS", + } + cExtractor.AssertContainsTaggedField(t, cMetrics[0], expectedFields, expectedTags) + + // pod type + containerType = containerinsight.TypePod + extractor = NewFileSystemMetricExtractor(nil) + podRawMetric := ConvertPodToRaw(result.Pods[0]) + + if extractor.HasValue(podRawMetric) { + cMetrics = extractor.GetValue(podRawMetric, nil, containerType) + } + + assert.Equal(t, len(cMetrics), 0) + + // container type for eks + containerType = containerinsight.TypeContainer + extractor = NewFileSystemMetricExtractor(nil) + containerRawMetric := ConvertContainerToRaw(result.Pods[0].Containers[0], result.Pods[0]) + + if extractor.HasValue(containerRawMetric) { + cMetrics = extractor.GetValue(containerRawMetric, nil, containerType) + } + + expectedFields = map[string]any{ + "container_filesystem_available": uint64(51230154752), + "container_filesystem_capacity": uint64(85897244672), + "container_filesystem_usage": uint64(339738624), + "container_filesystem_utilization": float64(0.3955174875484043), + } + expectedTags = map[string]string{ + "Type": "ContainerFS", + } + cExtractor.AssertContainsTaggedField(t, cMetrics[0], expectedFields, expectedTags) + + expectedFields = map[string]any{ + "container_filesystem_available": uint64(51230154752), + "container_filesystem_capacity": uint64(85897244672), + "container_filesystem_usage": uint64(919463), + "container_filesystem_utilization": float64(0.0010704219949207732), + } + expectedTags = map[string]string{ + "Type": "ContainerFS", + } + cExtractor.AssertContainsTaggedField(t, cMetrics[1], expectedFields, expectedTags) +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go index d298546a4555..3f7fd0ce7cf1 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go @@ -42,6 +42,7 @@ func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info) metricsExtractors = []extractors.MetricExtractor{} metricsExtractors = append(metricsExtractors, extractors.NewCPUMetricExtractor(logger)) metricsExtractors = append(metricsExtractors, extractors.NewMemMetricExtractor(logger)) + metricsExtractors = append(metricsExtractors, extractors.NewFileSystemMetricExtractor(logger)) ksp, err := kubeletsummaryprovider.New(logger, &hostInfo, metricsExtractors) if err != nil { @@ -108,6 +109,9 @@ func (c *K8sWindows) decorateMetrics(cadvisormetrics []*cExtractor.CAdvisorMetri // add tags for EKS tags[ci.ClusterNameKey] = c.hostInfo.GetClusterName() + // add tags for OS + tags[ci.OperatingSystem] = "windows" + out := c.k8sDecorator.Decorate(m) if out != nil { result = append(result, out) diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet.go index e11449e95270..ce7eaac08bc0 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet.go @@ -63,12 +63,12 @@ func (sp *SummaryProvider) GetMetrics() ([]*cExtractor.CAdvisorMetric, error) { } metrics = append(metrics, outMetrics...) - nodeMetics, err := sp.getNodeMetrics(summary) + nodeMetrics, err := sp.getNodeMetrics(summary) if err != nil { sp.logger.Error("failed to get node metrics using kubelet summary, ", zap.Error(err)) - return nodeMetics, err + return nodeMetrics, err } - metrics = append(metrics, nodeMetics...) + metrics = append(metrics, nodeMetrics...) return metrics, nil } @@ -158,5 +158,6 @@ func (sp *SummaryProvider) getNodeMetrics(summary *stats.Summary) ([]*cExtractor metrics = append(metrics, extractor.GetValue(rawMetric, sp.hostInfo, ci.TypeNode)...) } } + return metrics, nil } diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet_test.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet_test.go index f5415573b115..c2e2791c2802 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/kubelet_test.go @@ -63,6 +63,7 @@ func TestGetPodMetrics(t *testing.T) { assert.NotNil(t, podMetric.GetTag(ci.K8sPodNameKey)) assert.NotNil(t, podMetric.GetTag(ci.K8sNamespace)) assert.NotNil(t, podMetric.GetTag(ci.Timestamp)) + assert.NotNil(t, podMetric.GetTag(ci.SourcesKey)) containerMetric := metrics[len(metrics)-1] assert.Equal(t, containerMetric.GetMetricType(), ci.TypeContainer) @@ -72,6 +73,7 @@ func TestGetPodMetrics(t *testing.T) { assert.NotNil(t, containerMetric.GetTag(ci.Timestamp)) assert.NotNil(t, containerMetric.GetTag(ci.ContainerNamekey)) assert.NotNil(t, containerMetric.GetTag(ci.ContainerIDkey)) + assert.NotNil(t, containerMetric.GetTag(ci.SourcesKey)) } // TestGetContainerMetrics verify tags on container level metrics returned. @@ -92,6 +94,7 @@ func TestGetContainerMetrics(t *testing.T) { assert.NotNil(t, containerMetric.GetTag(ci.Timestamp)) assert.NotNil(t, containerMetric.GetTag(ci.ContainerNamekey)) assert.NotNil(t, containerMetric.GetTag(ci.ContainerIDkey)) + assert.NotNil(t, containerMetric.GetTag(ci.SourcesKey)) } // TestGetNodeMetrics verify tags on node level metrics. @@ -104,6 +107,7 @@ func TestGetNodeMetrics(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, metrics) - containerMetric := metrics[1] - assert.Equal(t, containerMetric.GetMetricType(), ci.TypeNode) + nodeMetric := metrics[1] + assert.Equal(t, nodeMetric.GetMetricType(), ci.TypeNode) + assert.NotNil(t, nodeMetric.GetTag(ci.SourcesKey)) } diff --git a/receiver/awscontainerinsightreceiver/internal/stores/utils.go b/receiver/awscontainerinsightreceiver/internal/stores/utils.go index c1db9f3f9884..a42d62de7b4d 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/utils.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/utils.go @@ -96,6 +96,14 @@ func stringInRuneset(name, subset string) bool { } func TagMetricSource(metric CIMetric) { + if metric.GetTag(ci.OperatingSystem) == "windows" { + tagMetricSourceWindows(metric) + return + } + tagMetricSourceLinux(metric) +} + +func tagMetricSourceLinux(metric CIMetric) { metricType := metric.GetTag(ci.MetricType) if metricType == "" { return @@ -132,6 +140,43 @@ func TagMetricSource(metric CIMetric) { } } +func tagMetricSourceWindows(metric CIMetric) { + metricType := metric.GetTag(ci.MetricType) + if metricType == "" { + return + } + + var sources []string + switch metricType { + case ci.TypeNode: + sources = append(sources, []string{"kubelet", "pod", "calculated"}...) + case ci.TypeNodeFS: + sources = append(sources, []string{"kubelet", "calculated"}...) + case ci.TypeNodeNet: + sources = append(sources, []string{"kubelet", "calculated"}...) + case ci.TypeNodeDiskIO: + sources = append(sources, []string{"kubelet"}...) + case ci.TypePod: + sources = append(sources, []string{"kubelet", "pod", "calculated"}...) + case ci.TypePodNet: + sources = append(sources, []string{"kubelet", "calculated"}...) + case ci.TypeContainer: + sources = append(sources, []string{"kubelet", "pod", "calculated"}...) + case ci.TypeContainerFS: + sources = append(sources, []string{"kubelet", "calculated"}...) + case ci.TypeContainerDiskIO: + sources = append(sources, []string{"kubelet"}...) + } + + if len(sources) > 0 { + sourcesInfo, err := json.Marshal(sources) + if err != nil { + return + } + metric.AddTag(ci.SourcesKey, string(sourcesInfo)) + } +} + func AddKubernetesInfo(metric CIMetric, kubernetesBlob map[string]any, retainContainerNameTag bool) { needMoveToKubernetes := map[string]string{ci.K8sPodNameKey: "pod_name", ci.PodIDKey: "pod_id"} needCopyToKubernetes := map[string]string{ci.K8sNamespace: "namespace_name", ci.TypeService: "service_name", ci.NodeNameKey: "host"} diff --git a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go index 397d325cc773..7aaffc2031fb 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go @@ -201,3 +201,43 @@ func TestUtils_TagMetricSource(t *testing.T) { assert.Equal(t, expectedSources[i], metric.tags[ci.SourcesKey]) } } + +func TestUtils_TagMetricSourceWindows(t *testing.T) { + types := []string{ + "", + ci.TypeNode, + ci.TypeNodeFS, + ci.TypeNodeNet, + ci.TypeNodeDiskIO, + ci.TypePod, + ci.TypePodNet, + ci.TypeContainer, + ci.TypeContainerFS, + ci.TypeContainerDiskIO, + } + + expectedSources := []string{ + "", + "[\"kubelet\",\"pod\",\"calculated\"]", + "[\"kubelet\",\"calculated\"]", + "[\"kubelet\",\"calculated\"]", + "[\"kubelet\"]", + "[\"kubelet\",\"pod\",\"calculated\"]", + "[\"kubelet\",\"calculated\"]", + "[\"kubelet\",\"pod\",\"calculated\"]", + "[\"kubelet\",\"calculated\"]", + "[\"kubelet\"]", + } + for i, mtype := range types { + tags := map[string]string{ + ci.MetricType: mtype, + ci.OperatingSystem: "windows", + } + + metric := &mockCIMetric{ + tags: tags, + } + TagMetricSource(metric) + assert.Equal(t, expectedSources[i], metric.tags[ci.SourcesKey]) + } +}