From 9c631d4f77641504cdd267abb6288913e8632227 Mon Sep 17 00:00:00 2001 From: Kaushik Surya Date: Sat, 10 Aug 2024 14:55:21 -0400 Subject: [PATCH 1/3] Move GPU request, limit, usage and reserved_capacity metrics into pod and node stores --- internal/aws/containerinsight/const.go | 14 +- internal/aws/containerinsight/utils_test.go | 50 ++--- internal/aws/k8s/k8sclient/node.go | 53 ++--- internal/aws/k8s/k8sclient/node_info.go | 12 +- internal/aws/k8s/k8sclient/node_test.go | 189 ------------------ internal/aws/k8s/k8sclient/pod.go | 9 - internal/aws/k8s/k8sclient/pod_info.go | 7 - internal/aws/k8s/k8sclient/pod_test.go | 33 +-- internal/aws/k8s/k8sutil/util.go | 9 - internal/aws/k8s/k8sutil/util_test.go | 7 - .../awscontainerinsightreceiver/config.go | 2 +- .../internal/k8sapiserver/k8sapiserver.go | 99 +-------- .../k8sapiserver/k8sapiserver_test.go | 50 +---- .../internal/k8sapiserver/utils.go | 2 - .../internal/stores/nodeinfo.go | 20 +- .../internal/stores/nodeinfo_test.go | 12 ++ .../internal/stores/podstore.go | 93 +++++++-- .../internal/stores/podstore_test.go | 33 ++- .../internal/stores/store.go | 9 +- .../internal/stores/utils.go | 1 + .../internal/stores/utils_test.go | 12 +- .../awscontainerinsightreceiver/receiver.go | 4 +- 22 files changed, 193 insertions(+), 527 deletions(-) diff --git a/internal/aws/containerinsight/const.go b/internal/aws/containerinsight/const.go index 8d796bdb36d0..d4488249005e 100644 --- a/internal/aws/containerinsight/const.go +++ b/internal/aws/containerinsight/const.go @@ -141,9 +141,10 @@ const ( EfaRxDropped = "rx_dropped" EfaTxBytes = "tx_bytes" - GpuLimit = "gpu_limit" - GpuTotal = "gpu_total" - GpuRequest = "gpu_request" + GpuLimit = "gpu_limit" + GpuUsageTotal = "gpu_usage_total" + GpuRequest = "gpu_request" + GpuReservedCapacity = "gpu_reserved_capacity" // Define the metric types TypeCluster = "Cluster" @@ -325,8 +326,9 @@ func init() { EfaRxDropped: UnitCountPerSec, EfaTxBytes: UnitBytesPerSec, - GpuLimit: UnitCount, - GpuTotal: UnitCount, - GpuRequest: UnitCount, + GpuLimit: UnitCount, + GpuUsageTotal: UnitCount, + GpuRequest: UnitCount, + GpuReservedCapacity: UnitPercent, } } diff --git a/internal/aws/containerinsight/utils_test.go b/internal/aws/containerinsight/utils_test.go index 9ff78aa77a7b..b9f3e878bf26 100644 --- a/internal/aws/containerinsight/utils_test.go +++ b/internal/aws/containerinsight/utils_test.go @@ -431,6 +431,10 @@ func TestConvertToOTLPMetricsForNodeMetrics(t *testing.T) { "node_network_tx_packets": 37.802748111760124, "node_number_of_running_containers": int32(7), "node_number_of_running_pods": int64(7), + "node_gpu_request": int32(7), + "node_gpu_limit": int32(7), + "node_gpu_usage_total": int32(7), + "node_gpu_reserved_capacity": 3.0093851356081194, } expectedUnits = map[string]string{ "node_cpu_limit": "", @@ -467,6 +471,10 @@ func TestConvertToOTLPMetricsForNodeMetrics(t *testing.T) { "node_network_tx_packets": UnitCountPerSec, "node_number_of_running_containers": UnitCount, "node_number_of_running_pods": UnitCount, + "node_gpu_request": UnitCount, + "node_gpu_limit": UnitCount, + "node_gpu_usage_total": UnitCount, + "node_gpu_reserved_capacity": UnitPercent, } tags = map[string]string{ "AutoScalingGroupName": "eks-a6bb9db9-267c-401c-db55-df8ef645b06f", @@ -716,6 +724,10 @@ func TestConvertToOTLPMetricsForPodMetrics(t *testing.T) { "pod_status_unknown": 0, "pod_status_ready": 1, "pod_status_scheduled": 1, + "pod_gpu_request": 1, + "pod_gpu_limit": 1, + "pod_gpu_usage_total": 1, + "pod_gpu_reserved_capacity": 2.3677681271483983, } expectedUnits = map[string]string{ "pod_cpu_limit": "", @@ -762,6 +774,10 @@ func TestConvertToOTLPMetricsForPodMetrics(t *testing.T) { "pod_status_unknown": UnitCount, "pod_status_ready": UnitCount, "pod_status_scheduled": UnitCount, + "pod_gpu_request": UnitCount, + "pod_gpu_limit": UnitCount, + "pod_gpu_usage_total": UnitCount, + "pod_gpu_reserved_capacity": UnitPercent, } tags = map[string]string{ "ClusterName": "eks-aoc", @@ -913,37 +929,3 @@ func TestConvertToOTLPMetricsForPodEfaMetrics(t *testing.T) { md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } - -func TestConvertToOTLPMetricsForAcceleratorCountMetrics(t *testing.T) { - var fields map[string]any - var expectedUnits map[string]string - var tags map[string]string - var md pmetric.Metrics - now := time.Now() - timestamp := strconv.FormatInt(now.UnixNano(), 10) - - fields = map[string]any{ - "pod_gpu_limit": int64(3), - "pod_gpu_total": int64(3), - "pod_gpu_request": int64(3), - } - expectedUnits = map[string]string{ - "pod_gpu_limit": UnitCount, - "pod_gpu_total": UnitCount, - "pod_gpu_request": UnitCount, - } - tags = map[string]string{ - "ClusterName": "eks-aoc", - "InstanceId": "i-01bf9fb097cbf3205", - "InstanceType": "t2.xlarge", - "Namespace": "amazon-cloudwatch", - "NodeName": "ip-192-168-12-170.ec2.internal", - "PodName": "cloudwatch-agent", - "ContainerName": "cloudwatch-agent", - "Type": "PodGPU", - "Version": "0", - "Timestamp": timestamp, - } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) - checkMetricsAreExpected(t, md, fields, tags, expectedUnits) -} diff --git a/internal/aws/k8s/k8sclient/node.go b/internal/aws/k8s/k8sclient/node.go index 494d04855d7d..8682417c590d 100644 --- a/internal/aws/k8s/k8sclient/node.go +++ b/internal/aws/k8s/k8sclient/node.go @@ -18,11 +18,6 @@ import ( "k8s.io/client-go/tools/cache" ) -const ( - instanceTypeLabelKey = "node.kubernetes.io/instance-type" - instanceTypeLabelKeyBeta = "beta.kubernetes.io/instance-type" -) - // This needs to be reviewed for newer versions of k8s. var failedNodeConditions = map[v1.NodeConditionType]bool{ v1.NodeMemoryPressure: true, @@ -32,7 +27,6 @@ var failedNodeConditions = map[v1.NodeConditionType]bool{ } type NodeClient interface { - NodeInfos() map[string]*NodeInfo // Get the number of failed nodes for current cluster ClusterFailedNodeCount() int // Get the number of nodes for current cluster @@ -78,7 +72,6 @@ type nodeClient struct { captureNodeLevelInfo bool mu sync.RWMutex - nodeInfos map[string]*NodeInfo clusterFailedNodeCount int clusterNodeCount int nodeToCapacityMap map[string]v1.ResourceList @@ -86,15 +79,6 @@ type nodeClient struct { nodeToConditionsMap map[string]map[v1.NodeConditionType]v1.ConditionStatus } -func (c *nodeClient) NodeInfos() map[string]*NodeInfo { - if c.store.GetResetRefreshStatus() { - c.refresh() - } - c.mu.RLock() - defer c.mu.RUnlock() - return c.nodeInfos -} - func (c *nodeClient) ClusterFailedNodeCount() int { if c.store.GetResetRefreshStatus() { c.refresh() @@ -161,26 +145,24 @@ func (c *nodeClient) refresh() { nodeToAllocatableMap := make(map[string]v1.ResourceList) nodeToConditionsMap := make(map[string]map[v1.NodeConditionType]v1.ConditionStatus) - nodeInfos := map[string]*NodeInfo{} for _, obj := range objsList { - node := obj.(*NodeInfo) - nodeInfos[node.Name] = node + node := obj.(*nodeInfo) if c.captureNodeLevelInfo { - nodeToCapacityMap[node.Name] = node.Capacity - nodeToAllocatableMap[node.Name] = node.Allocatable + nodeToCapacityMap[node.name] = node.capacity + nodeToAllocatableMap[node.name] = node.allocatable conditionsMap := make(map[v1.NodeConditionType]v1.ConditionStatus) - for _, condition := range node.Conditions { + for _, condition := range node.conditions { conditionsMap[condition.Type] = condition.Status } - nodeToConditionsMap[node.Name] = conditionsMap + nodeToConditionsMap[node.name] = conditionsMap } clusterNodeCountNew++ failed := false Loop: - for _, condition := range node.Conditions { + for _, condition := range node.conditions { if _, ok := failedNodeConditions[condition.Type]; ok { // match the failedNodeConditions type we care about if condition.Status != v1.ConditionFalse { @@ -196,7 +178,6 @@ func (c *nodeClient) refresh() { } } - c.nodeInfos = nodeInfos c.clusterFailedNodeCount = clusterFailedNodeCountNew c.clusterNodeCount = clusterNodeCountNew c.nodeToCapacityMap = nodeToCapacityMap @@ -241,23 +222,13 @@ func transformFuncNode(obj any) (any, error) { if !ok { return nil, fmt.Errorf("input obj %v is not Node type", obj) } - info := new(NodeInfo) - info.Name = node.Name - info.Capacity = node.Status.Capacity - info.Allocatable = node.Status.Allocatable - info.Conditions = []*NodeCondition{} - info.ProviderID = node.Spec.ProviderID - if instanceType, ok := node.Labels[instanceTypeLabelKey]; ok { - info.InstanceType = instanceType - } else { - // fallback for compatibility with k8s versions older than v1.17 - // https://kubernetes.io/docs/reference/labels-annotations-taints/#beta-kubernetes-io-instance-type-deprecated - if instanceType, ok := node.Labels[instanceTypeLabelKeyBeta]; ok { - info.InstanceType = instanceType - } - } + info := new(nodeInfo) + info.name = node.Name + info.capacity = node.Status.Capacity + info.allocatable = node.Status.Allocatable + info.conditions = []*NodeCondition{} for _, condition := range node.Status.Conditions { - info.Conditions = append(info.Conditions, &NodeCondition{ + info.conditions = append(info.conditions, &NodeCondition{ Type: condition.Type, Status: condition.Status, }) diff --git a/internal/aws/k8s/k8sclient/node_info.go b/internal/aws/k8s/k8sclient/node_info.go index a9ae305a8cf9..6b1462adadcd 100644 --- a/internal/aws/k8s/k8sclient/node_info.go +++ b/internal/aws/k8s/k8sclient/node_info.go @@ -7,13 +7,11 @@ import ( v1 "k8s.io/api/core/v1" ) -type NodeInfo struct { - Name string - Conditions []*NodeCondition - Capacity v1.ResourceList - Allocatable v1.ResourceList - ProviderID string - InstanceType string +type nodeInfo struct { + name string + conditions []*NodeCondition + capacity v1.ResourceList + allocatable v1.ResourceList } type NodeCondition struct { diff --git a/internal/aws/k8s/k8sclient/node_test.go b/internal/aws/k8s/k8sclient/node_test.go index 476af8c2422f..9c0213001f44 100644 --- a/internal/aws/k8s/k8sclient/node_test.go +++ b/internal/aws/k8s/k8sclient/node_test.go @@ -36,7 +36,6 @@ var nodeArray = []any{ "failure-domain.beta.kubernetes.io/region": "eu-west-1", "failure-domain.beta.kubernetes.io/zone": "eu-west-1c", "kubernetes.io/hostname": "ip-192-168-200-63.eu-west-1.compute.internal", - "node.kubernetes.io/instance-type": "t3.medium", }, Annotations: map[string]string{ "node.alpha.kubernetes.io/ttl": "0", @@ -113,9 +112,6 @@ var nodeArray = []any{ Architecture: "amd64", }, }, - Spec: v1.NodeSpec{ - ProviderID: "aws:///eu-west-1c/i-09087f37a14b9ded1", - }, }, &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -213,9 +209,6 @@ var nodeArray = []any{ Architecture: "amd64", }, }, - Spec: v1.NodeSpec{ - ProviderID: "aws:///eu-west-1a/i-09087f37a14b9ded2", - }, }, &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -312,9 +305,6 @@ var nodeArray = []any{ Architecture: "amd64", }, }, - Spec: v1.NodeSpec{ - ProviderID: "aws:///eu-west-1b/i-09087f37a14b9ded3", - }, }, } @@ -333,95 +323,6 @@ func TestNodeClient(t *testing.T) { "nodeToCapacityMap": map[string]v1.ResourceList{}, // Node level info is not captured by default "nodeToAllocatableMap": map[string]v1.ResourceList{}, // Node level info is not captured by default "nodeToConditionsMap": map[string]map[v1.NodeConditionType]v1.ConditionStatus{}, // Node level info is not captured by default - "nodeInfos": map[string]*NodeInfo{ - "ip-192-168-200-63.eu-west-1.compute.internal": { - Name: "ip-192-168-200-63.eu-west-1.compute.internal", - Conditions: []*NodeCondition{ - { - Type: v1.NodeConditionType("MemoryPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("DiskPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("PIDPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("Ready"), - Status: v1.ConditionTrue, - }, - }, - Allocatable: v1.ResourceList{ - v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), - }, - Capacity: v1.ResourceList{ - v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), - }, - ProviderID: "aws:///eu-west-1c/i-09087f37a14b9ded1", - InstanceType: "t3.medium", - }, - "ip-192-168-76-61.eu-west-1.compute.internal": { - Name: "ip-192-168-76-61.eu-west-1.compute.internal", - Conditions: []*NodeCondition{ - { - Type: v1.NodeConditionType("MemoryPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("DiskPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("PIDPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("Ready"), - Status: v1.ConditionTrue, - }, - }, - Allocatable: v1.ResourceList{ - v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), - }, - Capacity: v1.ResourceList{ - v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), - }, - ProviderID: "aws:///eu-west-1a/i-09087f37a14b9ded2", - InstanceType: "t3.medium", - }, - "ip-192-168-153-1.eu-west-1.compute.internal": { - Name: "ip-192-168-153-1.eu-west-1.compute.internal", - Conditions: []*NodeCondition{ - { - Type: v1.NodeConditionType("MemoryPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("DiskPressure"), - Status: v1.ConditionTrue, - }, - { - Type: v1.NodeConditionType("PIDPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("Ready"), - Status: v1.ConditionFalse, - }, - }, - Allocatable: v1.ResourceList{ - v1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), - }, - Capacity: v1.ResourceList{ - v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), - }, - ProviderID: "aws:///eu-west-1b/i-09087f37a14b9ded3", - InstanceType: "t3.medium", - }, - }, }, }, "CaptureNodeLevelInfo": { @@ -474,95 +375,6 @@ func TestNodeClient(t *testing.T) { "Ready": "False", }, }, - "nodeInfos": map[string]*NodeInfo{ - "ip-192-168-200-63.eu-west-1.compute.internal": { - Name: "ip-192-168-200-63.eu-west-1.compute.internal", - Conditions: []*NodeCondition{ - { - Type: v1.NodeConditionType("MemoryPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("DiskPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("PIDPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("Ready"), - Status: v1.ConditionTrue, - }, - }, - Allocatable: v1.ResourceList{ - v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), - }, - Capacity: v1.ResourceList{ - v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), - }, - ProviderID: "aws:///eu-west-1c/i-09087f37a14b9ded1", - InstanceType: "t3.medium", - }, - "ip-192-168-76-61.eu-west-1.compute.internal": { - Name: "ip-192-168-76-61.eu-west-1.compute.internal", - Conditions: []*NodeCondition{ - { - Type: v1.NodeConditionType("MemoryPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("DiskPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("PIDPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("Ready"), - Status: v1.ConditionTrue, - }, - }, - Allocatable: v1.ResourceList{ - v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), - }, - Capacity: v1.ResourceList{ - v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), - }, - ProviderID: "aws:///eu-west-1a/i-09087f37a14b9ded2", - InstanceType: "t3.medium", - }, - "ip-192-168-153-1.eu-west-1.compute.internal": { - Name: "ip-192-168-153-1.eu-west-1.compute.internal", - Conditions: []*NodeCondition{ - { - Type: v1.NodeConditionType("MemoryPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("DiskPressure"), - Status: v1.ConditionTrue, - }, - { - Type: v1.NodeConditionType("PIDPressure"), - Status: v1.ConditionFalse, - }, - { - Type: v1.NodeConditionType("Ready"), - Status: v1.ConditionFalse, - }, - }, - Allocatable: v1.ResourceList{ - v1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), - }, - Capacity: v1.ResourceList{ - v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), - }, - ProviderID: "aws:///eu-west-1b/i-09087f37a14b9ded3", - InstanceType: "t3.medium", - }, - }, }, }, } @@ -577,7 +389,6 @@ func TestNodeClient(t *testing.T) { require.Equal(t, testCase.want["nodeToCapacityMap"], client.NodeToCapacityMap()) require.Equal(t, testCase.want["nodeToAllocatableMap"], client.NodeToAllocatableMap()) require.Equal(t, testCase.want["nodeToConditionsMap"], client.NodeToConditionsMap()) - require.EqualValues(t, testCase.want["nodeInfos"], client.NodeInfos()) client.shutdown() assert.True(t, client.stopped) diff --git a/internal/aws/k8s/k8sclient/pod.go b/internal/aws/k8s/k8sclient/pod.go index 4f82aac7c106..626c9d77a193 100644 --- a/internal/aws/k8s/k8sclient/pod.go +++ b/internal/aws/k8s/k8sclient/pod.go @@ -128,15 +128,6 @@ func transformFuncPod(obj any) (any, error) { info.OwnerReferences = pod.OwnerReferences info.Phase = pod.Status.Phase info.Conditions = pod.Status.Conditions - containerInfos := make([]*ContainerInfo, 0) - for _, container := range pod.Spec.Containers { - containerInfos = append(containerInfos, &ContainerInfo{ - Name: container.Name, - Resources: container.Resources, - }) - } - info.Containers = containerInfos - info.NodeName = pod.Spec.NodeName return info, nil } diff --git a/internal/aws/k8s/k8sclient/pod_info.go b/internal/aws/k8s/k8sclient/pod_info.go index a9dfe6a3ce2c..14e3eee13008 100644 --- a/internal/aws/k8s/k8sclient/pod_info.go +++ b/internal/aws/k8s/k8sclient/pod_info.go @@ -16,11 +16,4 @@ type PodInfo struct { OwnerReferences []metaV1.OwnerReference Phase v1.PodPhase Conditions []v1.PodCondition - NodeName string - Containers []*ContainerInfo -} - -type ContainerInfo struct { - Name string - Resources v1.ResourceRequirements } diff --git a/internal/aws/k8s/k8sclient/pod_test.go b/internal/aws/k8s/k8sclient/pod_test.go index 2710ab34d459..38f085b5df64 100644 --- a/internal/aws/k8s/k8sclient/pod_test.go +++ b/internal/aws/k8s/k8sclient/pod_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" ) @@ -211,22 +210,6 @@ func TestPodClient_PodInfos(t *testing.T) { Status: v1.PodStatus{ Phase: "Running", }, - Spec: v1.PodSpec{ - NodeName: "node-1", - Containers: []v1.Container{ - { - Name: "container-1", - Resources: v1.ResourceRequirements{ - Limits: v1.ResourceList{ - "nvidia.com/gpu": resource.MustParse("1"), - }, - Requests: v1.ResourceList{ - "nvidia.com/gpu": resource.MustParse("1"), - }, - }, - }, - }, - }, }, } @@ -243,21 +226,7 @@ func TestPodClient_PodInfos(t *testing.T) { Labels: map[string]string{ "key": "value", }, - Phase: v1.PodRunning, - NodeName: "node-1", - Containers: []*ContainerInfo{ - { - Name: "container-1", - Resources: v1.ResourceRequirements{ - Limits: v1.ResourceList{ - "nvidia.com/gpu": resource.MustParse("1"), - }, - Requests: v1.ResourceList{ - "nvidia.com/gpu": resource.MustParse("1"), - }, - }, - }, - }, + Phase: v1.PodRunning, }, } diff --git a/internal/aws/k8s/k8sutil/util.go b/internal/aws/k8s/k8sutil/util.go index 74378eb9bbc1..2264446eacfb 100644 --- a/internal/aws/k8s/k8sutil/util.go +++ b/internal/aws/k8s/k8sutil/util.go @@ -5,7 +5,6 @@ package k8sutil // import "github.com/open-telemetry/opentelemetry-collector-con import ( "fmt" - "strings" ) // CreatePodKey concatenates namespace and podName to get a pod key @@ -23,11 +22,3 @@ func CreateContainerKey(namespace, podName, containerName string) string { } return fmt.Sprintf("namespace:%s,podName:%s,containerName:%s", namespace, podName, containerName) } - -// ParseInstanceIDFromProviderID parses EC2 instance id from node's provider id which has format of aws://// -func ParseInstanceIDFromProviderID(providerID string) string { - if providerID == "" || !strings.HasPrefix(providerID, "aws://") { - return "" - } - return providerID[strings.LastIndex(providerID, "/")+1:] -} diff --git a/internal/aws/k8s/k8sutil/util_test.go b/internal/aws/k8s/k8sutil/util_test.go index 660d477e5b3d..16734f8513bc 100644 --- a/internal/aws/k8s/k8sutil/util_test.go +++ b/internal/aws/k8s/k8sutil/util_test.go @@ -22,10 +22,3 @@ func TestCreateContainerKey(t *testing.T) { assert.Equal(t, "", CreateContainerKey("default", "", "testContainer")) assert.Equal(t, "", CreateContainerKey("default", "testPod", "")) } - -func TestParseInstanceIdFromProviderId(t *testing.T) { - assert.Equal(t, "i-0b00e07ccd388f915", ParseInstanceIDFromProviderID("aws:///us-west-2b/i-0b00e07ccd388f915")) - assert.Equal(t, "i-0b00e07ccd388f915", ParseInstanceIDFromProviderID("aws:///us-east-1c/i-0b00e07ccd388f915")) - assert.Equal(t, "", ParseInstanceIDFromProviderID(":///us-east-1c/i-0b00e07ccd388f915")) - assert.Equal(t, "", ParseInstanceIDFromProviderID("")) -} diff --git a/receiver/awscontainerinsightreceiver/config.go b/receiver/awscontainerinsightreceiver/config.go index 4a2ae21c1ef6..45a3b5bb9baa 100644 --- a/receiver/awscontainerinsightreceiver/config.go +++ b/receiver/awscontainerinsightreceiver/config.go @@ -58,7 +58,7 @@ type Config struct { // The default value is false. EnableControlPlaneMetrics bool `mapstructure:"enable_control_plane_metrics"` - // EnableAcceleratedComputeMetrics enabled features with accelerated compute resources where metrics are scraped from vendor specific sources + // EnableAcceleratedComputeMetrics enables features with accelerated compute resources where metrics are scraped from vendor specific sources EnableAcceleratedComputeMetrics bool `mapstructure:"accelerated_compute_metrics"` // KubeConfigPath is an optional attribute to override the default kube config path in an EC2 environment diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index ecf07cb222ac..cc9a17ef801d 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -16,7 +16,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -62,7 +61,6 @@ type K8sAPIServer struct { leaderElection *LeaderElection addFullPodNameMetricLabel bool includeEnhancedMetrics bool - enableAcceleratedMetrics bool } type clusterNameProvider interface { @@ -72,7 +70,7 @@ type clusterNameProvider interface { type Option func(*K8sAPIServer) // NewK8sAPIServer creates a k8sApiServer which can generate cluster-level metrics -func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, enableAcceleratedMetrics bool, options ...Option) (*K8sAPIServer, error) { +func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, options ...Option) (*K8sAPIServer, error) { k := &K8sAPIServer{ logger: logger, @@ -80,7 +78,6 @@ func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection leaderElection: leaderElection, addFullPodNameMetricLabel: addFullPodNameMetricLabel, includeEnhancedMetrics: includeEnhancedMetrics, - enableAcceleratedMetrics: enableAcceleratedMetrics, } for _, opt := range options { @@ -128,9 +125,6 @@ func (k *K8sAPIServer) GetMetrics() []pmetric.Metrics { result = append(result, k.getStatefulSetMetrics(clusterName, timestampNs)...) result = append(result, k.getReplicaSetMetrics(clusterName, timestampNs)...) result = append(result, k.getPendingPodStatusMetrics(clusterName, timestampNs)...) - if k.enableAcceleratedMetrics { - result = append(result, k.getAcceleratorCountMetrics(clusterName, timestampNs)...) - } return result } @@ -357,7 +351,7 @@ func (k *K8sAPIServer) getPendingPodStatusMetrics(clusterName, timestampNs strin } attributes[ci.PodStatus] = string(v1.PodPending) - attributes["k8s.node.name"] = pendingNodeName + attributes["k8s.node.name"] = "pending" kubernetesBlob := map[string]any{} k.getKubernetesBlob(podInfo, kubernetesBlob, attributes) @@ -448,95 +442,6 @@ func (k *K8sAPIServer) getKubernetesBlob(pod *k8sclient.PodInfo, kubernetesBlob } } -func (k *K8sAPIServer) getAcceleratorCountMetrics(clusterName, timestampNs string) []pmetric.Metrics { - var metrics []pmetric.Metrics - podsList := k.leaderElection.podClient.PodInfos() - nodeInfos := k.leaderElection.nodeClient.NodeInfos() - podKeyToServiceNamesMap := k.leaderElection.epClient.PodKeyToServiceNames() - for _, podInfo := range podsList { - // only care for pending and running pods - if podInfo.Phase != v1.PodPending && podInfo.Phase != v1.PodRunning { - continue - } - - fields := map[string]any{} - - var podLimit, podRequest, podTotal int64 - var gpuAllocated bool - for _, container := range podInfo.Containers { - // check if at least 1 container is using gpu to add count metrics for a pod - _, found := container.Resources.Limits[resourceSpecNvidiaGpuKey] - gpuAllocated = gpuAllocated || found - - if len(container.Resources.Limits) == 0 { - continue - } - podRequest += container.Resources.Requests.Name(resourceSpecNvidiaGpuKey, resource.DecimalExponent).Value() - limit := container.Resources.Limits.Name(resourceSpecNvidiaGpuKey, resource.DecimalExponent).Value() - // still counting pending pods to get total # of limit from spec - podLimit += limit - - // sum of running pods only. a pod will be stuck in pending state when there is less or no gpu available than limit value - // e.g. limit=2,available=1 - *_gpu_limit=2, *_gpu_request=2, *_gpu_total=0 - // request value is optional and must be equal to limit https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/ - if podInfo.Phase == v1.PodRunning { - podTotal += limit - } - } - // skip adding gpu count metrics when none of containers has gpu resource allocated - if !gpuAllocated { - continue - } - // add pod level count metrics here then metricstransformprocessor will duplicate to node/cluster level metrics - fields[ci.MetricName(ci.TypePod, ci.GpuLimit)] = podLimit - fields[ci.MetricName(ci.TypePod, ci.GpuRequest)] = podRequest - fields[ci.MetricName(ci.TypePod, ci.GpuTotal)] = podTotal - - attributes := map[string]string{ - ci.ClusterNameKey: clusterName, - ci.MetricType: ci.TypePodGPU, - ci.Timestamp: timestampNs, - ci.AttributeK8sNamespace: podInfo.Namespace, - ci.Version: "0", - } - - podKey := k8sutil.CreatePodKey(podInfo.Namespace, podInfo.Name) - if serviceList, ok := podKeyToServiceNamesMap[podKey]; ok { - if len(serviceList) > 0 { - attributes[ci.TypeService] = serviceList[0] - } - } - - kubernetesBlob := map[string]any{} - k.getKubernetesBlob(podInfo, kubernetesBlob, attributes) - if podInfo.NodeName != "" { - // decorate with instance ID and type attributes which become dimensions for node_gpu_* metrics - attributes[ci.NodeNameKey] = podInfo.NodeName - kubernetesBlob["host"] = podInfo.NodeName - if nodeInfo, ok := nodeInfos[podInfo.NodeName]; ok { - attributes[ci.InstanceID] = k8sutil.ParseInstanceIDFromProviderID(nodeInfo.ProviderID) - attributes[ci.InstanceType] = nodeInfo.InstanceType - } - } else { - // fallback when node name is not available - attributes[ci.NodeNameKey] = pendingNodeName - kubernetesBlob["host"] = pendingNodeName - } - if len(kubernetesBlob) > 0 { - kubernetesInfo, err := json.Marshal(kubernetesBlob) - if err != nil { - k.logger.Warn("Error parsing kubernetes blob for pod metrics") - } else { - attributes[ci.AttributeKubernetes] = string(kubernetesInfo) - } - } - attributes[ci.SourcesKey] = "[\"apiserver\"]" - md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) - metrics = append(metrics, md) - } - return metrics -} - // Shutdown stops the k8sApiServer func (k *K8sAPIServer) Shutdown() error { if k.cancel != nil { diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go index 8f39652e4f3c..d4d27fbaea00 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go @@ -14,7 +14,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -139,12 +138,6 @@ func (client *MockClient) PodInfos() []*k8sclient.PodInfo { return args.Get(0).([]*k8sclient.PodInfo) } -// k8sclient.NodeClient -func (client *MockClient) NodeInfos() map[string]*k8sclient.NodeInfo { - args := client.Called() - return args.Get(0).(map[string]*k8sclient.NodeInfo) -} - // k8sclient.NodeClient func (client *MockClient) ClusterFailedNodeCount() int { args := client.Called() @@ -231,7 +224,7 @@ func (m mockClusterNameProvider) GetClusterName() string { } func TestK8sAPIServer_New(t *testing.T) { - k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), nil, false, false, false) + k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), nil, false, false) assert.Nil(t, k8sAPIServer) assert.NotNil(t, err) } @@ -311,43 +304,11 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { UID: "bc5f5839-f62e-44b9-a79e-af250d92dcb1", Phase: v1.PodPending, }, - { - Name: "gpu-burn-6dcbd994fb-9fn8w", - Namespace: "amazon-cloudwatch", - NodeName: "ip-192-168-57-23.us-west-2.compute.internal", - UID: "bc5f5839-f62e-44b9-a79e-af250d92dcb1", - Phase: v1.PodRunning, - Containers: []*k8sclient.ContainerInfo{ - { - Name: "container-1", - Resources: v1.ResourceRequirements{ - Limits: v1.ResourceList{ - resourceSpecNvidiaGpuKey: resource.MustParse("2"), - }, - }, - }, - }, - }, }) mockClient.On("PodKeyToServiceNames").Return(map[string][]string{ "namespace:kube-system,podName:coredns-7554568866-26jdf": {"kube-dns"}, "namespace:kube-system,podName:coredns-7554568866-shwn6": {"kube-dns"}, }) - mockClient.On("NodeInfos").Return(map[string]*k8sclient.NodeInfo{ - "ip-192-168-57-23.us-west-2.compute.internal": { - Name: "ip-192-168-57-23.us-west-2.compute.internal", - Conditions: []*k8sclient.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionTrue, - }, - }, - Capacity: map[v1.ResourceName]resource.Quantity{}, - Allocatable: map[v1.ResourceName]resource.Quantity{}, - ProviderID: "aws:///us-west-2/i-abcdef123456789", - InstanceType: "g4dn-12xl", - }, - }) leaderElection := &LeaderElection{ k8sClient: &mockK8sClient{}, @@ -365,7 +326,7 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { t.Setenv("HOST_NAME", hostName) t.Setenv("K8S_NAMESPACE", "namespace") - k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), leaderElection, true, true, true) + k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), leaderElection, true, true) assert.NotNil(t, k8sAPIServer) assert.Nil(t, err) @@ -440,13 +401,6 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { assert.Equal(t, "kube-system", getStringAttrVal(metric, ci.AttributeK8sNamespace)) assert.Equal(t, "Pending", getStringAttrVal(metric, "pod_status")) assert.Equal(t, "Pod", getStringAttrVal(metric, ci.MetricType)) - case ci.TypePodGPU: - assertMetricValueEqual(t, metric, "pod_gpu_limit", int64(2)) - assertMetricValueEqual(t, metric, "pod_gpu_total", int64(2)) - assert.Equal(t, "amazon-cloudwatch", getStringAttrVal(metric, ci.AttributeK8sNamespace)) - assert.Equal(t, ci.TypePodGPU, getStringAttrVal(metric, ci.MetricType)) - assert.Equal(t, "i-abcdef123456789", getStringAttrVal(metric, ci.InstanceID)) - assert.Equal(t, "g4dn-12xl", getStringAttrVal(metric, ci.InstanceType)) default: assert.Fail(t, "Unexpected metric type: "+metricType) } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go index c6a6293630fd..a2efa312805d 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go @@ -19,8 +19,6 @@ const ( splitRegexStr = "\\.|-" KubeProxy = "kube-proxy" cronJobAllowedString = "0123456789" - resourceSpecNvidiaGpuKey = "nvidia.com/gpu" - pendingNodeName = "pending" ) var ( diff --git a/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo.go b/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo.go index a05e7b90bae5..34abbcb19054 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo.go @@ -9,13 +9,16 @@ import ( "go.uber.org/zap" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" ) type nodeStats struct { - podCnt int - containerCnt int - cpuReq uint64 - memReq uint64 + podCnt int + containerCnt int + cpuReq uint64 + memReq uint64 + gpuReq uint64 + gpuUsageTotal uint64 } type nodeInfo struct { @@ -103,6 +106,15 @@ func (n *nodeInfo) getNodeStatusAllocatablePods() (uint64, bool) { return forceConvertToInt64(pods, n.logger), true } +func (n *nodeInfo) getNodeStatusCapacityGPUs() (uint64, bool) { + capacityResources, ok := n.provider.NodeToCapacityMap()[n.nodeName] + if !ok { + return 0, false + } + gpus := capacityResources.Name(resourceSpecNvidiaGpuKey, resource.DecimalExponent).Value() + return forceConvertToInt64(gpus, n.logger), true +} + func (n *nodeInfo) getNodeStatusCondition(conditionType v1.NodeConditionType) (uint64, bool) { if nodeConditions, ok := n.provider.NodeToConditionsMap()[n.nodeName]; ok { if conditionStatus, ok := nodeConditions[conditionType]; ok { diff --git a/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo_test.go b/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo_test.go index e3119dc07698..9261b49128a6 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo_test.go @@ -97,6 +97,18 @@ func TestGetNodeStatusAllocatablePods(t *testing.T) { assert.Equal(t, uint64(0), nodeStatusAllocatablePods) } +func TestGetNodeStatusCapacityGPUs(t *testing.T) { + nodeInfo := newNodeInfo("testNode1", &mockNodeInfoProvider{}, zap.NewNop()) + nodeStatusCapacityGPUs, valid := nodeInfo.getNodeStatusCapacityGPUs() + assert.True(t, valid) + assert.Equal(t, uint64(20), nodeStatusCapacityGPUs) + + nodeInfo = newNodeInfo("testNodeNonExistent", &mockNodeInfoProvider{}, zap.NewNop()) + nodeStatusCapacityGPUs, valid = nodeInfo.getNodeStatusAllocatablePods() + assert.False(t, valid) + assert.Equal(t, uint64(0), nodeStatusCapacityGPUs) +} + func TestGetNodeStatusCondition(t *testing.T) { nodeInfo := newNodeInfo("testNode1", &mockNodeInfoProvider{}, zap.NewNop()) nodeStatusCondition, valid := nodeInfo.getNodeStatusCondition(v1.NodeReady) diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go index 8247601cfc6d..fd1abe03cd5d 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go @@ -28,6 +28,7 @@ const ( podsExpiry = 2 * time.Minute memoryKey = "memory" cpuKey = "cpu" + gpuKey = "nvidia.com/gpu" splitRegexStr = "\\.|-" kubeProxy = "kube-proxy" ) @@ -109,19 +110,19 @@ type podClient interface { type PodStore struct { cache *mapWithExpiry // prevMeasurements per each Type (Pod, Container, etc) - prevMeasurements sync.Map // map[string]*mapWithExpiry - podClient podClient - k8sClient replicaSetInfoProvider - lastRefreshed time.Time - nodeInfo *nodeInfo - prefFullPodName bool - logger *zap.Logger - addFullPodNameMetricLabel bool - includeEnhancedMetrics bool -} - -func NewPodStore(client podClient, prefFullPodName bool, addFullPodNameMetricLabel bool, - includeEnhancedMetrics bool, hostName string, isSystemdEnabled bool, logger *zap.Logger) (*PodStore, error) { + prevMeasurements sync.Map // map[string]*mapWithExpiry + podClient podClient + k8sClient replicaSetInfoProvider + lastRefreshed time.Time + nodeInfo *nodeInfo + prefFullPodName bool + logger *zap.Logger + addFullPodNameMetricLabel bool + includeEnhancedMetrics bool + enableAcceleratedComputeMetrics bool +} + +func NewPodStore(client podClient, prefFullPodName bool, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, enableAcceleratedComputeMetrics bool, hostName string, isSystemdEnabled bool, logger *zap.Logger) (*PodStore, error) { if hostName == "" { return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or passed as part of the agent config", ci.HostName) } @@ -147,13 +148,14 @@ func NewPodStore(client podClient, prefFullPodName bool, addFullPodNameMetricLab cache: newMapWithExpiry(podsExpiry), prevMeasurements: sync.Map{}, //prevMeasurements: make(map[string]*mapWithExpiry), - podClient: client, - nodeInfo: nodeInfo, - prefFullPodName: prefFullPodName, - includeEnhancedMetrics: includeEnhancedMetrics, - k8sClient: k8sClient, - logger: logger, - addFullPodNameMetricLabel: addFullPodNameMetricLabel, + podClient: client, + nodeInfo: nodeInfo, + prefFullPodName: prefFullPodName, + includeEnhancedMetrics: includeEnhancedMetrics, + enableAcceleratedComputeMetrics: enableAcceleratedComputeMetrics, + k8sClient: k8sClient, + logger: logger, + addFullPodNameMetricLabel: addFullPodNameMetricLabel, } return podStore, nil @@ -239,6 +241,7 @@ func (p *PodStore) Decorate(ctx context.Context, metric CIMetric, kubernetesBlob if entry.pod.Name != "" { p.decorateCPU(metric, &entry.pod) p.decorateMem(metric, &entry.pod) + p.decorateGPU(metric, &entry.pod) p.addStatus(metric, &entry.pod) addContainerCount(metric, &entry.pod) addContainerID(&entry.pod, metric, kubernetesBlob, p.logger) @@ -292,6 +295,8 @@ func (p *PodStore) refreshInternal(now time.Time, podList []corev1.Pod) { var containerCount int var cpuRequest uint64 var memRequest uint64 + var gpuRequest uint64 + var gpuUsageTotal uint64 for i := range podList { pod := podList[i] @@ -306,6 +311,12 @@ func (p *PodStore) refreshInternal(now time.Time, podList []corev1.Pod) { cpuRequest += tmpCPUReq tmpMemReq, _ := getResourceSettingForPod(&pod, p.nodeInfo.getMemCapacity(), memoryKey, getRequestForContainer) memRequest += tmpMemReq + if tmpGpuReq, ok := getResourceSettingForPod(&pod, 0, gpuKey, getRequestForContainer); ok { + gpuRequest += tmpGpuReq + } + if tmpGpuLimit, ok := getResourceSettingForPod(&pod, 0, gpuKey, getLimitForContainer); ok && pod.Status.Phase == corev1.PodRunning { + gpuUsageTotal += tmpGpuLimit + } } if pod.Status.Phase == corev1.PodRunning { podCount++ @@ -319,10 +330,18 @@ func (p *PodStore) refreshInternal(now time.Time, podList []corev1.Pod) { p.setCachedEntry(podKey, &cachedEntry{ pod: pod, - creation: now}) + creation: now, + }) } - p.nodeInfo.setNodeStats(nodeStats{podCnt: podCount, containerCnt: containerCount, memReq: memRequest, cpuReq: cpuRequest}) + p.nodeInfo.setNodeStats(nodeStats{ + podCnt: podCount, + containerCnt: containerCount, + memReq: memRequest, + cpuReq: cpuRequest, + gpuReq: gpuRequest, + gpuUsageTotal: gpuUsageTotal, + }) } func (p *PodStore) decorateNode(metric CIMetric) { @@ -380,6 +399,36 @@ func (p *PodStore) decorateNode(metric CIMetric) { if nodeStatusConditionUnknown, ok := p.nodeInfo.getNodeConditionUnknown(); ok { metric.AddField(ci.MetricName(ci.TypeNode, ci.StatusConditionUnknown), nodeStatusConditionUnknown) } + if p.enableAcceleratedComputeMetrics { + if nodeStatusCapacityGPUs, ok := p.nodeInfo.getNodeStatusCapacityGPUs(); ok && nodeStatusCapacityGPUs != 0 { + metric.AddField(ci.MetricName(ci.TypeNode, ci.GpuRequest), nodeStats.gpuReq) + metric.AddField(ci.MetricName(ci.TypeNode, ci.GpuLimit), nodeStatusCapacityGPUs) + metric.AddField(ci.MetricName(ci.TypeNode, ci.GpuUsageTotal), nodeStats.gpuUsageTotal) + metric.AddField(ci.MetricName(ci.TypeNode, ci.GpuReservedCapacity), float64(nodeStats.gpuReq)/float64(nodeStatusCapacityGPUs)*100) + } + } + } +} + +func (p *PodStore) decorateGPU(metric CIMetric, pod *corev1.Pod) { + if p.includeEnhancedMetrics && p.enableAcceleratedComputeMetrics && metric.GetTag(ci.MetricType) == ci.TypePod && + pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed { + + if podGpuRequest, ok := getResourceSettingForPod(pod, 0, gpuKey, getRequestForContainer); ok { + metric.AddField(ci.MetricName(ci.TypePod, ci.GpuRequest), podGpuRequest) + } + + if podGpuLimit, ok := getResourceSettingForPod(pod, 0, gpuKey, getLimitForContainer); ok { + metric.AddField(ci.MetricName(ci.TypePod, ci.GpuLimit), podGpuLimit) + var podGpuUsageTotal uint64 + if pod.Status.Phase == corev1.PodRunning { // Set the GPU limit as the usage_total for running pods only + podGpuUsageTotal = podGpuLimit + } + metric.AddField(ci.MetricName(ci.TypePod, ci.GpuUsageTotal), podGpuUsageTotal) + if nodeStatusCapacityGPUs, ok := p.nodeInfo.getNodeStatusCapacityGPUs(); ok && nodeStatusCapacityGPUs != 0 { + metric.AddField(ci.MetricName(ci.TypePod, ci.GpuReservedCapacity), float64(podGpuLimit)/float64(nodeStatusCapacityGPUs)*100) + } + } } } diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go index 3610ce4fa923..ffdbfbc3c99c 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go @@ -81,11 +81,13 @@ func getBaseTestPodInfo() *corev1.Pod { "resources": { "limits": { "cpu": "10m", - "memory": "50Mi" + "memory": "50Mi", + "nvidia.com/gpu": "1" }, "requests": { "cpu": "10m", - "memory": "50Mi" + "memory": "50Mi", + "nvidia.com/gpu": "1" } }, "volumeMounts": [ @@ -281,6 +283,27 @@ func TestPodStore_decorateMem(t *testing.T) { assert.Equal(t, float64(20), metric.GetField("container_memory_utilization_over_container_limit").(float64)) } +func TestPodStore_decorateGpu(t *testing.T) { + podStore := getPodStore() + defer require.NoError(t, podStore.Shutdown()) + + pod := getBaseTestPodInfo() + + // test pod metrics + tags := map[string]string{ci.MetricType: ci.TypePod} + fields := map[string]any{} + + metric := generateMetric(fields, tags) + podStore.includeEnhancedMetrics = true + podStore.enableAcceleratedComputeMetrics = true + podStore.decorateGPU(metric, pod) + + assert.Equal(t, uint64(1), metric.GetField("pod_gpu_request").(uint64)) + assert.Equal(t, uint64(1), metric.GetField("pod_gpu_limit").(uint64)) + assert.Equal(t, uint64(1), metric.GetField("pod_gpu_usage_total").(uint64)) + assert.Equal(t, float64(5), metric.GetField("pod_gpu_reserved_capacity").(float64)) +} + func TestPodStore_previousCleanupLocking(_ *testing.T) { podStore := getPodStore() podStore.podClient = &mockPodClient{} @@ -956,8 +979,14 @@ func TestPodStore_decorateNode(t *testing.T) { assert.False(t, metric.HasField("node_status_allocatable_pods")) podStore.includeEnhancedMetrics = true + podStore.enableAcceleratedComputeMetrics = true podStore.decorateNode(metric) + assert.Equal(t, uint64(1), metric.GetField("node_gpu_request").(uint64)) + assert.Equal(t, uint64(20), metric.GetField("node_gpu_limit").(uint64)) + assert.Equal(t, uint64(1), metric.GetField("node_gpu_usage_total").(uint64)) + assert.Equal(t, float64(5), metric.GetField("node_gpu_reserved_capacity").(float64)) + assert.Equal(t, uint64(1), metric.GetField("node_status_condition_ready").(uint64)) assert.Equal(t, uint64(0), metric.GetField("node_status_condition_disk_pressure").(uint64)) assert.Equal(t, uint64(0), metric.GetField("node_status_condition_memory_pressure").(uint64)) diff --git a/receiver/awscontainerinsightreceiver/internal/stores/store.go b/receiver/awscontainerinsightreceiver/internal/stores/store.go index 457a08ddbf72..51f3dfdcaa5c 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/store.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/store.go @@ -43,15 +43,16 @@ type K8sDecorator struct { podStore *PodStore } -func NewK8sDecorator(ctx context.Context, kubeletClient *kubeletutil.KubeletClient, tagService bool, prefFullPodName bool, addFullPodNameMetricLabel bool, - addContainerNameMetricLabel bool, includeEnhancedMetrics bool, kubeConfigPath string, - hostName string, isSystemd bool, logger *zap.Logger) (*K8sDecorator, error) { +func NewK8sDecorator(ctx context.Context, kubeletClient *kubeletutil.KubeletClient, tagService bool, prefFullPodName bool, + addFullPodNameMetricLabel bool, addContainerNameMetricLabel bool, includeEnhancedMetrics bool, enableAcceleratedComputeMetrics bool, + kubeConfigPath string, hostName string, isSystemd bool, logger *zap.Logger) (*K8sDecorator, error) { + k := &K8sDecorator{ ctx: ctx, addContainerNameMetricLabel: addContainerNameMetricLabel, } - podstore, err := NewPodStore(kubeletClient, prefFullPodName, addFullPodNameMetricLabel, includeEnhancedMetrics, hostName, isSystemd, logger) + podstore, err := NewPodStore(kubeletClient, prefFullPodName, addFullPodNameMetricLabel, includeEnhancedMetrics, enableAcceleratedComputeMetrics, hostName, isSystemd, logger) if err != nil { return nil, err } diff --git a/receiver/awscontainerinsightreceiver/internal/stores/utils.go b/receiver/awscontainerinsightreceiver/internal/stores/utils.go index 4f60400ce5a4..b4f17627d5d0 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/utils.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/utils.go @@ -22,6 +22,7 @@ const ( // https://github.com/kubernetes/apimachinery/blob/master/pkg/util/rand/rand.go#L83 kubeAllowedStringAlphaNums = "bcdfghjklmnpqrstvwxz2456789" cronJobAllowedString = "0123456789" + resourceSpecNvidiaGpuKey = "nvidia.com/gpu" ) func createPodKeyFromMetaData(pod *corev1.Pod) string { diff --git a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go index b78269eb5330..715aad47b771 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go @@ -66,10 +66,12 @@ type mockNodeInfoProvider struct { func (m *mockNodeInfoProvider) NodeToCapacityMap() map[string]v1.ResourceList { return map[string]v1.ResourceList{ "testNode1": { - "pods": *resource.NewQuantity(5, resource.DecimalSI), + "pods": *resource.NewQuantity(5, resource.DecimalSI), + "nvidia.com/gpu": *resource.NewQuantity(20, resource.DecimalExponent), }, "testNode2": { - "pods": *resource.NewQuantity(10, resource.DecimalSI), + "pods": *resource.NewQuantity(10, resource.DecimalSI), + "nvidia.com/gpu": *resource.NewQuantity(30, resource.DecimalExponent), }, } } @@ -77,10 +79,12 @@ func (m *mockNodeInfoProvider) NodeToCapacityMap() map[string]v1.ResourceList { func (m *mockNodeInfoProvider) NodeToAllocatableMap() map[string]v1.ResourceList { return map[string]v1.ResourceList{ "testNode1": { - "pods": *resource.NewQuantity(15, resource.DecimalSI), + "pods": *resource.NewQuantity(15, resource.DecimalSI), + "nvidia.com/gpu": *resource.NewQuantity(20, resource.DecimalExponent), }, "testNode2": { - "pods": *resource.NewQuantity(20, resource.DecimalSI), + "pods": *resource.NewQuantity(20, resource.DecimalSI), + "nvidia.com/gpu": *resource.NewQuantity(30, resource.DecimalExponent), }, } } diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index 5b9f6f08369b..474d6f236fee 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -142,7 +142,7 @@ func (acir *awsContainerInsightReceiver) initEKS(ctx context.Context, host compo hostName string, kubeletClient *kubeletutil.KubeletClient) error { k8sDecorator, err := stores.NewK8sDecorator(ctx, kubeletClient, acir.config.TagService, acir.config.PrefFullPodName, acir.config.AddFullPodNameMetricLabel, acir.config.AddContainerNameMetricLabel, - acir.config.EnableControlPlaneMetrics, acir.config.KubeConfigPath, hostName, + acir.config.EnableControlPlaneMetrics, acir.config.EnableAcceleratedComputeMetrics, acir.config.KubeConfigPath, hostName, acir.config.RunOnSystemd, acir.settings.Logger) if err != nil { acir.settings.Logger.Warn("Unable to start K8s decorator", zap.Error(err)) @@ -177,7 +177,7 @@ func (acir *awsContainerInsightReceiver) initEKS(ctx context.Context, host compo acir.settings.Logger.Warn("Unable to elect leader node", zap.Error(err)) } - acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostInfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics, acir.config.EnableAcceleratedComputeMetrics) + acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostInfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics) if err != nil { acir.k8sapiserver = nil acir.settings.Logger.Warn("Unable to connect to api-server", zap.Error(err)) From bab8231b1559fff90feaa257bcafa6216c19d96d Mon Sep 17 00:00:00 2001 From: Kaushik Surya Date: Wed, 14 Aug 2024 10:55:12 -0400 Subject: [PATCH 2/3] Update test --- .../internal/stores/nodeinfo_test.go | 6 +++--- .../awscontainerinsightreceiver/internal/stores/podstore.go | 6 ++---- .../internal/stores/utils_test.go | 6 ++---- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo_test.go b/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo_test.go index 9261b49128a6..0f9c1cdd6449 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo_test.go @@ -103,9 +103,9 @@ func TestGetNodeStatusCapacityGPUs(t *testing.T) { assert.True(t, valid) assert.Equal(t, uint64(20), nodeStatusCapacityGPUs) - nodeInfo = newNodeInfo("testNodeNonExistent", &mockNodeInfoProvider{}, zap.NewNop()) - nodeStatusCapacityGPUs, valid = nodeInfo.getNodeStatusAllocatablePods() - assert.False(t, valid) + nodeInfo = newNodeInfo("testNode2", &mockNodeInfoProvider{}, zap.NewNop()) + nodeStatusCapacityGPUs, valid = nodeInfo.getNodeStatusCapacityGPUs() + assert.True(t, valid) assert.Equal(t, uint64(0), nodeStatusCapacityGPUs) } diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go index fd1abe03cd5d..0ef2002d4336 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go @@ -414,11 +414,9 @@ func (p *PodStore) decorateGPU(metric CIMetric, pod *corev1.Pod) { if p.includeEnhancedMetrics && p.enableAcceleratedComputeMetrics && metric.GetTag(ci.MetricType) == ci.TypePod && pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed { - if podGpuRequest, ok := getResourceSettingForPod(pod, 0, gpuKey, getRequestForContainer); ok { - metric.AddField(ci.MetricName(ci.TypePod, ci.GpuRequest), podGpuRequest) - } - if podGpuLimit, ok := getResourceSettingForPod(pod, 0, gpuKey, getLimitForContainer); ok { + podGpuRequest, _ := getResourceSettingForPod(pod, 0, gpuKey, getRequestForContainer) + metric.AddField(ci.MetricName(ci.TypePod, ci.GpuRequest), podGpuRequest) metric.AddField(ci.MetricName(ci.TypePod, ci.GpuLimit), podGpuLimit) var podGpuUsageTotal uint64 if pod.Status.Phase == corev1.PodRunning { // Set the GPU limit as the usage_total for running pods only diff --git a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go index 715aad47b771..f570415658ad 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go @@ -70,8 +70,7 @@ func (m *mockNodeInfoProvider) NodeToCapacityMap() map[string]v1.ResourceList { "nvidia.com/gpu": *resource.NewQuantity(20, resource.DecimalExponent), }, "testNode2": { - "pods": *resource.NewQuantity(10, resource.DecimalSI), - "nvidia.com/gpu": *resource.NewQuantity(30, resource.DecimalExponent), + "pods": *resource.NewQuantity(10, resource.DecimalSI), }, } } @@ -83,8 +82,7 @@ func (m *mockNodeInfoProvider) NodeToAllocatableMap() map[string]v1.ResourceList "nvidia.com/gpu": *resource.NewQuantity(20, resource.DecimalExponent), }, "testNode2": { - "pods": *resource.NewQuantity(20, resource.DecimalSI), - "nvidia.com/gpu": *resource.NewQuantity(30, resource.DecimalExponent), + "pods": *resource.NewQuantity(20, resource.DecimalSI), }, } } From 959860cc395adf0e9ffee6f73a715dceddb35e56 Mon Sep 17 00:00:00 2001 From: Kaushik Surya Date: Wed, 14 Aug 2024 11:03:02 -0400 Subject: [PATCH 3/3] Minor tweaks --- .../internal/stores/podstore.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go index 0ef2002d4336..7872c75ac737 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go @@ -311,11 +311,12 @@ func (p *PodStore) refreshInternal(now time.Time, podList []corev1.Pod) { cpuRequest += tmpCPUReq tmpMemReq, _ := getResourceSettingForPod(&pod, p.nodeInfo.getMemCapacity(), memoryKey, getRequestForContainer) memRequest += tmpMemReq - if tmpGpuReq, ok := getResourceSettingForPod(&pod, 0, gpuKey, getRequestForContainer); ok { + if tmpGpuLimit, ok := getResourceSettingForPod(&pod, 0, gpuKey, getLimitForContainer); ok { + tmpGpuReq, _ := getResourceSettingForPod(&pod, 0, gpuKey, getRequestForContainer) gpuRequest += tmpGpuReq - } - if tmpGpuLimit, ok := getResourceSettingForPod(&pod, 0, gpuKey, getLimitForContainer); ok && pod.Status.Phase == corev1.PodRunning { - gpuUsageTotal += tmpGpuLimit + if pod.Status.Phase == corev1.PodRunning { + gpuUsageTotal += tmpGpuLimit + } } } if pod.Status.Phase == corev1.PodRunning {