Skip to content

Commit

Permalink
Add gpu count metrics including limit, request and total counts (#214)
Browse files Browse the repository at this point in the history
  • Loading branch information
movence authored May 29, 2024
1 parent 7b9a072 commit 2728c19
Show file tree
Hide file tree
Showing 15 changed files with 514 additions and 38 deletions.
8 changes: 8 additions & 0 deletions internal/aws/containerinsight/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ const (
EfaRxDropped = "rx_dropped"
EfaTxBytes = "tx_bytes"

GpuLimit = "gpu_limit"
GpuTotal = "gpu_total"
GpuRequest = "gpu_request"

// Define the metric types
TypeCluster = "Cluster"
TypeClusterService = "ClusterService"
Expand Down Expand Up @@ -319,5 +323,9 @@ func init() {
EfaRxBytes: UnitBytesPerSec,
EfaRxDropped: UnitCountPerSec,
EfaTxBytes: UnitBytesPerSec,

GpuLimit: UnitCount,
GpuTotal: UnitCount,
GpuRequest: UnitCount,
}
}
19 changes: 10 additions & 9 deletions internal/aws/containerinsight/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func getPrefixByMetricType(mType string) string {
prefix = nodeNetPrefix
case TypeNodeEFA:
prefix = nodeEfaPrefix
case TypePod:
case TypePod, TypePodGPU:
prefix = podPrefix
case TypePodNet:
prefix = podNetPrefix
Expand Down Expand Up @@ -259,23 +259,24 @@ func ConvertToOTLPMetrics(fields map[string]any, tags map[string]string, logger
for key, value := range fields {
metric := RemovePrefix(metricType, key)
unit := GetUnitForMetric(metric)
scopeMetric := ilms.AppendEmpty()
switch t := value.(type) {
case int:
intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp)
intGauge(scopeMetric, key, unit, int64(t), timestamp)
case int32:
intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp)
intGauge(scopeMetric, key, unit, int64(t), timestamp)
case int64:
intGauge(ilms.AppendEmpty(), key, unit, t, timestamp)
intGauge(scopeMetric, key, unit, t, timestamp)
case uint:
intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp)
intGauge(scopeMetric, key, unit, int64(t), timestamp)
case uint32:
intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp)
intGauge(scopeMetric, key, unit, int64(t), timestamp)
case uint64:
intGauge(ilms.AppendEmpty(), key, unit, int64(t), timestamp)
intGauge(scopeMetric, key, unit, int64(t), timestamp)
case float32:
doubleGauge(ilms.AppendEmpty(), key, unit, float64(t), timestamp)
doubleGauge(scopeMetric, key, unit, float64(t), timestamp)
case float64:
doubleGauge(ilms.AppendEmpty(), key, unit, t, timestamp)
doubleGauge(scopeMetric, key, unit, t, timestamp)
default:
valueType := fmt.Sprintf("%T", value)
logger.Warn("Detected unexpected field", zap.String("key", key), zap.Any("value", value), zap.String("value type", valueType))
Expand Down
37 changes: 36 additions & 1 deletion internal/aws/containerinsight/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ func TestConvertToOTLPMetricsForInvalidMetrics(t *testing.T) {
md = ConvertToOTLPMetrics(fields, tags, zap.NewNop())
rm := md.ResourceMetrics().At(0)
ilms := rm.ScopeMetrics()
assert.Equal(t, 0, ilms.Len())
assert.Equal(t, 1, ilms.Len())
assert.Equal(t, 0, ilms.At(0).Metrics().Len())
}

func TestConvertToOTLPMetricsForClusterMetrics(t *testing.T) {
Expand Down Expand Up @@ -912,3 +913,37 @@ func TestConvertToOTLPMetricsForPodEfaMetrics(t *testing.T) {
md = ConvertToOTLPMetrics(fields, tags, zap.NewNop())
checkMetricsAreExpected(t, md, fields, tags, expectedUnits)
}

func TestConvertToOTLPMetricsForAcceleratorCountMetrics(t *testing.T) {
var fields map[string]any
var expectedUnits map[string]string
var tags map[string]string
var md pmetric.Metrics
now := time.Now()
timestamp := strconv.FormatInt(now.UnixNano(), 10)

fields = map[string]any{
"pod_gpu_limit": int64(3),
"pod_gpu_total": int64(3),
"pod_gpu_request": int64(3),
}
expectedUnits = map[string]string{
"pod_gpu_limit": UnitCount,
"pod_gpu_total": UnitCount,
"pod_gpu_request": UnitCount,
}
tags = map[string]string{
"ClusterName": "eks-aoc",
"InstanceId": "i-01bf9fb097cbf3205",
"InstanceType": "t2.xlarge",
"Namespace": "amazon-cloudwatch",
"NodeName": "ip-192-168-12-170.ec2.internal",
"PodName": "cloudwatch-agent",
"ContainerName": "cloudwatch-agent",
"Type": "PodGPU",
"Version": "0",
"Timestamp": timestamp,
}
md = ConvertToOTLPMetrics(fields, tags, zap.NewNop())
checkMetricsAreExpected(t, md, fields, tags, expectedUnits)
}
53 changes: 41 additions & 12 deletions internal/aws/k8s/k8sclient/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
"k8s.io/client-go/tools/cache"
)

const (
instanceTypeLabelKey = "node.kubernetes.io/instance-type"
instanceTypeLabelKeyBeta = "beta.kubernetes.io/instance-type"
)

// This needs to be reviewed for newer versions of k8s.
var failedNodeConditions = map[v1.NodeConditionType]bool{
v1.NodeMemoryPressure: true,
Expand All @@ -27,6 +32,7 @@ var failedNodeConditions = map[v1.NodeConditionType]bool{
}

type NodeClient interface {
NodeInfos() map[string]*NodeInfo
// Get the number of failed nodes for current cluster
ClusterFailedNodeCount() int
// Get the number of nodes for current cluster
Expand Down Expand Up @@ -72,13 +78,23 @@ type nodeClient struct {
captureNodeLevelInfo bool

mu sync.RWMutex
nodeInfos map[string]*NodeInfo
clusterFailedNodeCount int
clusterNodeCount int
nodeToCapacityMap map[string]v1.ResourceList
nodeToAllocatableMap map[string]v1.ResourceList
nodeToConditionsMap map[string]map[v1.NodeConditionType]v1.ConditionStatus
}

func (c *nodeClient) NodeInfos() map[string]*NodeInfo {
if c.store.GetResetRefreshStatus() {
c.refresh()
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.nodeInfos
}

func (c *nodeClient) ClusterFailedNodeCount() int {
if c.store.GetResetRefreshStatus() {
c.refresh()
Expand Down Expand Up @@ -145,24 +161,26 @@ func (c *nodeClient) refresh() {
nodeToAllocatableMap := make(map[string]v1.ResourceList)
nodeToConditionsMap := make(map[string]map[v1.NodeConditionType]v1.ConditionStatus)

nodeInfos := map[string]*NodeInfo{}
for _, obj := range objsList {
node := obj.(*nodeInfo)
node := obj.(*NodeInfo)
nodeInfos[node.Name] = node

if c.captureNodeLevelInfo {
nodeToCapacityMap[node.name] = node.capacity
nodeToAllocatableMap[node.name] = node.allocatable
nodeToCapacityMap[node.Name] = node.Capacity
nodeToAllocatableMap[node.Name] = node.Allocatable
conditionsMap := make(map[v1.NodeConditionType]v1.ConditionStatus)
for _, condition := range node.conditions {
for _, condition := range node.Conditions {
conditionsMap[condition.Type] = condition.Status
}
nodeToConditionsMap[node.name] = conditionsMap
nodeToConditionsMap[node.Name] = conditionsMap
}
clusterNodeCountNew++

failed := false

Loop:
for _, condition := range node.conditions {
for _, condition := range node.Conditions {
if _, ok := failedNodeConditions[condition.Type]; ok {
// match the failedNodeConditions type we care about
if condition.Status != v1.ConditionFalse {
Expand All @@ -178,6 +196,7 @@ func (c *nodeClient) refresh() {
}
}

c.nodeInfos = nodeInfos
c.clusterFailedNodeCount = clusterFailedNodeCountNew
c.clusterNodeCount = clusterNodeCountNew
c.nodeToCapacityMap = nodeToCapacityMap
Expand Down Expand Up @@ -222,13 +241,23 @@ func transformFuncNode(obj any) (any, error) {
if !ok {
return nil, fmt.Errorf("input obj %v is not Node type", obj)
}
info := new(nodeInfo)
info.name = node.Name
info.capacity = node.Status.Capacity
info.allocatable = node.Status.Allocatable
info.conditions = []*NodeCondition{}
info := new(NodeInfo)
info.Name = node.Name
info.Capacity = node.Status.Capacity
info.Allocatable = node.Status.Allocatable
info.Conditions = []*NodeCondition{}
info.ProviderId = node.Spec.ProviderID
if instanceType, ok := node.Labels[instanceTypeLabelKey]; ok {
info.InstanceType = instanceType
} else {
// fallback for compatibility with k8s versions older than v1.17
// https://kubernetes.io/docs/reference/labels-annotations-taints/#beta-kubernetes-io-instance-type-deprecated
if instanceType, ok := node.Labels[instanceTypeLabelKeyBeta]; ok {
info.InstanceType = instanceType
}
}
for _, condition := range node.Status.Conditions {
info.conditions = append(info.conditions, &NodeCondition{
info.Conditions = append(info.Conditions, &NodeCondition{
Type: condition.Type,
Status: condition.Status,
})
Expand Down
12 changes: 7 additions & 5 deletions internal/aws/k8s/k8sclient/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
v1 "k8s.io/api/core/v1"
)

type nodeInfo struct {
name string
conditions []*NodeCondition
capacity v1.ResourceList
allocatable v1.ResourceList
type NodeInfo struct {
Name string
Conditions []*NodeCondition
Capacity v1.ResourceList
Allocatable v1.ResourceList
ProviderId string
InstanceType string
}

type NodeCondition struct {
Expand Down
Loading

0 comments on commit 2728c19

Please sign in to comment.