From 87b2dcd6d2c068519418a69b769e7df4f23c250f Mon Sep 17 00:00:00 2001 From: Hyunsoo Kim Date: Mon, 20 May 2024 11:57:54 -0400 Subject: [PATCH 1/3] add gpu count metrics including limit, request and total counts at pod, node and cluster levels --- internal/aws/containerinsight/const.go | 8 + internal/aws/containerinsight/utils.go | 31 +-- internal/aws/containerinsight/utils_test.go | 67 ++++-- internal/aws/k8s/k8sclient/node.go | 53 +++-- internal/aws/k8s/k8sclient/node_info.go | 12 +- internal/aws/k8s/k8sclient/node_test.go | 190 ++++++++++++++++++ internal/aws/k8s/k8sclient/pod.go | 9 + internal/aws/k8s/k8sclient/pod_info.go | 7 + internal/aws/k8s/k8sclient/pod_test.go | 48 ++++- internal/aws/k8s/k8sutil/util.go | 9 + internal/aws/k8s/k8sutil/util_test.go | 7 + .../internal/cadvisor/cadvisor_linux.go | 2 +- .../internal/efa/efaSysfs.go | 2 +- .../internal/k8sapiserver/k8sapiserver.go | 115 ++++++++++- .../k8sapiserver/k8sapiserver_test.go | 50 ++++- .../internal/k8sapiserver/utils.go | 1 + .../internal/k8swindows/k8swindows.go | 2 +- .../awscontainerinsightreceiver/receiver.go | 2 +- 18 files changed, 549 insertions(+), 66 deletions(-) diff --git a/internal/aws/containerinsight/const.go b/internal/aws/containerinsight/const.go index a5551aeb0d25..161e5f0244df 100644 --- a/internal/aws/containerinsight/const.go +++ b/internal/aws/containerinsight/const.go @@ -140,6 +140,10 @@ const ( EfaRxDropped = "rx_dropped" EfaTxBytes = "tx_bytes" + GpuLimit = "gpu_limit" + GpuTotal = "gpu_total" + GpuRequest = "gpu_request" + // Define the metric types TypeCluster = "Cluster" TypeClusterService = "ClusterService" @@ -319,5 +323,9 @@ func init() { EfaRxBytes: UnitBytesPerSec, EfaRxDropped: UnitCountPerSec, EfaTxBytes: UnitBytesPerSec, + + GpuLimit: UnitCount, + GpuTotal: UnitCount, + GpuRequest: UnitCount, } } diff --git a/internal/aws/containerinsight/utils.go b/internal/aws/containerinsight/utils.go index 7e87d77692bf..a66af9aabbc0 100644 --- 
a/internal/aws/containerinsight/utils.go +++ b/internal/aws/containerinsight/utils.go @@ -4,7 +4,6 @@ package containerinsight // import "github.com/open-telemetry/opentelemetry-coll import ( "fmt" - "log" "os" "runtime" "strconv" @@ -139,7 +138,7 @@ func getPrefixByMetricType(mType string) string { prefix = nodeNetPrefix case TypeNodeEFA: prefix = nodeEfaPrefix - case TypePod: + case TypePod, TypePodGPU: prefix = podPrefix case TypePodNet: prefix = podNetPrefix @@ -170,7 +169,7 @@ func getPrefixByMetricType(mType string) string { case TypeClusterReplicaSet: prefix = replicaSet default: - log.Printf("E! Unexpected MetricType: %s", mType) + prefix = "" } return prefix } @@ -236,7 +235,7 @@ func ConvertToFieldsAndTags(m pmetric.Metric, logger *zap.Logger) []FieldsAndTag } // ConvertToOTLPMetrics converts a field containing metric values and a tag containing the relevant labels to OTLP metrics -func ConvertToOTLPMetrics(fields map[string]any, tags map[string]string, logger *zap.Logger) pmetric.Metrics { +func ConvertToOTLPMetrics(fields map[string]any, tags map[string]string, addAttributesAtDatapoints bool, logger *zap.Logger) pmetric.Metrics { md := pmetric.NewMetrics() rms := md.ResourceMetrics() rm := rms.AppendEmpty() @@ -259,27 +258,35 @@ func ConvertToOTLPMetrics(fields map[string]any, tags map[string]string, logger for key, value := range fields { metric := RemovePrefix(metricType, key) unit := GetUnitForMetric(metric) + scopeMetric := ilms.AppendEmpty() switch t := value.(type) { case int: - intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp) + intGauge(scopeMetric, key, unit, int64(t), timestamp) case int32: - intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp) + intGauge(scopeMetric, key, unit, int64(t), timestamp) case int64: - intGauge(ilms.AppendEmpty(), key, unit, t, timestamp) + intGauge(scopeMetric, key, unit, t, timestamp) case uint: - intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp) + intGauge(scopeMetric, key, unit, 
int64(t), timestamp) case uint32: - intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp) + intGauge(scopeMetric, key, unit, int64(t), timestamp) case uint64: - intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp) + intGauge(scopeMetric, key, unit, int64(t), timestamp) case float32: - doubleGauge(ilms.AppendEmpty(), key, unit, float64(t), timestamp) + doubleGauge(scopeMetric, key, unit, float64(t), timestamp) case float64: - doubleGauge(ilms.AppendEmpty(), key, unit, t, timestamp) + doubleGauge(scopeMetric, key, unit, t, timestamp) default: valueType := fmt.Sprintf("%T", value) logger.Warn("Detected unexpected field", zap.String("key", key), zap.Any("value", value), zap.String("value type", valueType)) } + if addAttributesAtDatapoints { + if scopeMetric.Metrics().Len() == 0 || scopeMetric.Metrics().At(0).Gauge().DataPoints().Len() == 0 { + continue + } + dpAttrs := scopeMetric.Metrics().At(0).Gauge().DataPoints().At(0).Attributes() + resource.Attributes().CopyTo(dpAttrs) + } } return md diff --git a/internal/aws/containerinsight/utils_test.go b/internal/aws/containerinsight/utils_test.go index 80b2a45be656..ee653b0aaf7e 100644 --- a/internal/aws/containerinsight/utils_test.go +++ b/internal/aws/containerinsight/utils_test.go @@ -211,10 +211,11 @@ func TestConvertToOTLPMetricsForInvalidMetrics(t *testing.T) { "Version": "0", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) rm := md.ResourceMetrics().At(0) ilms := rm.ScopeMetrics() - assert.Equal(t, 0, ilms.Len()) + assert.Equal(t, 1, ilms.Len()) + assert.Equal(t, 0, ilms.At(0).Metrics().Len()) } func TestConvertToOTLPMetricsForClusterMetrics(t *testing.T) { @@ -240,7 +241,7 @@ func TestConvertToOTLPMetricsForClusterMetrics(t *testing.T) { "Timestamp": timestamp, "Version": "0", } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) 
checkMetricsAreExpected(t, md, fields, tags, expectedUnits) // test cluster namespace metrics @@ -256,7 +257,7 @@ func TestConvertToOTLPMetricsForClusterMetrics(t *testing.T) { "Timestamp": timestamp, "Version": "0", } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) // test cluster service metrics @@ -272,7 +273,7 @@ func TestConvertToOTLPMetricsForClusterMetrics(t *testing.T) { "Timestamp": timestamp, "Version": "0", } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -351,7 +352,7 @@ func TestConvertToOTLPMetricsForContainerMetrics(t *testing.T) { "container_status": "Running", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) // test container filesystem metrics @@ -382,7 +383,7 @@ func TestConvertToOTLPMetricsForContainerMetrics(t *testing.T) { "device": "/dev/xvda1", "fstype": "vfs", } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -477,7 +478,7 @@ func TestConvertToOTLPMetricsForNodeMetrics(t *testing.T) { "Version": "0", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -526,7 +527,7 @@ func TestConvertToOTLPMetricsForNodeDiskIOMetrics(t *testing.T) { "device": "/dev/xvda", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) checkMetricsAreExpected(t, md, fields, 
tags, expectedUnits) } @@ -568,7 +569,7 @@ func TestConvertToOTLPMetricsForNodeFSMetrics(t *testing.T) { "fstype": "vfs", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -614,7 +615,7 @@ func TestConvertToOTLPMetricsForNodeNetMetrics(t *testing.T) { "interface": "eni7cce1b61ea4", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -658,7 +659,7 @@ func TestConvertToOTLPMetricsForNodeStatusMetrics(t *testing.T) { "interface": "eni7cce1b61ea4", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -773,7 +774,7 @@ func TestConvertToOTLPMetricsForPodMetrics(t *testing.T) { "Version": "0", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -821,7 +822,7 @@ func TestConvertToOTLPMetricsForPodNetMetrics(t *testing.T) { "interface": "eth0", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -869,7 +870,7 @@ func TestConvertToOTLPMetricsForPodContainerStatusMetrics(t *testing.T) { "interface": "eth0", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -909,6 +910,40 @@ func TestConvertToOTLPMetricsForPodEfaMetrics(t *testing.T) { 
"Version": "0", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + checkMetricsAreExpected(t, md, fields, tags, expectedUnits) +} + +func TestConvertToOTLPMetricsForAcceleratorCountMetrics(t *testing.T) { + var fields map[string]any + var expectedUnits map[string]string + var tags map[string]string + var md pmetric.Metrics + now := time.Now() + timestamp := strconv.FormatInt(now.UnixNano(), 10) + + fields = map[string]any{ + "pod_gpu_limit": int64(3), + "pod_gpu_total": int64(3), + "pod_gpu_request": int64(3), + } + expectedUnits = map[string]string{ + "pod_gpu_limit": UnitCount, + "pod_gpu_total": UnitCount, + "pod_gpu_request": UnitCount, + } + tags = map[string]string{ + "ClusterName": "eks-aoc", + "InstanceId": "i-01bf9fb097cbf3205", + "InstanceType": "t2.xlarge", + "Namespace": "amazon-cloudwatch", + "NodeName": "ip-192-168-12-170.ec2.internal", + "PodName": "cloudwatch-agent", + "ContainerName": "cloudwatch-agent", + "Type": "PodGPU", + "Version": "0", + "Timestamp": timestamp, + } + md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } diff --git a/internal/aws/k8s/k8sclient/node.go b/internal/aws/k8s/k8sclient/node.go index 8682417c590d..dd8a82e82149 100644 --- a/internal/aws/k8s/k8sclient/node.go +++ b/internal/aws/k8s/k8sclient/node.go @@ -18,6 +18,11 @@ import ( "k8s.io/client-go/tools/cache" ) +const ( + instanceTypeLabelKey = "node.kubernetes.io/instance-type" + instanceTypeLabelKeyBeta = "beta.kubernetes.io/instance-type" +) + // This needs to be reviewed for newer versions of k8s. 
var failedNodeConditions = map[v1.NodeConditionType]bool{ v1.NodeMemoryPressure: true, @@ -27,6 +32,7 @@ var failedNodeConditions = map[v1.NodeConditionType]bool{ } type NodeClient interface { + NodeInfos() []*NodeInfo // Get the number of failed nodes for current cluster ClusterFailedNodeCount() int // Get the number of nodes for current cluster @@ -72,6 +78,7 @@ type nodeClient struct { captureNodeLevelInfo bool mu sync.RWMutex + nodeInfos []*NodeInfo clusterFailedNodeCount int clusterNodeCount int nodeToCapacityMap map[string]v1.ResourceList @@ -79,6 +86,15 @@ type nodeClient struct { nodeToConditionsMap map[string]map[v1.NodeConditionType]v1.ConditionStatus } +func (c *nodeClient) NodeInfos() []*NodeInfo { + if c.store.GetResetRefreshStatus() { + c.refresh() + } + c.mu.RLock() + defer c.mu.RUnlock() + return c.nodeInfos +} + func (c *nodeClient) ClusterFailedNodeCount() int { if c.store.GetResetRefreshStatus() { c.refresh() @@ -145,24 +161,26 @@ func (c *nodeClient) refresh() { nodeToAllocatableMap := make(map[string]v1.ResourceList) nodeToConditionsMap := make(map[string]map[v1.NodeConditionType]v1.ConditionStatus) + nodeInfos := make([]*NodeInfo, 0) for _, obj := range objsList { - node := obj.(*nodeInfo) + node := obj.(*NodeInfo) + nodeInfos = append(nodeInfos, node) if c.captureNodeLevelInfo { - nodeToCapacityMap[node.name] = node.capacity - nodeToAllocatableMap[node.name] = node.allocatable + nodeToCapacityMap[node.Name] = node.Capacity + nodeToAllocatableMap[node.Name] = node.Allocatable conditionsMap := make(map[v1.NodeConditionType]v1.ConditionStatus) - for _, condition := range node.conditions { + for _, condition := range node.Conditions { conditionsMap[condition.Type] = condition.Status } - nodeToConditionsMap[node.name] = conditionsMap + nodeToConditionsMap[node.Name] = conditionsMap } clusterNodeCountNew++ failed := false Loop: - for _, condition := range node.conditions { + for _, condition := range node.Conditions { if _, ok := 
failedNodeConditions[condition.Type]; ok { // match the failedNodeConditions type we care about if condition.Status != v1.ConditionFalse { @@ -178,6 +196,7 @@ func (c *nodeClient) refresh() { } } + c.nodeInfos = nodeInfos c.clusterFailedNodeCount = clusterFailedNodeCountNew c.clusterNodeCount = clusterNodeCountNew c.nodeToCapacityMap = nodeToCapacityMap @@ -222,13 +241,23 @@ func transformFuncNode(obj any) (any, error) { if !ok { return nil, fmt.Errorf("input obj %v is not Node type", obj) } - info := new(nodeInfo) - info.name = node.Name - info.capacity = node.Status.Capacity - info.allocatable = node.Status.Allocatable - info.conditions = []*NodeCondition{} + info := new(NodeInfo) + info.Name = node.Name + info.Capacity = node.Status.Capacity + info.Allocatable = node.Status.Allocatable + info.Conditions = []*NodeCondition{} + info.ProviderId = node.Spec.ProviderID + if instanceType, ok := node.Labels[instanceTypeLabelKey]; ok { + info.InstanceType = instanceType + } else { + // fallback for compatibility with k8s versions older than v1.17 + // https://kubernetes.io/docs/reference/labels-annotations-taints/#beta-kubernetes-io-instance-type-deprecated + if instanceType, ok := node.Labels[instanceTypeLabelKeyBeta]; ok { + info.InstanceType = instanceType + } + } for _, condition := range node.Status.Conditions { - info.conditions = append(info.conditions, &NodeCondition{ + info.Conditions = append(info.Conditions, &NodeCondition{ Type: condition.Type, Status: condition.Status, }) diff --git a/internal/aws/k8s/k8sclient/node_info.go b/internal/aws/k8s/k8sclient/node_info.go index 6b1462adadcd..5fc419c552b9 100644 --- a/internal/aws/k8s/k8sclient/node_info.go +++ b/internal/aws/k8s/k8sclient/node_info.go @@ -7,11 +7,13 @@ import ( v1 "k8s.io/api/core/v1" ) -type nodeInfo struct { - name string - conditions []*NodeCondition - capacity v1.ResourceList - allocatable v1.ResourceList +type NodeInfo struct { + Name string + Conditions []*NodeCondition + Capacity 
v1.ResourceList + Allocatable v1.ResourceList + ProviderId string + InstanceType string } type NodeCondition struct { diff --git a/internal/aws/k8s/k8sclient/node_test.go b/internal/aws/k8s/k8sclient/node_test.go index c90394548165..a00cddb654e3 100644 --- a/internal/aws/k8s/k8sclient/node_test.go +++ b/internal/aws/k8s/k8sclient/node_test.go @@ -36,6 +36,7 @@ var nodeArray = []any{ "failure-domain.beta.kubernetes.io/region": "eu-west-1", "failure-domain.beta.kubernetes.io/zone": "eu-west-1c", "kubernetes.io/hostname": "ip-192-168-200-63.eu-west-1.compute.internal", + "node.kubernetes.io/instance-type": "t3.medium", }, Annotations: map[string]string{ "node.alpha.kubernetes.io/ttl": "0", @@ -112,6 +113,9 @@ var nodeArray = []any{ Architecture: "amd64", }, }, + Spec: v1.NodeSpec{ + ProviderID: "aws:///eu-west-1c/i-09087f37a14b9ded1", + }, }, &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -132,6 +136,7 @@ var nodeArray = []any{ "kubernetes.io/hostname": "ip-192-168-76-61.eu-west-1.compute.internal", "kubernetes.io/arch": "amd64", "beta.kubernetes.io/instance-type": "t3.medium", + "node.kubernetes.io/instance-type": "t3.medium", }, Annotations: map[string]string{ "node.alpha.kubernetes.io/ttl": "0", @@ -208,6 +213,9 @@ var nodeArray = []any{ Architecture: "amd64", }, }, + Spec: v1.NodeSpec{ + ProviderID: "aws:///eu-west-1a/i-09087f37a14b9ded2", + }, }, &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -304,6 +312,9 @@ var nodeArray = []any{ Architecture: "amd64", }, }, + Spec: v1.NodeSpec{ + ProviderID: "aws:///eu-west-1b/i-09087f37a14b9ded3", + }, }, } @@ -322,6 +333,95 @@ func TestNodeClient(t *testing.T) { "nodeToCapacityMap": map[string]v1.ResourceList{}, // Node level info is not captured by default "nodeToAllocatableMap": map[string]v1.ResourceList{}, // Node level info is not captured by default "nodeToConditionsMap": map[string]map[v1.NodeConditionType]v1.ConditionStatus{}, // Node level info is not captured by default + "nodeInfos": []*NodeInfo{ + { + Name: 
"ip-192-168-200-63.eu-west-1.compute.internal", + Conditions: []*NodeCondition{ + { + Type: v1.NodeConditionType("MemoryPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("DiskPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("PIDPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("Ready"), + Status: v1.ConditionTrue, + }, + }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), + }, + Capacity: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), + }, + ProviderId: "aws:///eu-west-1c/i-09087f37a14b9ded1", + InstanceType: "t3.medium", + }, + { + Name: "ip-192-168-76-61.eu-west-1.compute.internal", + Conditions: []*NodeCondition{ + { + Type: v1.NodeConditionType("MemoryPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("DiskPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("PIDPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("Ready"), + Status: v1.ConditionTrue, + }, + }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, + Capacity: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, + ProviderId: "aws:///eu-west-1a/i-09087f37a14b9ded2", + InstanceType: "t3.medium", + }, + { + Name: "ip-192-168-153-1.eu-west-1.compute.internal", + Conditions: []*NodeCondition{ + { + Type: v1.NodeConditionType("MemoryPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("DiskPressure"), + Status: v1.ConditionTrue, + }, + { + Type: v1.NodeConditionType("PIDPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("Ready"), + Status: v1.ConditionFalse, + }, + }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, + Capacity: v1.ResourceList{ + v1.ResourcePods: 
*resource.NewQuantity(5, resource.DecimalSI), + }, + ProviderId: "aws:///eu-west-1b/i-09087f37a14b9ded3", + InstanceType: "t3.medium", + }, + }, }, }, "CaptureNodeLevelInfo": { @@ -374,6 +474,95 @@ func TestNodeClient(t *testing.T) { "Ready": "False", }, }, + "nodeInfos": []*NodeInfo{ + { + Name: "ip-192-168-200-63.eu-west-1.compute.internal", + Conditions: []*NodeCondition{ + { + Type: v1.NodeConditionType("MemoryPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("DiskPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("PIDPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("Ready"), + Status: v1.ConditionTrue, + }, + }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), + }, + Capacity: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), + }, + ProviderId: "aws:///eu-west-1c/i-09087f37a14b9ded1", + InstanceType: "t3.medium", + }, + { + Name: "ip-192-168-76-61.eu-west-1.compute.internal", + Conditions: []*NodeCondition{ + { + Type: v1.NodeConditionType("MemoryPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("DiskPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("PIDPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("Ready"), + Status: v1.ConditionTrue, + }, + }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, + Capacity: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, + ProviderId: "aws:///eu-west-1a/i-09087f37a14b9ded2", + InstanceType: "t3.medium", + }, + { + Name: "ip-192-168-153-1.eu-west-1.compute.internal", + Conditions: []*NodeCondition{ + { + Type: v1.NodeConditionType("MemoryPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("DiskPressure"), + Status: v1.ConditionTrue, + }, + { + Type: 
v1.NodeConditionType("PIDPressure"), + Status: v1.ConditionFalse, + }, + { + Type: v1.NodeConditionType("Ready"), + Status: v1.ConditionFalse, + }, + }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, + Capacity: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), + }, + ProviderId: "aws:///eu-west-1b/i-09087f37a14b9ded3", + InstanceType: "t3.medium", + }, + }, }, }, } @@ -388,6 +577,7 @@ func TestNodeClient(t *testing.T) { require.Equal(t, testCase.want["nodeToCapacityMap"], client.NodeToCapacityMap()) require.Equal(t, testCase.want["nodeToAllocatableMap"], client.NodeToAllocatableMap()) require.Equal(t, testCase.want["nodeToConditionsMap"], client.NodeToConditionsMap()) + require.EqualValues(t, testCase.want["nodeInfos"], client.NodeInfos()) client.shutdown() assert.True(t, client.stopped) diff --git a/internal/aws/k8s/k8sclient/pod.go b/internal/aws/k8s/k8sclient/pod.go index 626c9d77a193..4f82aac7c106 100644 --- a/internal/aws/k8s/k8sclient/pod.go +++ b/internal/aws/k8s/k8sclient/pod.go @@ -128,6 +128,15 @@ func transformFuncPod(obj any) (any, error) { info.OwnerReferences = pod.OwnerReferences info.Phase = pod.Status.Phase info.Conditions = pod.Status.Conditions + containerInfos := make([]*ContainerInfo, 0) + for _, container := range pod.Spec.Containers { + containerInfos = append(containerInfos, &ContainerInfo{ + Name: container.Name, + Resources: container.Resources, + }) + } + info.Containers = containerInfos + info.NodeName = pod.Spec.NodeName return info, nil } diff --git a/internal/aws/k8s/k8sclient/pod_info.go b/internal/aws/k8s/k8sclient/pod_info.go index 14e3eee13008..a9dfe6a3ce2c 100644 --- a/internal/aws/k8s/k8sclient/pod_info.go +++ b/internal/aws/k8s/k8sclient/pod_info.go @@ -16,4 +16,11 @@ type PodInfo struct { OwnerReferences []metaV1.OwnerReference Phase v1.PodPhase Conditions []v1.PodCondition + NodeName string + Containers []*ContainerInfo +} + +type 
ContainerInfo struct { + Name string + Resources v1.ResourceRequirements } diff --git a/internal/aws/k8s/k8sclient/pod_test.go b/internal/aws/k8s/k8sclient/pod_test.go index 9e0e8bdcf60a..b0af59f84844 100644 --- a/internal/aws/k8s/k8sclient/pod_test.go +++ b/internal/aws/k8s/k8sclient/pod_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" ) @@ -192,8 +193,8 @@ func TestTransformFuncPod(t *testing.T) { assert.Error(t, err) } -func TestPodClient_PodNameToPodMap(t *testing.T) { - skip(t, "Flaky test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/11078") +func TestPodClient_PodInfos(t *testing.T) { + //skip(t, "Flaky test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/11078") setOption := podSyncCheckerOption(&mockReflectorSyncChecker{}) samplePodArray := []any{ @@ -203,10 +204,29 @@ func TestPodClient_PodNameToPodMap(t *testing.T) { Name: "kube-proxy-csm88", Namespace: "kube-system", SelfLink: "/api/v1/namespaces/kube-system/pods/kube-proxy-csm88", + Labels: map[string]string{ + "key": "value", + }, }, Status: v1.PodStatus{ Phase: "Running", }, + Spec: v1.PodSpec{ + NodeName: "node-1", + Containers: []v1.Container{ + { + Name: "container-1", + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "nvidia.com/gpu": resource.MustParse("1"), + }, + Requests: v1.ResourceList{ + "nvidia.com/gpu": resource.MustParse("1"), + }, + }, + }, + }, + }, }, } @@ -220,13 +240,29 @@ func TestPodClient_PodNameToPodMap(t *testing.T) { Name: "kube-proxy-csm88", Namespace: "kube-system", UID: "bc5f5839-f62e-44b9-a79e-af250d92dcb1", - Labels: map[string]string{}, - Phase: v1.PodRunning, + Labels: map[string]string{ + "key": "value", + }, + Phase: v1.PodRunning, + NodeName: "node-1", + Containers: []*ContainerInfo{ + { + Name: 
"container-1", + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "nvidia.com/gpu": resource.MustParse("1"), + }, + Requests: v1.ResourceList{ + "nvidia.com/gpu": resource.MustParse("1"), + }, + }, + }, + }, }, } - resultMap := client.PodInfos() - assert.Equal(t, expectedArray, resultMap) + results := client.PodInfos() + assert.Equal(t, expectedArray, results) client.shutdown() assert.True(t, client.stopped) } diff --git a/internal/aws/k8s/k8sutil/util.go b/internal/aws/k8s/k8sutil/util.go index 2264446eacfb..d6e115194d4b 100644 --- a/internal/aws/k8s/k8sutil/util.go +++ b/internal/aws/k8s/k8sutil/util.go @@ -5,6 +5,7 @@ package k8sutil // import "github.com/open-telemetry/opentelemetry-collector-con import ( "fmt" + "strings" ) // CreatePodKey concatenates namespace and podName to get a pod key @@ -22,3 +23,11 @@ func CreateContainerKey(namespace, podName, containerName string) string { } return fmt.Sprintf("namespace:%s,podName:%s,containerName:%s", namespace, podName, containerName) } + +// ParseInstanceIdFromProviderId parses EC2 instance id from node's provider id which has format of aws:///<availability-zone>/<instance-id> +func ParseInstanceIdFromProviderId(providerId string) string { + if providerId == "" || !strings.HasPrefix(providerId, "aws://") { + return "" + } + return providerId[strings.LastIndex(providerId, "/")+1:] +} diff --git a/internal/aws/k8s/k8sutil/util_test.go b/internal/aws/k8s/k8sutil/util_test.go index 16734f8513bc..045497194ead 100644 --- a/internal/aws/k8s/k8sutil/util_test.go +++ b/internal/aws/k8s/k8sutil/util_test.go @@ -22,3 +22,10 @@ func TestCreateContainerKey(t *testing.T) { assert.Equal(t, "", CreateContainerKey("default", "", "testContainer")) assert.Equal(t, "", CreateContainerKey("default", "testPod", "")) } + +func TestParseInstanceIdFromProviderId(t *testing.T) { + assert.Equal(t, "i-0b00e07ccd388f915", ParseInstanceIdFromProviderId("aws:///us-west-2b/i-0b00e07ccd388f915")) + assert.Equal(t, "i-0b00e07ccd388f915",
ParseInstanceIdFromProviderId("aws:///us-east-1c/i-0b00e07ccd388f915")) + assert.Equal(t, "", ParseInstanceIdFromProviderId(":///us-east-1c/i-0b00e07ccd388f915")) + assert.Equal(t, "", ParseInstanceIdFromProviderId("")) +} diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go index 16a1d0e66771..b1310cde9a8a 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go @@ -230,7 +230,7 @@ func (c *Cadvisor) GetMetrics() []pmetric.Metrics { if cadvisorMetric == nil { continue } - md := ci.ConvertToOTLPMetrics(cadvisorMetric.GetFields(), cadvisorMetric.GetTags(), c.logger) + md := ci.ConvertToOTLPMetrics(cadvisorMetric.GetFields(), cadvisorMetric.GetTags(), false, c.logger) result = append(result, md) } diff --git a/receiver/awscontainerinsightreceiver/internal/efa/efaSysfs.go b/receiver/awscontainerinsightreceiver/internal/efa/efaSysfs.go index 315563d5f5d3..5634329b5503 100644 --- a/receiver/awscontainerinsightreceiver/internal/efa/efaSysfs.go +++ b/receiver/awscontainerinsightreceiver/internal/efa/efaSysfs.go @@ -202,7 +202,7 @@ func (s *Scraper) GetMetrics() []pmetric.Metrics { continue } metric := s.decorator.Decorate(m) - result = append(result, ci.ConvertToOTLPMetrics(metric.GetFields(), metric.GetTags(), s.logger)) + result = append(result, ci.ConvertToOTLPMetrics(metric.GetFields(), metric.GetTags(), false, s.logger)) } } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index cc9a17ef801d..211a2dcfaab7 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -16,6 +16,7 @@ import ( 
"go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -61,6 +62,7 @@ type K8sAPIServer struct { leaderElection *LeaderElection addFullPodNameMetricLabel bool includeEnhancedMetrics bool + enableAcceleratedMetrics bool } type clusterNameProvider interface { @@ -70,7 +72,7 @@ type clusterNameProvider interface { type Option func(*K8sAPIServer) // NewK8sAPIServer creates a k8sApiServer which can generate cluster-level metrics -func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, options ...Option) (*K8sAPIServer, error) { +func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, enableAcceleratedMetrics bool, options ...Option) (*K8sAPIServer, error) { k := &K8sAPIServer{ logger: logger, @@ -78,6 +80,7 @@ func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection leaderElection: leaderElection, addFullPodNameMetricLabel: addFullPodNameMetricLabel, includeEnhancedMetrics: includeEnhancedMetrics, + enableAcceleratedMetrics: enableAcceleratedMetrics, } for _, opt := range options { @@ -125,6 +128,9 @@ func (k *K8sAPIServer) GetMetrics() []pmetric.Metrics { result = append(result, k.getStatefulSetMetrics(clusterName, timestampNs)...) result = append(result, k.getReplicaSetMetrics(clusterName, timestampNs)...) result = append(result, k.getPendingPodStatusMetrics(clusterName, timestampNs)...) + if k.enableAcceleratedMetrics { + result = append(result, k.getAcceleratorCountMetrics(clusterName, timestampNs)...) 
+ } return result } @@ -152,7 +158,7 @@ func (k *K8sAPIServer) getClusterMetrics(clusterName, timestampNs string) pmetri attributes["NodeName"] = k.nodeName } attributes[ci.SourcesKey] = "[\"apiserver\"]" - return ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + return ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) } func (k *K8sAPIServer) getNamespaceMetrics(clusterName, timestampNs string) []pmetric.Metrics { @@ -173,7 +179,7 @@ func (k *K8sAPIServer) getNamespaceMetrics(clusterName, timestampNs string) []pm } attributes[ci.SourcesKey] = "[\"apiserver\"]" attributes[ci.AttributeKubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\"}", namespace) - md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) metrics = append(metrics, md) } return metrics @@ -203,7 +209,7 @@ func (k *K8sAPIServer) getDeploymentMetrics(clusterName, timestampNs string) []p attributes[ci.SourcesKey] = "[\"apiserver\"]" // attributes[ci.AttributeKubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\",\"deployment_name\":\"%s\"}", // deployment.Namespace, deployment.Name) - md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) metrics = append(metrics, md) } return metrics @@ -233,7 +239,7 @@ func (k *K8sAPIServer) getDaemonSetMetrics(clusterName, timestampNs string) []pm attributes[ci.SourcesKey] = "[\"apiserver\"]" // attributes[ci.AttributeKubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\",\"daemonset_name\":\"%s\"}", // daemonSet.Namespace, daemonSet.Name) - md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) metrics = append(metrics, md) } return metrics @@ -259,7 +265,7 @@ func (k *K8sAPIServer) getServiceMetrics(clusterName, timestampNs string) []pmet attributes[ci.SourcesKey] = "[\"apiserver\"]" attributes[ci.AttributeKubernetes] = 
fmt.Sprintf("{\"namespace_name\":\"%s\",\"service_name\":\"%s\"}", service.Namespace, service.ServiceName) - md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) metrics = append(metrics, md) } return metrics @@ -286,7 +292,7 @@ func (k *K8sAPIServer) getStatefulSetMetrics(clusterName, timestampNs string) [] attributes[ci.NodeNameKey] = k.nodeName } attributes[ci.SourcesKey] = "[\"apiserver\"]" - md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) metrics = append(metrics, md) } return metrics @@ -313,7 +319,7 @@ func (k *K8sAPIServer) getReplicaSetMetrics(clusterName, timestampNs string) []p attributes[ci.NodeNameKey] = k.nodeName } attributes[ci.SourcesKey] = "[\"apiserver\"]" - md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) metrics = append(metrics, md) } return metrics @@ -367,7 +373,7 @@ func (k *K8sAPIServer) getPendingPodStatusMetrics(clusterName, timestampNs strin } } attributes[ci.SourcesKey] = "[\"apiserver\"]" - md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) metrics = append(metrics, md) } } @@ -442,6 +448,97 @@ func (k *K8sAPIServer) getKubernetesBlob(pod *k8sclient.PodInfo, kubernetesBlob } } +func (k *K8sAPIServer) getAcceleratorCountMetrics(clusterName, timestampNs string) []pmetric.Metrics { + var metrics []pmetric.Metrics + podsList := k.leaderElection.podClient.PodInfos() + nodesList := k.leaderElection.nodeClient.NodeInfos() + podKeyToServiceNamesMap := k.leaderElection.epClient.PodKeyToServiceNames() + for _, podInfo := range podsList { + // only care for pending and running pods + if podInfo.Phase != v1.PodPending && podInfo.Phase != v1.PodRunning { + continue + } + + fields := map[string]any{} + + var podLimit, podRequest, podTotal 
int64 + var gpuAllocated bool + for _, container := range podInfo.Containers { + // check if at least 1 container is using gpu to add count metrics for a pod + _, found := container.Resources.Limits[resourceSpecNvidiaGpuKey] + gpuAllocated = gpuAllocated || found + + if len(container.Resources.Limits) == 0 { + continue + } + podRequest += container.Resources.Requests.Name(resourceSpecNvidiaGpuKey, resource.DecimalExponent).Value() + limit := container.Resources.Limits.Name(resourceSpecNvidiaGpuKey, resource.DecimalExponent).Value() + // still counting pending pods to get total # of limit from spec + podLimit += limit + + // sum of running pods only. a pod will be stuck in pending state when there is less or no gpu available than limit value + // e.g. limit=2,available=1 - *_gpu_limit=2, *_gpu_request=2, *_gpu_total=0 + // request value is optional and must be equal to limit https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/ + if podInfo.Phase == v1.PodRunning { + podTotal += limit + } + } + // skip adding gpu count metrics when none of containers has gpu resource allocated + if !gpuAllocated { + continue + } + // add pod level count metrics here then metricstransformprocessor will duplicate to node/cluster level metrics + fields[ci.MetricName(ci.TypePod, "gpu_limit")] = podLimit + fields[ci.MetricName(ci.TypePod, "gpu_request")] = podRequest + fields[ci.MetricName(ci.TypePod, "gpu_total")] = podTotal + + attributes := map[string]string{ + ci.ClusterNameKey: clusterName, + ci.MetricType: ci.TypePodGPU, + ci.Timestamp: timestampNs, + ci.AttributeK8sNamespace: podInfo.Namespace, + ci.Version: "0", + } + + podKey := k8sutil.CreatePodKey(podInfo.Namespace, podInfo.Name) + if serviceList, ok := podKeyToServiceNamesMap[podKey]; ok { + if len(serviceList) > 0 { + attributes[ci.TypeService] = serviceList[0] + } + } + + kubernetesBlob := map[string]any{} + k.getKubernetesBlob(podInfo, kubernetesBlob, attributes) + if podInfo.NodeName != "" { + // decorate with 
instance ID and type attributes which become dimensions for node_gpu_* metrics + attributes[ci.NodeNameKey] = podInfo.NodeName + kubernetesBlob["host"] = podInfo.NodeName + for _, node := range nodesList { + if node.Name == podInfo.NodeName { + attributes[ci.InstanceID] = k8sutil.ParseInstanceIdFromProviderId(node.ProviderId) + attributes[ci.InstanceType] = node.InstanceType + } + } + } else { + // fallback when node name is not available + attributes[ci.NodeNameKey] = k.nodeName + kubernetesBlob["host"] = k.nodeName + } + if len(kubernetesBlob) > 0 { + kubernetesInfo, err := json.Marshal(kubernetesBlob) + if err != nil { + k.logger.Warn("Error parsing kubernetes blob for pod metrics") + } else { + attributes[ci.AttributeKubernetes] = string(kubernetesInfo) + } + } + attributes[ci.SourcesKey] = "[\"apiserver\"]" + md := ci.ConvertToOTLPMetrics(fields, attributes, true, k.logger) + metrics = append(metrics, md) + } + return metrics +} + // Shutdown stops the k8sApiServer func (k *K8sAPIServer) Shutdown() error { if k.cancel != nil { diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go index d4d27fbaea00..aca2ad117b10 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -138,6 +139,12 @@ func (client *MockClient) PodInfos() []*k8sclient.PodInfo { return args.Get(0).([]*k8sclient.PodInfo) } +// k8sclient.NodeClient +func (client *MockClient) NodeInfos() []*k8sclient.NodeInfo { + args := client.Called() + return args.Get(0).([]*k8sclient.NodeInfo) +} + // k8sclient.NodeClient 
func (client *MockClient) ClusterFailedNodeCount() int { args := client.Called() @@ -224,7 +231,7 @@ func (m mockClusterNameProvider) GetClusterName() string { } func TestK8sAPIServer_New(t *testing.T) { - k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), nil, false, false) + k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), nil, false, false, false) assert.Nil(t, k8sAPIServer) assert.NotNil(t, err) } @@ -304,11 +311,43 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { UID: "bc5f5839-f62e-44b9-a79e-af250d92dcb1", Phase: v1.PodPending, }, + { + Name: "gpu-burn-6dcbd994fb-9fn8w", + Namespace: "amazon-cloudwatch", + NodeName: "ip-192-168-57-23.us-west-2.compute.internal", + UID: "bc5f5839-f62e-44b9-a79e-af250d92dcb1", + Phase: v1.PodRunning, + Containers: []*k8sclient.ContainerInfo{ + { + Name: "container-1", + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + resourceSpecNvidiaGpuKey: resource.MustParse("2"), + }, + }, + }, + }, + }, }) mockClient.On("PodKeyToServiceNames").Return(map[string][]string{ "namespace:kube-system,podName:coredns-7554568866-26jdf": {"kube-dns"}, "namespace:kube-system,podName:coredns-7554568866-shwn6": {"kube-dns"}, }) + mockClient.On("NodeInfos").Return([]*k8sclient.NodeInfo{ + { + Name: "ip-192-168-57-23.us-west-2.compute.internal", + Conditions: []*k8sclient.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + Capacity: map[v1.ResourceName]resource.Quantity{}, + Allocatable: map[v1.ResourceName]resource.Quantity{}, + ProviderId: "aws:///us-west-2/i-abcdef123456789", + InstanceType: "g4dn-12xl", + }, + }) leaderElection := &LeaderElection{ k8sClient: &mockK8sClient{}, @@ -326,7 +365,7 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { t.Setenv("HOST_NAME", hostName) t.Setenv("K8S_NAMESPACE", "namespace") - k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), leaderElection, true, true) + k8sAPIServer, err := 
NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), leaderElection, true, true, true) assert.NotNil(t, k8sAPIServer) assert.Nil(t, err) @@ -401,6 +440,13 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { assert.Equal(t, "kube-system", getStringAttrVal(metric, ci.AttributeK8sNamespace)) assert.Equal(t, "Pending", getStringAttrVal(metric, "pod_status")) assert.Equal(t, "Pod", getStringAttrVal(metric, ci.MetricType)) + case ci.TypePodGPU: + assertMetricValueEqual(t, metric, "pod_gpu_limit", int64(2)) + assertMetricValueEqual(t, metric, "pod_gpu_total", int64(2)) + assert.Equal(t, "amazon-cloudwatch", getStringAttrVal(metric, ci.AttributeK8sNamespace)) + assert.Equal(t, ci.TypePodGPU, getStringAttrVal(metric, ci.MetricType)) + assert.Equal(t, "i-abcdef123456789", getStringAttrVal(metric, ci.InstanceID)) + assert.Equal(t, "g4dn-12xl", getStringAttrVal(metric, ci.InstanceType)) default: assert.Fail(t, "Unexpected metric type: "+metricType) } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go index a2efa312805d..cac2c6e9acc6 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go @@ -19,6 +19,7 @@ const ( splitRegexStr = "\\.|-" KubeProxy = "kube-proxy" cronJobAllowedString = "0123456789" + resourceSpecNvidiaGpuKey = "nvidia.com/gpu" ) var ( diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go index 99d93cd54782..435080d698ba 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go @@ -90,7 +90,7 @@ func (k *K8sWindows) GetMetrics() []pmetric.Metrics { metrics = cExtractor.MergeMetrics(metrics) metrics = k.decorateMetrics(metrics) for _, ciMetric := range metrics { 
- md := ci.ConvertToOTLPMetrics(ciMetric.GetFields(), ciMetric.GetTags(), k.logger) + md := ci.ConvertToOTLPMetrics(ciMetric.GetFields(), ciMetric.GetTags(), false, k.logger) result = append(result, md) } diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index b34aaa82be48..79b463f40209 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -110,7 +110,7 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone return err } - acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostinfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics) + acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostinfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics, acir.config.EnableAcceleratedComputeMetrics) if err != nil { return err } From 1ed1342c54010eef771df3b1f0e292be0ddb639e Mon Sep 17 00:00:00 2001 From: Hyunsoo Kim Date: Fri, 24 May 2024 11:30:18 -0400 Subject: [PATCH 2/3] revert removed log while getting metric prefix remove bool flag for ConvertToOTLPMetrics --- internal/aws/containerinsight/utils.go | 12 ++---- internal/aws/containerinsight/utils_test.go | 32 +++++++-------- internal/aws/k8s/k8sclient/node.go | 10 ++--- .../internal/cadvisor/cadvisor_linux.go | 2 +- .../internal/efa/efaSysfs.go | 2 +- .../internal/k8sapiserver/k8sapiserver.go | 41 +++++++++---------- .../k8sapiserver/k8sapiserver_test.go | 8 ++-- .../internal/k8sapiserver/utils.go | 28 +++++++++++++ .../internal/k8sapiserver/utils_test.go | 35 ++++++++++++++++ .../internal/k8swindows/k8swindows.go | 2 +- 10 files changed, 114 insertions(+), 58 deletions(-) diff --git a/internal/aws/containerinsight/utils.go b/internal/aws/containerinsight/utils.go index a66af9aabbc0..4c0b6fa15d0d 100644 --- 
a/internal/aws/containerinsight/utils.go +++ b/internal/aws/containerinsight/utils.go @@ -4,6 +4,7 @@ package containerinsight // import "github.com/open-telemetry/opentelemetry-coll import ( "fmt" + "log" "os" "runtime" "strconv" @@ -169,7 +170,7 @@ func getPrefixByMetricType(mType string) string { case TypeClusterReplicaSet: prefix = replicaSet default: - prefix = "" + log.Printf("E! Unexpected MetricType: %s", mType) } return prefix } @@ -235,7 +236,7 @@ func ConvertToFieldsAndTags(m pmetric.Metric, logger *zap.Logger) []FieldsAndTag } // ConvertToOTLPMetrics converts a field containing metric values and a tag containing the relevant labels to OTLP metrics -func ConvertToOTLPMetrics(fields map[string]any, tags map[string]string, addAttributesAtDatapoints bool, logger *zap.Logger) pmetric.Metrics { +func ConvertToOTLPMetrics(fields map[string]any, tags map[string]string, logger *zap.Logger) pmetric.Metrics { md := pmetric.NewMetrics() rms := md.ResourceMetrics() rm := rms.AppendEmpty() @@ -280,13 +281,6 @@ func ConvertToOTLPMetrics(fields map[string]any, tags map[string]string, addAttr valueType := fmt.Sprintf("%T", value) logger.Warn("Detected unexpected field", zap.String("key", key), zap.Any("value", value), zap.String("value type", valueType)) } - if addAttributesAtDatapoints { - if scopeMetric.Metrics().Len() == 0 || scopeMetric.Metrics().At(0).Gauge().DataPoints().Len() == 0 { - continue - } - dpAttrs := scopeMetric.Metrics().At(0).Gauge().DataPoints().At(0).Attributes() - resource.Attributes().CopyTo(dpAttrs) - } } return md diff --git a/internal/aws/containerinsight/utils_test.go b/internal/aws/containerinsight/utils_test.go index ee653b0aaf7e..9ff78aa77a7b 100644 --- a/internal/aws/containerinsight/utils_test.go +++ b/internal/aws/containerinsight/utils_test.go @@ -211,7 +211,7 @@ func TestConvertToOTLPMetricsForInvalidMetrics(t *testing.T) { "Version": "0", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = 
ConvertToOTLPMetrics(fields, tags, zap.NewNop()) rm := md.ResourceMetrics().At(0) ilms := rm.ScopeMetrics() assert.Equal(t, 1, ilms.Len()) @@ -241,7 +241,7 @@ func TestConvertToOTLPMetricsForClusterMetrics(t *testing.T) { "Timestamp": timestamp, "Version": "0", } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) // test cluster namespace metrics @@ -257,7 +257,7 @@ func TestConvertToOTLPMetricsForClusterMetrics(t *testing.T) { "Timestamp": timestamp, "Version": "0", } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) // test cluster service metrics @@ -273,7 +273,7 @@ func TestConvertToOTLPMetricsForClusterMetrics(t *testing.T) { "Timestamp": timestamp, "Version": "0", } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -352,7 +352,7 @@ func TestConvertToOTLPMetricsForContainerMetrics(t *testing.T) { "container_status": "Running", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) // test container filesystem metrics @@ -383,7 +383,7 @@ func TestConvertToOTLPMetricsForContainerMetrics(t *testing.T) { "device": "/dev/xvda1", "fstype": "vfs", } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -478,7 +478,7 @@ func TestConvertToOTLPMetricsForNodeMetrics(t *testing.T) { "Version": "0", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = 
ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -527,7 +527,7 @@ func TestConvertToOTLPMetricsForNodeDiskIOMetrics(t *testing.T) { "device": "/dev/xvda", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -569,7 +569,7 @@ func TestConvertToOTLPMetricsForNodeFSMetrics(t *testing.T) { "fstype": "vfs", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -615,7 +615,7 @@ func TestConvertToOTLPMetricsForNodeNetMetrics(t *testing.T) { "interface": "eni7cce1b61ea4", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -659,7 +659,7 @@ func TestConvertToOTLPMetricsForNodeStatusMetrics(t *testing.T) { "interface": "eni7cce1b61ea4", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -774,7 +774,7 @@ func TestConvertToOTLPMetricsForPodMetrics(t *testing.T) { "Version": "0", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -822,7 +822,7 @@ func TestConvertToOTLPMetricsForPodNetMetrics(t *testing.T) { "interface": "eth0", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -870,7 
+870,7 @@ func TestConvertToOTLPMetricsForPodContainerStatusMetrics(t *testing.T) { "interface": "eth0", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -910,7 +910,7 @@ func TestConvertToOTLPMetricsForPodEfaMetrics(t *testing.T) { "Version": "0", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } @@ -944,6 +944,6 @@ func TestConvertToOTLPMetricsForAcceleratorCountMetrics(t *testing.T) { "Version": "0", "Timestamp": timestamp, } - md = ConvertToOTLPMetrics(fields, tags, false, zap.NewNop()) + md = ConvertToOTLPMetrics(fields, tags, zap.NewNop()) checkMetricsAreExpected(t, md, fields, tags, expectedUnits) } diff --git a/internal/aws/k8s/k8sclient/node.go b/internal/aws/k8s/k8sclient/node.go index dd8a82e82149..9b9fbb52166b 100644 --- a/internal/aws/k8s/k8sclient/node.go +++ b/internal/aws/k8s/k8sclient/node.go @@ -32,7 +32,7 @@ var failedNodeConditions = map[v1.NodeConditionType]bool{ } type NodeClient interface { - NodeInfos() []*NodeInfo + NodeInfos() map[string]*NodeInfo // Get the number of failed nodes for current cluster ClusterFailedNodeCount() int // Get the number of nodes for current cluster @@ -78,7 +78,7 @@ type nodeClient struct { captureNodeLevelInfo bool mu sync.RWMutex - nodeInfos []*NodeInfo + nodeInfos map[string]*NodeInfo clusterFailedNodeCount int clusterNodeCount int nodeToCapacityMap map[string]v1.ResourceList @@ -86,7 +86,7 @@ type nodeClient struct { nodeToConditionsMap map[string]map[v1.NodeConditionType]v1.ConditionStatus } -func (c *nodeClient) NodeInfos() []*NodeInfo { +func (c *nodeClient) NodeInfos() map[string]*NodeInfo { if c.store.GetResetRefreshStatus() { c.refresh() } @@ -161,10 +161,10 @@ func (c *nodeClient) refresh() 
{ nodeToAllocatableMap := make(map[string]v1.ResourceList) nodeToConditionsMap := make(map[string]map[v1.NodeConditionType]v1.ConditionStatus) - nodeInfos := make([]*NodeInfo, 0) + nodeInfos := map[string]*NodeInfo{} for _, obj := range objsList { node := obj.(*NodeInfo) - nodeInfos = append(nodeInfos, node) + nodeInfos[node.Name] = node if c.captureNodeLevelInfo { nodeToCapacityMap[node.Name] = node.Capacity diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go index b1310cde9a8a..16a1d0e66771 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go @@ -230,7 +230,7 @@ func (c *Cadvisor) GetMetrics() []pmetric.Metrics { if cadvisorMetric == nil { continue } - md := ci.ConvertToOTLPMetrics(cadvisorMetric.GetFields(), cadvisorMetric.GetTags(), false, c.logger) + md := ci.ConvertToOTLPMetrics(cadvisorMetric.GetFields(), cadvisorMetric.GetTags(), c.logger) result = append(result, md) } diff --git a/receiver/awscontainerinsightreceiver/internal/efa/efaSysfs.go b/receiver/awscontainerinsightreceiver/internal/efa/efaSysfs.go index 5634329b5503..315563d5f5d3 100644 --- a/receiver/awscontainerinsightreceiver/internal/efa/efaSysfs.go +++ b/receiver/awscontainerinsightreceiver/internal/efa/efaSysfs.go @@ -202,7 +202,7 @@ func (s *Scraper) GetMetrics() []pmetric.Metrics { continue } metric := s.decorator.Decorate(m) - result = append(result, ci.ConvertToOTLPMetrics(metric.GetFields(), metric.GetTags(), false, s.logger)) + result = append(result, ci.ConvertToOTLPMetrics(metric.GetFields(), metric.GetTags(), s.logger)) } } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index 211a2dcfaab7..3045adadab35 100644 --- 
a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -158,7 +158,7 @@ func (k *K8sAPIServer) getClusterMetrics(clusterName, timestampNs string) pmetri attributes["NodeName"] = k.nodeName } attributes[ci.SourcesKey] = "[\"apiserver\"]" - return ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) + return ci.ConvertToOTLPMetrics(fields, attributes, k.logger) } func (k *K8sAPIServer) getNamespaceMetrics(clusterName, timestampNs string) []pmetric.Metrics { @@ -179,7 +179,7 @@ func (k *K8sAPIServer) getNamespaceMetrics(clusterName, timestampNs string) []pm } attributes[ci.SourcesKey] = "[\"apiserver\"]" attributes[ci.AttributeKubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\"}", namespace) - md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) metrics = append(metrics, md) } return metrics @@ -209,7 +209,7 @@ func (k *K8sAPIServer) getDeploymentMetrics(clusterName, timestampNs string) []p attributes[ci.SourcesKey] = "[\"apiserver\"]" // attributes[ci.AttributeKubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\",\"deployment_name\":\"%s\"}", // deployment.Namespace, deployment.Name) - md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) metrics = append(metrics, md) } return metrics @@ -239,7 +239,7 @@ func (k *K8sAPIServer) getDaemonSetMetrics(clusterName, timestampNs string) []pm attributes[ci.SourcesKey] = "[\"apiserver\"]" // attributes[ci.AttributeKubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\",\"daemonset_name\":\"%s\"}", // daemonSet.Namespace, daemonSet.Name) - md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) metrics = append(metrics, md) } return metrics @@ -265,7 +265,7 @@ func (k *K8sAPIServer) 
getServiceMetrics(clusterName, timestampNs string) []pmet attributes[ci.SourcesKey] = "[\"apiserver\"]" attributes[ci.AttributeKubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\",\"service_name\":\"%s\"}", service.Namespace, service.ServiceName) - md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) metrics = append(metrics, md) } return metrics @@ -292,7 +292,7 @@ func (k *K8sAPIServer) getStatefulSetMetrics(clusterName, timestampNs string) [] attributes[ci.NodeNameKey] = k.nodeName } attributes[ci.SourcesKey] = "[\"apiserver\"]" - md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) metrics = append(metrics, md) } return metrics @@ -319,7 +319,7 @@ func (k *K8sAPIServer) getReplicaSetMetrics(clusterName, timestampNs string) []p attributes[ci.NodeNameKey] = k.nodeName } attributes[ci.SourcesKey] = "[\"apiserver\"]" - md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) metrics = append(metrics, md) } return metrics @@ -357,7 +357,7 @@ func (k *K8sAPIServer) getPendingPodStatusMetrics(clusterName, timestampNs strin } attributes[ci.PodStatus] = string(v1.PodPending) - attributes["k8s.node.name"] = "pending" + attributes["k8s.node.name"] = pendingNodeName kubernetesBlob := map[string]any{} k.getKubernetesBlob(podInfo, kubernetesBlob, attributes) @@ -373,7 +373,7 @@ func (k *K8sAPIServer) getPendingPodStatusMetrics(clusterName, timestampNs strin } } attributes[ci.SourcesKey] = "[\"apiserver\"]" - md := ci.ConvertToOTLPMetrics(fields, attributes, false, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) metrics = append(metrics, md) } } @@ -451,7 +451,7 @@ func (k *K8sAPIServer) getKubernetesBlob(pod *k8sclient.PodInfo, kubernetesBlob func (k *K8sAPIServer) getAcceleratorCountMetrics(clusterName, timestampNs string) 
[]pmetric.Metrics { var metrics []pmetric.Metrics podsList := k.leaderElection.podClient.PodInfos() - nodesList := k.leaderElection.nodeClient.NodeInfos() + nodeInfos := k.leaderElection.nodeClient.NodeInfos() podKeyToServiceNamesMap := k.leaderElection.epClient.PodKeyToServiceNames() for _, podInfo := range podsList { // only care for pending and running pods @@ -488,9 +488,9 @@ func (k *K8sAPIServer) getAcceleratorCountMetrics(clusterName, timestampNs strin continue } // add pod level count metrics here then metricstransformprocessor will duplicate to node/cluster level metrics - fields[ci.MetricName(ci.TypePod, "gpu_limit")] = podLimit - fields[ci.MetricName(ci.TypePod, "gpu_request")] = podRequest - fields[ci.MetricName(ci.TypePod, "gpu_total")] = podTotal + fields[ci.MetricName(ci.TypePod, ci.GpuLimit)] = podLimit + fields[ci.MetricName(ci.TypePod, ci.GpuRequest)] = podRequest + fields[ci.MetricName(ci.TypePod, ci.GpuTotal)] = podTotal attributes := map[string]string{ ci.ClusterNameKey: clusterName, @@ -513,16 +513,14 @@ func (k *K8sAPIServer) getAcceleratorCountMetrics(clusterName, timestampNs strin // decorate with instance ID and type attributes which become dimensions for node_gpu_* metrics attributes[ci.NodeNameKey] = podInfo.NodeName kubernetesBlob["host"] = podInfo.NodeName - for _, node := range nodesList { - if node.Name == podInfo.NodeName { - attributes[ci.InstanceID] = k8sutil.ParseInstanceIdFromProviderId(node.ProviderId) - attributes[ci.InstanceType] = node.InstanceType - } + if nodeInfo, ok := nodeInfos[podInfo.NodeName]; ok { + attributes[ci.InstanceID] = k8sutil.ParseInstanceIdFromProviderId(nodeInfo.ProviderId) + attributes[ci.InstanceType] = nodeInfo.InstanceType } } else { // fallback when node name is not available - attributes[ci.NodeNameKey] = k.nodeName - kubernetesBlob["host"] = k.nodeName + attributes[ci.NodeNameKey] = pendingNodeName + kubernetesBlob["host"] = pendingNodeName } if len(kubernetesBlob) > 0 { kubernetesInfo, err := 
json.Marshal(kubernetesBlob) @@ -533,7 +531,8 @@ func (k *K8sAPIServer) getAcceleratorCountMetrics(clusterName, timestampNs strin } } attributes[ci.SourcesKey] = "[\"apiserver\"]" - md := ci.ConvertToOTLPMetrics(fields, attributes, true, k.logger) + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + copyResourceAttributes(md) metrics = append(metrics, md) } return metrics diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go index aca2ad117b10..92149566450c 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go @@ -140,9 +140,9 @@ func (client *MockClient) PodInfos() []*k8sclient.PodInfo { } // k8sclient.NodeClient -func (client *MockClient) NodeInfos() []*k8sclient.NodeInfo { +func (client *MockClient) NodeInfos() map[string]*k8sclient.NodeInfo { args := client.Called() - return args.Get(0).([]*k8sclient.NodeInfo) + return args.Get(0).(map[string]*k8sclient.NodeInfo) } // k8sclient.NodeClient @@ -333,8 +333,8 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { "namespace:kube-system,podName:coredns-7554568866-26jdf": {"kube-dns"}, "namespace:kube-system,podName:coredns-7554568866-shwn6": {"kube-dns"}, }) - mockClient.On("NodeInfos").Return([]*k8sclient.NodeInfo{ - { + mockClient.On("NodeInfos").Return(map[string]*k8sclient.NodeInfo{ + "ip-192-168-57-23.us-west-2.compute.internal": { Name: "ip-192-168-57-23.us-west-2.compute.internal", Conditions: []*k8sclient.NodeCondition{ { diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go index cac2c6e9acc6..956b721855c7 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go @@ 
-7,6 +7,7 @@ import ( "regexp" "strings" + "go.opentelemetry.io/collector/pdata/pmetric" corev1 "k8s.io/api/core/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" @@ -20,6 +21,7 @@ const ( KubeProxy = "kube-proxy" cronJobAllowedString = "0123456789" resourceSpecNvidiaGpuKey = "nvidia.com/gpu" + pendingNodeName = "pending" ) var ( @@ -128,3 +130,29 @@ func parseDeploymentFromReplicaSet(name string) string { } return name[:lastDash] } + +// ConvertToOTLPMetrics creates otel metrics by appending scope metrics with 1 datapoint each for corresponding field items +// this function loops through metric object to copy down resource attributes to datapoints +func copyResourceAttributes(metrics pmetric.Metrics) { + rms := metrics.ResourceMetrics() + if rms.Len() == 0 { + return + } + rm := rms.At(0) + resource := rm.Resource() + for si := range rm.ScopeMetrics().Len() { + scopeMetric := rm.ScopeMetrics().At(si) + if scopeMetric.Metrics().Len() == 0 { + return + } + for mi := range scopeMetric.Metrics().Len() { + dps := scopeMetric.Metrics().At(mi).Gauge().DataPoints() + if dps.Len() == 0 { + return + } + for di := range dps.Len() { + resource.Attributes().CopyTo(dps.At(di).Attributes()) + } + } + } +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go index 6e7e0fd28fc4..8c22e294f7d0 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pmetric" v1 "k8s.io/api/core/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" @@ -61,3 +62,37 @@ func TestPodStore_addPodConditionMetrics(t *testing.T) { } assert.Equal(t, expectedFieldsArray, fields) } + +func 
TestUtils_copyResourceAttributes(t *testing.T) { + ms := pmetric.NewMetrics() + rm := ms.ResourceMetrics().AppendEmpty() + rattrs := rm.Resource().Attributes() + rattrs.PutStr("key1", "value1") + rattrs.PutStr("key2", "value2") + + // 1 datapoint with no existing attributes + ilms := rm.ScopeMetrics().AppendEmpty() + oneDp := ilms.Metrics().AppendEmpty() + oneDp.SetName("test_metric1") + oneDp.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(1) + // 2 dps including 1 with attributes and none with the other + twoDp := ilms.Metrics().AppendEmpty() + twoDp.SetName("test_metric2") + gauge := twoDp.SetEmptyGauge() + gauge.DataPoints().AppendEmpty().SetIntValue(2) + dp2 := gauge.DataPoints().AppendEmpty() + dp2.SetIntValue(2) + dp2.Attributes().PutStr("del_key", "del_val") + + copyResourceAttributes(ms) + + resIlms := ms.ResourceMetrics().At(0).ScopeMetrics().At(0) + res1 := resIlms.Metrics().At(0).Gauge().DataPoints() + assert.Equal(t, rattrs.Len(), res1.At(0).Attributes().Len()) + assert.Equal(t, rattrs.AsRaw(), res1.At(0).Attributes().AsRaw()) + res2 := resIlms.Metrics().At(1).Gauge().DataPoints() + assert.Equal(t, rattrs.Len(), res2.At(0).Attributes().Len()) + assert.Equal(t, rattrs.AsRaw(), res2.At(0).Attributes().AsRaw()) + assert.Equal(t, rattrs.Len(), res2.At(1).Attributes().Len()) + assert.Equal(t, rattrs.AsRaw(), res2.At(1).Attributes().AsRaw()) +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go index 435080d698ba..99d93cd54782 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go @@ -90,7 +90,7 @@ func (k *K8sWindows) GetMetrics() []pmetric.Metrics { metrics = cExtractor.MergeMetrics(metrics) metrics = k.decorateMetrics(metrics) for _, ciMetric := range metrics { - md := ci.ConvertToOTLPMetrics(ciMetric.GetFields(), ciMetric.GetTags(), false, 
k.logger) + md := ci.ConvertToOTLPMetrics(ciMetric.GetFields(), ciMetric.GetTags(), k.logger) result = append(result, md) } From e7eafc4e2c00d99fdb815ecc8a7324e8fa56fc88 Mon Sep 17 00:00:00 2001 From: Hyunsoo Kim Date: Fri, 24 May 2024 14:33:15 -0400 Subject: [PATCH 3/3] remove the extra process to copy attributes --- .../internal/k8sapiserver/k8sapiserver.go | 1 - .../internal/k8sapiserver/utils.go | 27 -------------- .../internal/k8sapiserver/utils_test.go | 35 ------------------- 3 files changed, 63 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index 3045adadab35..c14d10ed5005 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -532,7 +532,6 @@ func (k *K8sAPIServer) getAcceleratorCountMetrics(clusterName, timestampNs strin } attributes[ci.SourcesKey] = "[\"apiserver\"]" md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) - copyResourceAttributes(md) metrics = append(metrics, md) } return metrics diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go index 956b721855c7..c6a6293630fd 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go @@ -7,7 +7,6 @@ import ( "regexp" "strings" - "go.opentelemetry.io/collector/pdata/pmetric" corev1 "k8s.io/api/core/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" @@ -130,29 +129,3 @@ func parseDeploymentFromReplicaSet(name string) string { } return name[:lastDash] } - -// ConvertToOTLPMetrics creates otel metrics by appending scope metrics with 1 datapoint each for corresponding field items -// this function loops through metric object to 
copy down resource attributes to datapoints -func copyResourceAttributes(metrics pmetric.Metrics) { - rms := metrics.ResourceMetrics() - if rms.Len() == 0 { - return - } - rm := rms.At(0) - resource := rm.Resource() - for si := range rm.ScopeMetrics().Len() { - scopeMetric := rm.ScopeMetrics().At(si) - if scopeMetric.Metrics().Len() == 0 { - return - } - for mi := range scopeMetric.Metrics().Len() { - dps := scopeMetric.Metrics().At(mi).Gauge().DataPoints() - if dps.Len() == 0 { - return - } - for di := range dps.Len() { - resource.Attributes().CopyTo(dps.At(di).Attributes()) - } - } - } -} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go index 8c22e294f7d0..6e7e0fd28fc4 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/pdata/pmetric" v1 "k8s.io/api/core/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" @@ -62,37 +61,3 @@ func TestPodStore_addPodConditionMetrics(t *testing.T) { } assert.Equal(t, expectedFieldsArray, fields) } - -func TestUtils_copyResourceAttributes(t *testing.T) { - ms := pmetric.NewMetrics() - rm := ms.ResourceMetrics().AppendEmpty() - rattrs := rm.Resource().Attributes() - rattrs.PutStr("key1", "value1") - rattrs.PutStr("key2", "value2") - - // 1 datapoint with no existing attributes - ilms := rm.ScopeMetrics().AppendEmpty() - oneDp := ilms.Metrics().AppendEmpty() - oneDp.SetName("test_metric1") - oneDp.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(1) - // 2 dps including 1 with attributes and none with the other - twoDp := ilms.Metrics().AppendEmpty() - twoDp.SetName("test_metric2") - gauge := twoDp.SetEmptyGauge() - gauge.DataPoints().AppendEmpty().SetIntValue(2) - dp2 
:= gauge.DataPoints().AppendEmpty() - dp2.SetIntValue(2) - dp2.Attributes().PutStr("del_key", "del_val") - - copyResourceAttributes(ms) - - resIlms := ms.ResourceMetrics().At(0).ScopeMetrics().At(0) - res1 := resIlms.Metrics().At(0).Gauge().DataPoints() - assert.Equal(t, rattrs.Len(), res1.At(0).Attributes().Len()) - assert.Equal(t, rattrs.AsRaw(), res1.At(0).Attributes().AsRaw()) - res2 := resIlms.Metrics().At(1).Gauge().DataPoints() - assert.Equal(t, rattrs.Len(), res2.At(0).Attributes().Len()) - assert.Equal(t, rattrs.AsRaw(), res2.At(0).Attributes().AsRaw()) - assert.Equal(t, rattrs.Len(), res2.At(1).Attributes().Len()) - assert.Equal(t, rattrs.AsRaw(), res2.At(1).Attributes().AsRaw()) -}