diff --git a/internal/aws/containerinsight/const.go b/internal/aws/containerinsight/const.go index a5551aeb0d25..161e5f0244df 100644 --- a/internal/aws/containerinsight/const.go +++ b/internal/aws/containerinsight/const.go @@ -140,6 +140,10 @@ const ( EfaRxDropped = "rx_dropped" EfaTxBytes = "tx_bytes" + GpuLimit = "gpu_limit" + GpuTotal = "gpu_total" + GpuRequest = "gpu_request" + // Define the metric types TypeCluster = "Cluster" TypeClusterService = "ClusterService" @@ -319,5 +323,9 @@ func init() { EfaRxBytes: UnitBytesPerSec, EfaRxDropped: UnitCountPerSec, EfaTxBytes: UnitBytesPerSec, + + GpuLimit: UnitCount, + GpuTotal: UnitCount, + GpuRequest: UnitCount, } } diff --git a/internal/aws/containerinsight/utils.go b/internal/aws/containerinsight/utils.go index 7e87d77692bf..4c0b6fa15d0d 100644 --- a/internal/aws/containerinsight/utils.go +++ b/internal/aws/containerinsight/utils.go @@ -139,7 +139,7 @@ func getPrefixByMetricType(mType string) string { prefix = nodeNetPrefix case TypeNodeEFA: prefix = nodeEfaPrefix - case TypePod: + case TypePod, TypePodGPU: prefix = podPrefix case TypePodNet: prefix = podNetPrefix @@ -259,23 +259,24 @@ func ConvertToOTLPMetrics(fields map[string]any, tags map[string]string, logger for key, value := range fields { metric := RemovePrefix(metricType, key) unit := GetUnitForMetric(metric) + scopeMetric := ilms.AppendEmpty() switch t := value.(type) { case int: - intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp) + intGauge(scopeMetric, key, unit, int64(t), timestamp) case int32: - intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp) + intGauge(scopeMetric, key, unit, int64(t), timestamp) case int64: - intGauge(ilms.AppendEmpty(), key, unit, t, timestamp) + intGauge(scopeMetric, key, unit, t, timestamp) case uint: - intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp) + intGauge(scopeMetric, key, unit, int64(t), timestamp) case uint32: - intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp) + intGauge(scopeMetric, key, unit, int64(t), timestamp) case uint64: - intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp) + intGauge(scopeMetric, key, unit, int64(t), timestamp) case float32: - doubleGauge(ilms.AppendEmpty(), key, unit, float64(t), timestamp) + doubleGauge(scopeMetric, key, unit, float64(t), timestamp) case float64: - doubleGauge(ilms.AppendEmpty(), key, unit, t, timestamp) + doubleGauge(scopeMetric, key, unit, t, timestamp) default: valueType := fmt.Sprintf("%T", value) logger.Warn("Detected unexpected field", zap.String("key", key), zap.Any("value", value), zap.String("value type", valueType)) diff --git a/internal/aws/containerinsight/utils_test.go b/internal/aws/containerinsight/utils_test.go index 80b2a45be656..9ff78aa77a7b 100644 --- a/internal/aws/containerinsight/utils_test.go +++ b/internal/aws/containerinsight/utils_test.go @@ -214,7 +214,8 @@ func TestConvertToOTLPMetricsForInvalidMetrics(t *testing.T) { md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) rm := md.ResourceMetrics().At(0) ilms := rm.ScopeMetrics() - assert.Equal(t, 0, ilms.Len()) + assert.Equal(t, 1, ilms.Len()) + assert.Equal(t, 0, ilms.At(0).Metrics().Len()) } func TestConvertToOTLPMetricsForClusterMetrics(t *testing.T) { @@ -912,3 +913,37 @@ func TestConvertToOTLPMetricsForPodEfaMetrics(t *testing.T) { md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } + +func TestConvertToOTLPMetricsForAcceleratorCountMetrics(t *testing.T) { + var fields map[string]any + var 
expectedUnits map[string]string + var tags map[string]string + var md pmetric.Metrics + now := time.Now() + timestamp := strconv.FormatInt(now.UnixNano(), 10) + + fields = map[string]any{ + "pod_gpu_limit": int64(3), + "pod_gpu_total": int64(3), + "pod_gpu_request": int64(3), + } + expectedUnits = map[string]string{ + "pod_gpu_limit": UnitCount, + "pod_gpu_total": UnitCount, + "pod_gpu_request": UnitCount, + } + tags = map[string]string{ + "ClusterName": "eks-aoc", + "InstanceId": "i-01bf9fb097cbf3205", + "InstanceType": "t2.xlarge", + "Namespace": "amazon-cloudwatch", + "NodeName": "ip-192-168-12-170.ec2.internal", + "PodName": "cloudwatch-agent", + "ContainerName": "cloudwatch-agent", + "Type": "PodGPU", + "Version": "0", + "Timestamp": timestamp, + } + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + checkMetricsAreExpected(t, md, fields, tags, expectedUnits) +} diff --git a/internal/aws/k8s/k8sclient/node.go b/internal/aws/k8s/k8sclient/node.go index 8682417c590d..9b9fbb52166b 100644 --- a/internal/aws/k8s/k8sclient/node.go +++ b/internal/aws/k8s/k8sclient/node.go @@ -18,6 +18,11 @@ import ( "k8s.io/client-go/tools/cache" ) +const ( + instanceTypeLabelKey = "node.kubernetes.io/instance-type" + instanceTypeLabelKeyBeta = "beta.kubernetes.io/instance-type" +) + // This needs to be reviewed for newer versions of k8s. var failedNodeConditions = map[v1.NodeConditionType]bool{ v1.NodeMemoryPressure: true, @@ -27,6 +32,7 @@ var failedNodeConditions = map[v1.NodeConditionType]bool{ } type NodeClient interface { + NodeInfos() map[string]*NodeInfo // Get the number of failed nodes for current cluster ClusterFailedNodeCount() int // Get the number of nodes for current cluster @@ -72,6 +78,7 @@ type nodeClient struct { captureNodeLevelInfo bool mu sync.RWMutex + nodeInfos map[string]*NodeInfo clusterFailedNodeCount int clusterNodeCount int nodeToCapacityMap map[string]v1.ResourceList @@ -79,6 +86,15 @@ type nodeClient struct { nodeToConditionsMap map[string]map[v1.NodeConditionType]v1.ConditionStatus } +func (c *nodeClient) NodeInfos() map[string]*NodeInfo { + if c.store.GetResetRefreshStatus() { + c.refresh() + } + c.mu.RLock() + defer c.mu.RUnlock() + return c.nodeInfos +} + func (c *nodeClient) ClusterFailedNodeCount() int { if c.store.GetResetRefreshStatus() { c.refresh() @@ -145,24 +161,26 @@ func (c *nodeClient) refresh() { nodeToAllocatableMap := make(map[string]v1.ResourceList) nodeToConditionsMap := make(map[string]map[v1.NodeConditionType]v1.ConditionStatus) + nodeInfos := map[string]*NodeInfo{} for _, obj := range objsList { - node := obj.(*nodeInfo) + node := obj.(*NodeInfo) + nodeInfos[node.Name] = node if c.captureNodeLevelInfo { - nodeToCapacityMap[node.name] = node.capacity - nodeToAllocatableMap[node.name] = node.allocatable + nodeToCapacityMap[node.Name] = node.Capacity + nodeToAllocatableMap[node.Name] = node.Allocatable conditionsMap := make(map[v1.NodeConditionType]v1.ConditionStatus) - for _, condition := range node.conditions { + for _, condition := range node.Conditions { conditionsMap[condition.Type] = condition.Status } - nodeToConditionsMap[node.name] = conditionsMap + nodeToConditionsMap[node.Name] = conditionsMap } clusterNodeCountNew++ failed := false Loop: - for _, condition := range node.conditions { + for _, condition := range node.Conditions { if _, ok := failedNodeConditions[condition.Type]; ok { // match the failedNodeConditions type we care about if condition.Status != v1.ConditionFalse { @@ -178,6 +196,7 @@ func (c *nodeClient) refresh() { } } + 
c.nodeInfos = nodeInfos c.clusterFailedNodeCount = clusterFailedNodeCountNew c.clusterNodeCount = clusterNodeCountNew c.nodeToCapacityMap = nodeToCapacityMap @@ -222,13 +241,23 @@ func transformFuncNode(obj any) (any, error) { if !ok { return nil, fmt.Errorf("input obj %v is not Node type", obj) } - info := new(nodeInfo) - info.name = node.Name - info.capacity = node.Status.Capacity - info.allocatable = node.Status.Allocatable - info.conditions = []*NodeCondition{} + info := new(NodeInfo) + info.Name = node.Name + info.Capacity = node.Status.Capacity + info.Allocatable = node.Status.Allocatable + info.Conditions = []*NodeCondition{} + info.ProviderId = node.Spec.ProviderID + if instanceType, ok := node.Labels[instanceTypeLabelKey]; ok { + info.InstanceType = instanceType + } else { + // fallback for compatibility with k8s versions older than v1.17 + // https://kubernetes.io/docs/reference/labels-annotations-taints/#beta-kubernetes-io-instance-type-deprecated + if instanceType, ok := node.Labels[instanceTypeLabelKeyBeta]; ok { + info.InstanceType = instanceType + } + } for _, condition := range node.Status.Conditions { - info.conditions = append(info.conditions, &NodeCondition{ + info.Conditions = append(info.Conditions, &NodeCondition{ Type: condition.Type, Status: condition.Status, }) diff --git a/internal/aws/k8s/k8sclient/node_info.go b/internal/aws/k8s/k8sclient/node_info.go index 6b1462adadcd..5fc419c552b9 100644 --- a/internal/aws/k8s/k8sclient/node_info.go +++ b/internal/aws/k8s/k8sclient/node_info.go @@ -7,11 +7,13 @@ import ( v1 "k8s.io/api/core/v1" ) -type nodeInfo struct { - name string - conditions []*NodeCondition - capacity v1.ResourceList - allocatable v1.ResourceList +type NodeInfo struct { + Name string + Conditions []*NodeCondition + Capacity v1.ResourceList + Allocatable v1.ResourceList + ProviderId string + InstanceType string } type NodeCondition struct { diff --git a/internal/aws/k8s/k8sclient/node_test.go b/internal/aws/k8s/k8sclient/node_test.go index c90394548165..a00cddb654e3 100644 --- a/internal/aws/k8s/k8sclient/node_test.go +++ b/internal/aws/k8s/k8sclient/node_test.go @@ -36,6 +36,7 @@ var nodeArray = []any{ "failure-domain.beta.kubernetes.io/region": "eu-west-1", "failure-domain.beta.kubernetes.io/zone": "eu-west-1c", "kubernetes.io/hostname": "ip-192-168-200-63.eu-west-1.compute.internal", + "node.kubernetes.io/instance-type": "t3.medium", }, Annotations: map[string]string{ "node.alpha.kubernetes.io/ttl": "0", @@ -112,6 +113,9 @@ var nodeArray = []any{ Architecture: "amd64", }, }, + Spec: v1.NodeSpec{ + ProviderID: "aws:///eu-west-1c/i-09087f37a14b9ded1", + }, }, &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -132,6 +136,7 @@ var nodeArray = []any{ "kubernetes.io/hostname": "ip-192-168-76-61.eu-west-1.compute.internal", "kubernetes.io/arch": "amd64", "beta.kubernetes.io/instance-type": "t3.medium", + "node.kubernetes.io/instance-type": "t3.medium", }, Annotations: map[string]string{ "node.alpha.kubernetes.io/ttl": "0", @@ -208,6 +213,9 @@ var nodeArray = []any{ Architecture: "amd64", }, }, + Spec: v1.NodeSpec{ + ProviderID: "aws:///eu-west-1a/i-09087f37a14b9ded2", + }, }, &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -304,6 +312,9 @@ var nodeArray = []any{ Architecture: "amd64", }, }, + Spec: v1.NodeSpec{ + ProviderID: "aws:///eu-west-1b/i-09087f37a14b9ded3", + }, }, } @@ -322,6 +333,95 @@ func TestNodeClient(t *testing.T) { "nodeToCapacityMap": map[string]v1.ResourceList{}, // Node level info is not captured by default "nodeToAllocatableMap": 
map[string]v1.ResourceList{}, // Node level info is not captured by default "nodeToConditionsMap": map[string]map[v1.NodeConditionType]v1.ConditionStatus{}, // Node level info is not captured by default + "nodeInfos": []*NodeInfo{ + { + Name: "ip-192-168-200-63.eu-west-1.compute.internal", + Conditions: []*NodeCondition{ + { + Type: v1.NodeConditionType("MemoryPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("DiskPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("PIDPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("Ready"), + Status: v1.ConditionTrue, + }, + }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), + }, + Capacity: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), + }, + ProviderId: "aws:///eu-west-1c/i-09087f37a14b9ded1", + InstanceType: "t3.medium", + }, + { + Name: "ip-192-168-76-61.eu-west-1.compute.internal", + Conditions: []*NodeCondition{ + { + Type: v1.NodeConditionType("MemoryPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("DiskPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("PIDPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("Ready"), + Status: v1.ConditionTrue, + }, + }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, + Capacity: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, + ProviderId: "aws:///eu-west-1a/i-09087f37a14b9ded2", + InstanceType: "t3.medium", + }, + { + Name: "ip-192-168-153-1.eu-west-1.compute.internal", + Conditions: []*NodeCondition{ + { + Type: v1.NodeConditionType("MemoryPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("DiskPressure"), + Status: v1.ConditionTrue, + }, + { + Type: v1.NodeConditionType("PIDPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("Ready"), + Status: v1.ConditionFalse, + }, + }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, + Capacity: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), + }, + ProviderId: "aws:///eu-west-1b/i-09087f37a14b9ded3", + InstanceType: "t3.medium", + }, + }, }, }, "CaptureNodeLevelInfo": { @@ -374,6 +474,95 @@ func TestNodeClient(t *testing.T) { "Ready": "False", }, }, + "nodeInfos": []*NodeInfo{ + { + Name: "ip-192-168-200-63.eu-west-1.compute.internal", + Conditions: []*NodeCondition{ + { + Type: v1.NodeConditionType("MemoryPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("DiskPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("PIDPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("Ready"), + Status: v1.ConditionTrue, + }, + }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), + }, + Capacity: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), + }, + ProviderId: "aws:///eu-west-1c/i-09087f37a14b9ded1", + InstanceType: "t3.medium", + }, + { + Name: "ip-192-168-76-61.eu-west-1.compute.internal", + Conditions: []*NodeCondition{ + { + Type: v1.NodeConditionType("MemoryPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("DiskPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("PIDPressure"), + 
Status: v1.ConditionFalse,
+					},
+					{
+						Type:   v1.NodeConditionType("Ready"),
+						Status: v1.ConditionTrue,
+					},
+				},
+				Allocatable: v1.ResourceList{
+					v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI),
+				},
+				Capacity: v1.ResourceList{
+					v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI),
+				},
+				ProviderId:   "aws:///eu-west-1a/i-09087f37a14b9ded2",
+				InstanceType: "t3.medium",
+			},
+			{
+				Name: "ip-192-168-153-1.eu-west-1.compute.internal",
+				Conditions: []*NodeCondition{
+					{
+						Type:   v1.NodeConditionType("MemoryPressure"),
+						Status: v1.ConditionFalse,
+					},
+					{
+						Type:   v1.NodeConditionType("DiskPressure"),
+						Status: v1.ConditionTrue,
+					},
+					{
+						Type:   v1.NodeConditionType("PIDPressure"),
+						Status: v1.ConditionFalse,
+					},
+					{
+						Type:   v1.NodeConditionType("Ready"),
+						Status: v1.ConditionFalse,
+					},
+				},
+				Allocatable: v1.ResourceList{
+					v1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI),
+				},
+				Capacity: v1.ResourceList{
+					v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI),
+				},
+				ProviderId:   "aws:///eu-west-1b/i-09087f37a14b9ded3",
+				InstanceType: "t3.medium",
+			},
+		},
 		},
 	},
 }
@@ -388,6 +577,7 @@ func TestNodeClient(t *testing.T) {
 			require.Equal(t, testCase.want["nodeToCapacityMap"], client.NodeToCapacityMap())
 			require.Equal(t, testCase.want["nodeToAllocatableMap"], client.NodeToAllocatableMap())
 			require.Equal(t, testCase.want["nodeToConditionsMap"], client.NodeToConditionsMap())
+			require.EqualValues(t, testCase.want["nodeInfos"], client.NodeInfos())
 
 			client.shutdown()
 			assert.True(t, client.stopped)
diff --git a/internal/aws/k8s/k8sclient/pod.go b/internal/aws/k8s/k8sclient/pod.go
index 626c9d77a193..4f82aac7c106 100644
--- a/internal/aws/k8s/k8sclient/pod.go
+++ b/internal/aws/k8s/k8sclient/pod.go
@@ -128,6 +128,15 @@ func transformFuncPod(obj any) (any, error) {
 	info.OwnerReferences = pod.OwnerReferences
 	info.Phase = pod.Status.Phase
 	info.Conditions = pod.Status.Conditions
+	containerInfos := make([]*ContainerInfo, 0)
+	for _, container := range pod.Spec.Containers {
+		containerInfos = append(containerInfos, &ContainerInfo{
+			Name:      container.Name,
+			Resources: container.Resources,
+		})
+	}
+	info.Containers = containerInfos
+	info.NodeName = pod.Spec.NodeName
 	return info, nil
 }
diff --git a/internal/aws/k8s/k8sclient/pod_info.go b/internal/aws/k8s/k8sclient/pod_info.go
index 14e3eee13008..a9dfe6a3ce2c 100644
--- a/internal/aws/k8s/k8sclient/pod_info.go
+++ b/internal/aws/k8s/k8sclient/pod_info.go
@@ -16,4 +16,11 @@ type PodInfo struct {
 	OwnerReferences []metaV1.OwnerReference
 	Phase           v1.PodPhase
 	Conditions      []v1.PodCondition
+	NodeName        string
+	Containers      []*ContainerInfo
+}
+
+type ContainerInfo struct {
+	Name      string
+	Resources v1.ResourceRequirements
 }
diff --git a/internal/aws/k8s/k8sclient/pod_test.go b/internal/aws/k8s/k8sclient/pod_test.go
index 9e0e8bdcf60a..b0af59f84844 100644
--- a/internal/aws/k8s/k8sclient/pod_test.go
+++ b/internal/aws/k8s/k8sclient/pod_test.go
@@ -11,6 +11,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"go.uber.org/zap"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes/fake"
 )
@@ -192,8 +193,7 @@ func TestTransformFuncPod(t *testing.T) {
 	assert.Error(t, err)
 }
 
-func TestPodClient_PodNameToPodMap(t *testing.T) {
-	skip(t, "Flaky test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/11078")
+func TestPodClient_PodInfos(t *testing.T) {
 	setOption := podSyncCheckerOption(&mockReflectorSyncChecker{})
 
 	samplePodArray := []any{
@@ -203,10 +203,29 @@
 				Name:      "kube-proxy-csm88",
 				Namespace: "kube-system",
 				SelfLink:  "/api/v1/namespaces/kube-system/pods/kube-proxy-csm88",
+				Labels: map[string]string{
+					"key": "value",
+				},
 			},
 			Status: v1.PodStatus{
 				Phase: "Running",
 			},
+			Spec: v1.PodSpec{
+				NodeName: "node-1",
+				Containers: []v1.Container{
+					{
+						Name: "container-1",
+						Resources: v1.ResourceRequirements{
+							Limits: v1.ResourceList{
+								"nvidia.com/gpu": resource.MustParse("1"),
+							},
+							Requests: v1.ResourceList{
+								"nvidia.com/gpu": resource.MustParse("1"),
+							},
+						},
+					},
+				},
+			},
 		},
 	}
@@ -220,13 +239,29 @@
 			Name:      "kube-proxy-csm88",
 			Namespace: "kube-system",
 			UID:       "bc5f5839-f62e-44b9-a79e-af250d92dcb1",
-			Labels:    map[string]string{},
-			Phase:     v1.PodRunning,
+			Labels: map[string]string{
+				"key": "value",
+			},
+			Phase:    v1.PodRunning,
+			NodeName: "node-1",
+			Containers: []*ContainerInfo{
+				{
+					Name: "container-1",
+					Resources: v1.ResourceRequirements{
+						Limits: v1.ResourceList{
+							"nvidia.com/gpu": resource.MustParse("1"),
+						},
+						Requests: v1.ResourceList{
+							"nvidia.com/gpu": resource.MustParse("1"),
+						},
+					},
+				},
+			},
 		},
 	}
 
-	resultMap := client.PodInfos()
-	assert.Equal(t, expectedArray, resultMap)
+	results := client.PodInfos()
+	assert.Equal(t, expectedArray, results)
 
 	client.shutdown()
 	assert.True(t, client.stopped)
 }
diff --git a/internal/aws/k8s/k8sutil/util.go b/internal/aws/k8s/k8sutil/util.go
index 2264446eacfb..d6e115194d4b 100644
--- a/internal/aws/k8s/k8sutil/util.go
+++ b/internal/aws/k8s/k8sutil/util.go
@@ -5,6 +5,7 @@ package k8sutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil"
 
 import (
 	"fmt"
+	"strings"
 )
 
 // CreatePodKey concatenates namespace and podName to get a pod key
@@ -22,3 +23,11 @@ func CreateContainerKey(namespace, podName, containerName string) string {
 	}
 	return fmt.Sprintf("namespace:%s,podName:%s,containerName:%s", namespace, podName, containerName)
 }
+
+// ParseInstanceIdFromProviderId parses the EC2 instance id from a node's provider id, which has the format aws:///<availability-zone>/<instance-id>
+func ParseInstanceIdFromProviderId(providerId string) string {
+	if providerId == "" || !strings.HasPrefix(providerId, "aws://") {
+		return ""
+	}
+	return providerId[strings.LastIndex(providerId, "/")+1:]
+}
diff --git a/internal/aws/k8s/k8sutil/util_test.go b/internal/aws/k8s/k8sutil/util_test.go
index 16734f8513bc..045497194ead 100644
--- a/internal/aws/k8s/k8sutil/util_test.go
+++ b/internal/aws/k8s/k8sutil/util_test.go
@@ -22,3 +22,10 @@ func TestCreateContainerKey(t *testing.T) {
 	assert.Equal(t, "", CreateContainerKey("default", "", "testContainer"))
 	assert.Equal(t, "", CreateContainerKey("default", "testPod", ""))
 }
+
+func TestParseInstanceIdFromProviderId(t *testing.T) {
+	assert.Equal(t, "i-0b00e07ccd388f915", ParseInstanceIdFromProviderId("aws:///us-west-2b/i-0b00e07ccd388f915"))
+	assert.Equal(t, "i-0b00e07ccd388f915", ParseInstanceIdFromProviderId("aws:///us-east-1c/i-0b00e07ccd388f915"))
+	assert.Equal(t, "", ParseInstanceIdFromProviderId(":///us-east-1c/i-0b00e07ccd388f915"))
+	assert.Equal(t, "", ParseInstanceIdFromProviderId(""))
+}
diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go
index cc9a17ef801d..c14d10ed5005 100644
---
a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -61,6 +62,7 @@ type K8sAPIServer struct { leaderElection *LeaderElection addFullPodNameMetricLabel bool includeEnhancedMetrics bool + enableAcceleratedMetrics bool } type clusterNameProvider interface { @@ -70,7 +72,7 @@ type clusterNameProvider interface { type Option func(*K8sAPIServer) // NewK8sAPIServer creates a k8sApiServer which can generate cluster-level metrics -func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, options ...Option) (*K8sAPIServer, error) { +func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, enableAcceleratedMetrics bool, options ...Option) (*K8sAPIServer, error) { k := &K8sAPIServer{ logger: logger, @@ -78,6 +80,7 @@ func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection leaderElection: leaderElection, addFullPodNameMetricLabel: addFullPodNameMetricLabel, includeEnhancedMetrics: includeEnhancedMetrics, + enableAcceleratedMetrics: enableAcceleratedMetrics, } for _, opt := range options { @@ -125,6 +128,9 @@ func (k *K8sAPIServer) GetMetrics() []pmetric.Metrics { result = append(result, k.getStatefulSetMetrics(clusterName, timestampNs)...) result = append(result, k.getReplicaSetMetrics(clusterName, timestampNs)...) result = append(result, k.getPendingPodStatusMetrics(clusterName, timestampNs)...) + if k.enableAcceleratedMetrics { + result = append(result, k.getAcceleratorCountMetrics(clusterName, timestampNs)...) 
+	}
 	return result
 }
@@ -351,7 +357,7 @@ func (k *K8sAPIServer) getPendingPodStatusMetrics(clusterName, timestampNs string) []pmetric.Metrics {
 		}
 		attributes[ci.PodStatus] = string(v1.PodPending)
-		attributes["k8s.node.name"] = "pending"
+		attributes["k8s.node.name"] = pendingNodeName
 		kubernetesBlob := map[string]any{}
 		k.getKubernetesBlob(podInfo, kubernetesBlob, attributes)
@@ -442,6 +448,95 @@ func (k *K8sAPIServer) getKubernetesBlob(pod *k8sclient.PodInfo, kubernetesBlob map[string]any, attributes map[string]string) {
 	}
 }
 
+func (k *K8sAPIServer) getAcceleratorCountMetrics(clusterName, timestampNs string) []pmetric.Metrics {
+	var metrics []pmetric.Metrics
+	podsList := k.leaderElection.podClient.PodInfos()
+	nodeInfos := k.leaderElection.nodeClient.NodeInfos()
+	podKeyToServiceNamesMap := k.leaderElection.epClient.PodKeyToServiceNames()
+	for _, podInfo := range podsList {
+		// only pending and running pods are relevant here
+		if podInfo.Phase != v1.PodPending && podInfo.Phase != v1.PodRunning {
+			continue
+		}
+
+		fields := map[string]any{}
+
+		var podLimit, podRequest, podTotal int64
+		var gpuAllocated bool
+		for _, container := range podInfo.Containers {
+			// emit count metrics for a pod only when at least one container declares a GPU limit
+			_, found := container.Resources.Limits[resourceSpecNvidiaGpuKey]
+			gpuAllocated = gpuAllocated || found
+
+			if len(container.Resources.Limits) == 0 {
+				continue
+			}
+			podRequest += container.Resources.Requests.Name(resourceSpecNvidiaGpuKey, resource.DecimalExponent).Value()
+			limit := container.Resources.Limits.Name(resourceSpecNvidiaGpuKey, resource.DecimalExponent).Value()
+			// pending pods still count toward the limit total taken from the pod spec
+			podLimit += limit
+
+			// gpu_total sums limits across running pods only; a pod stays stuck in Pending when fewer GPUs are available than its limit,
+			// e.g.
limit=2, available=1 -> *_gpu_limit=2, *_gpu_request=2, *_gpu_total=0
+			// the request value is optional and, if set, must equal the limit: https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/
+			if podInfo.Phase == v1.PodRunning {
+				podTotal += limit
+			}
+		}
+		// skip the GPU count metrics when none of the pod's containers has a GPU allocated
+		if !gpuAllocated {
+			continue
+		}
+		// add pod-level count metrics here; the metricstransformprocessor then replicates them to node- and cluster-level metrics
+		fields[ci.MetricName(ci.TypePod, ci.GpuLimit)] = podLimit
+		fields[ci.MetricName(ci.TypePod, ci.GpuRequest)] = podRequest
+		fields[ci.MetricName(ci.TypePod, ci.GpuTotal)] = podTotal
+
+		attributes := map[string]string{
+			ci.ClusterNameKey:        clusterName,
+			ci.MetricType:            ci.TypePodGPU,
+			ci.Timestamp:             timestampNs,
+			ci.AttributeK8sNamespace: podInfo.Namespace,
+			ci.Version:               "0",
+		}
+
+		podKey := k8sutil.CreatePodKey(podInfo.Namespace, podInfo.Name)
+		if serviceList, ok := podKeyToServiceNamesMap[podKey]; ok {
+			if len(serviceList) > 0 {
+				attributes[ci.TypeService] = serviceList[0]
+			}
+		}
+
+		kubernetesBlob := map[string]any{}
+		k.getKubernetesBlob(podInfo, kubernetesBlob, attributes)
+		if podInfo.NodeName != "" {
+			// decorate with instance ID and type attributes, which become dimensions for the node_gpu_* metrics
+			attributes[ci.NodeNameKey] = podInfo.NodeName
+			kubernetesBlob["host"] = podInfo.NodeName
+			if nodeInfo, ok := nodeInfos[podInfo.NodeName]; ok {
+				attributes[ci.InstanceID] = k8sutil.ParseInstanceIdFromProviderId(nodeInfo.ProviderId)
+				attributes[ci.InstanceType] = nodeInfo.InstanceType
+			}
+		} else {
+			// fall back when the node name is not available
+			attributes[ci.NodeNameKey] = pendingNodeName
+			kubernetesBlob["host"] = pendingNodeName
+		}
+		if len(kubernetesBlob) > 0 {
+			kubernetesInfo, err := json.Marshal(kubernetesBlob)
+			if err != nil {
+				k.logger.Warn("Error parsing kubernetes blob for pod metrics")
+			} else {
+				attributes[ci.AttributeKubernetes] = string(kubernetesInfo)
+			}
+		}
+		attributes[ci.SourcesKey] = "[\"apiserver\"]"
+		md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger)
+		metrics = append(metrics, md)
+	}
+	return metrics
+}
+
 // Shutdown stops the k8sApiServer
 func (k *K8sAPIServer) Shutdown() error {
 	if k.cancel != nil {
diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go
index d4d27fbaea00..92149566450c 100644
--- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go
+++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go
@@ -14,6 +14,7 @@ import (
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.uber.org/zap"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
@@ -138,6 +139,12 @@ func (client *MockClient) PodInfos() []*k8sclient.PodInfo {
 	return args.Get(0).([]*k8sclient.PodInfo)
 }
 
+// k8sclient.NodeClient
+func (client *MockClient) NodeInfos() map[string]*k8sclient.NodeInfo {
+	args := client.Called()
+	return args.Get(0).(map[string]*k8sclient.NodeInfo)
+}
+
 // k8sclient.NodeClient
 func (client *MockClient) ClusterFailedNodeCount() int {
 	args := client.Called()
@@ -224,7 +231,7 @@ func (m mockClusterNameProvider) GetClusterName() string {
 }
 
 func TestK8sAPIServer_New(t *testing.T) {
-	k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), nil, false, false)
+
k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), nil, false, false, false) assert.Nil(t, k8sAPIServer) assert.NotNil(t, err) } @@ -304,11 +311,43 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { UID: "bc5f5839-f62e-44b9-a79e-af250d92dcb1", Phase: v1.PodPending, }, + { + Name: "gpu-burn-6dcbd994fb-9fn8w", + Namespace: "amazon-cloudwatch", + NodeName: "ip-192-168-57-23.us-west-2.compute.internal", + UID: "bc5f5839-f62e-44b9-a79e-af250d92dcb1", + Phase: v1.PodRunning, + Containers: []*k8sclient.ContainerInfo{ + { + Name: "container-1", + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + resourceSpecNvidiaGpuKey: resource.MustParse("2"), + }, + }, + }, + }, + }, }) mockClient.On("PodKeyToServiceNames").Return(map[string][]string{ "namespace:kube-system,podName:coredns-7554568866-26jdf": {"kube-dns"}, "namespace:kube-system,podName:coredns-7554568866-shwn6": {"kube-dns"}, }) + mockClient.On("NodeInfos").Return(map[string]*k8sclient.NodeInfo{ + "ip-192-168-57-23.us-west-2.compute.internal": { + Name: "ip-192-168-57-23.us-west-2.compute.internal", + Conditions: []*k8sclient.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + Capacity: map[v1.ResourceName]resource.Quantity{}, + Allocatable: map[v1.ResourceName]resource.Quantity{}, + ProviderId: "aws:///us-west-2/i-abcdef123456789", + InstanceType: "g4dn-12xl", + }, + }) leaderElection := &LeaderElection{ k8sClient: &mockK8sClient{}, @@ -326,7 +365,7 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { t.Setenv("HOST_NAME", hostName) t.Setenv("K8S_NAMESPACE", "namespace") - k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), leaderElection, true, true) + k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), leaderElection, true, true, true) assert.NotNil(t, k8sAPIServer) assert.Nil(t, err) @@ -401,6 +440,13 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { assert.Equal(t, "kube-system", getStringAttrVal(metric, ci.AttributeK8sNamespace)) assert.Equal(t, "Pending", getStringAttrVal(metric, "pod_status")) assert.Equal(t, "Pod", getStringAttrVal(metric, ci.MetricType)) + case ci.TypePodGPU: + assertMetricValueEqual(t, metric, "pod_gpu_limit", int64(2)) + assertMetricValueEqual(t, metric, "pod_gpu_total", int64(2)) + assert.Equal(t, "amazon-cloudwatch", getStringAttrVal(metric, ci.AttributeK8sNamespace)) + assert.Equal(t, ci.TypePodGPU, getStringAttrVal(metric, ci.MetricType)) + assert.Equal(t, "i-abcdef123456789", getStringAttrVal(metric, ci.InstanceID)) + assert.Equal(t, "g4dn-12xl", getStringAttrVal(metric, ci.InstanceType)) default: assert.Fail(t, "Unexpected metric type: "+metricType) } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go index a2efa312805d..c6a6293630fd 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go @@ -19,6 +19,8 @@ const ( splitRegexStr = "\\.|-" KubeProxy = "kube-proxy" cronJobAllowedString = "0123456789" + resourceSpecNvidiaGpuKey = "nvidia.com/gpu" + pendingNodeName = "pending" ) var ( diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index b34aaa82be48..79b463f40209 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -110,7 +110,7 @@ func (acir 
*awsContainerInsightReceiver) Start(ctx context.Context, host compone return err } - acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostinfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics) + acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostinfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics, acir.config.EnableAcceleratedComputeMetrics) if err != nil { return err }
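
Reviewer note, not part of the patch: below is a minimal, self-contained sketch of the per-pod GPU bookkeeping that getAcceleratorCountMetrics performs, for sanity-checking the gpu_limit/gpu_request/gpu_total semantics. The containers fixture and the running flag are invented for illustration; the nvidia.com/gpu key and resource.DecimalExponent formatting match what the patch uses.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

const gpuKey v1.ResourceName = "nvidia.com/gpu" // same key as resourceSpecNvidiaGpuKey in the patch

func main() {
	// Hypothetical pod spec: one GPU container plus a sidecar without limits.
	containers := []v1.ResourceRequirements{
		{
			Limits:   v1.ResourceList{gpuKey: resource.MustParse("2")},
			Requests: v1.ResourceList{gpuKey: resource.MustParse("2")},
		},
		{}, // a sidecar with no limits is skipped, mirroring the len(Limits) == 0 check
	}
	running := true // stands in for podInfo.Phase == v1.PodRunning

	var podLimit, podRequest, podTotal int64
	for _, r := range containers {
		if len(r.Limits) == 0 {
			continue
		}
		podRequest += r.Requests.Name(gpuKey, resource.DecimalExponent).Value()
		limit := r.Limits.Name(gpuKey, resource.DecimalExponent).Value()
		podLimit += limit // pending pods still contribute to the limit total
		if running {
			podTotal += limit // gpu_total counts running pods only
		}
	}
	fmt.Println(podLimit, podRequest, podTotal) // prints: 2 2 2
}
```

With running set to false (a pod stuck in Pending because fewer GPUs are available than its limit), the same fixture yields 2 2 0, matching the limit=2, available=1 example in the patch's comment.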