Add Resiliency Metrics (#226)
Co-authored-by: Reham Tarek <[email protected]>
Reham77 and Reham Tarek authored Aug 19, 2024
1 parent 5427408 commit d503b27
Showing 15 changed files with 370 additions and 38 deletions.
18 changes: 18 additions & 0 deletions internal/aws/containerinsight/const.go
@@ -146,6 +146,11 @@ const (
GpuRequest = "gpu_request"
GpuReservedCapacity = "gpu_reserved_capacity"

HyperPodUnschedulablePendingReplacement = "unschedulable_pending_replacement"
HyperPodUnschedulablePendingReboot = "unschedulable_pending_reboot"
HyperPodSchedulable = "schedulable"
HyperPodUnschedulable = "unschedulable"

// Define the metric types
TypeCluster = "Cluster"
TypeClusterService = "ClusterService"
@@ -179,6 +184,7 @@ const (
TypeContainerEFA = "ContainerEFA"
TypePodEFA = "PodEFA"
TypeNodeEFA = "NodeEFA"
TypeHyperPodNode = "HyperPodNode"

// unit
UnitBytes = "Bytes"
@@ -202,6 +208,13 @@ var WaitingReasonLookup = map[string]string{
"StartError": StatusContainerWaitingReasonStartError,
}

var HyperPodConditionToMetric = map[string]string{
"UnschedulablePendingReplacement": HyperPodUnschedulablePendingReplacement,
"UnschedulablePendingReboot": HyperPodUnschedulablePendingReboot,
"Schedulable": HyperPodSchedulable,
"Unschedulable": HyperPodUnschedulable,
}

var metricToUnitMap map[string]string

func init() {
@@ -330,5 +343,10 @@ func init() {
GpuUsageTotal: UnitCount,
GpuRequest: UnitCount,
GpuReservedCapacity: UnitPercent,

HyperPodUnschedulablePendingReplacement: UnitCount,
HyperPodUnschedulablePendingReboot: UnitCount,
HyperPodSchedulable: UnitCount,
HyperPodUnschedulable: UnitCount,
}
}
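
The map and unit registrations above drive the new metrics: each HyperPod health condition surfaces as a Count metric that is 1 for the node's current condition and 0 otherwise. A minimal same-package sketch (hypothetical helper, not part of this commit) of expanding a condition string into those one-hot fields:

func hyperPodFields(condition string) map[string]any {
	// Emit 1 for the node's current condition and 0 for every other
	// condition known to HyperPodConditionToMetric.
	fields := make(map[string]any, len(HyperPodConditionToMetric))
	for cond, metric := range HyperPodConditionToMetric {
		if cond == condition {
			fields[metric] = 1
		} else {
			fields[metric] = 0
		}
	}
	return fields
}

hyperPodFields("Schedulable") yields {"schedulable": 1, "unschedulable": 0, "unschedulable_pending_replacement": 0, "unschedulable_pending_reboot": 0}, matching the fixture in TestConvertToOTLPMetricsForHyperPodNodeMetrics below.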
6 changes: 5 additions & 1 deletion internal/aws/containerinsight/utils.go
@@ -58,7 +58,8 @@ func IsNode(mType string) bool {
TypeNodeEFA,
TypeNodeFS,
TypeNodeGPU,
TypeNodeNet:
TypeNodeNet,
TypeHyperPodNode:
return true
}
return false
@@ -107,6 +108,7 @@ func getPrefixByMetricType(mType string) string {
instanceNetPrefix := "instance_interface_"
nodeNetPrefix := "node_interface_"
nodeEfaPrefix := "node_efa_"
hyperPodNodeHealthStatus := "hyper_pod_node_health_status_"
podPrefix := "pod_"
podNetPrefix := "pod_interface_"
podEfaPrefix := "pod_efa_"
@@ -169,6 +171,8 @@ func getPrefixByMetricType(mType string) string {
prefix = statefulSet
case TypeClusterReplicaSet:
prefix = replicaSet
case TypeHyperPodNode:
prefix = hyperPodNodeHealthStatus
default:
log.Printf("E! Unexpected MetricType: %s", mType)
}
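getPrefixByMetricType now maps TypeHyperPodNode to the hyper_pod_node_health_status_ prefix, so each raw field name is exported under a prefixed metric name. A small same-package illustration (not in this commit):

func exampleHyperPodMetricName() string {
	// "hyper_pod_node_health_status_" + "schedulable"
	return getPrefixByMetricType(TypeHyperPodNode) + HyperPodSchedulable
	// == "hyper_pod_node_health_status_schedulable"
}
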
37 changes: 37 additions & 0 deletions internal/aws/containerinsight/utils_test.go
@@ -79,6 +79,7 @@ func TestIsNode(t *testing.T) {
assert.Equal(t, true, IsNode(TypeNodeGPU))
assert.Equal(t, true, IsNode(TypeNodeNet))
assert.Equal(t, false, IsNode(TypePod))
assert.Equal(t, true, IsNode(TypeHyperPodNode))
}

func TestIsInstance(t *testing.T) {
@@ -929,3 +930,39 @@ func TestConvertToOTLPMetricsForPodEfaMetrics(t *testing.T) {
md = ConvertToOTLPMetrics(fields, tags, zap.NewNop())
checkMetricsAreExpected(t, md, fields, tags, expectedUnits)
}

func TestConvertToOTLPMetricsForHyperPodNodeMetrics(t *testing.T) {
var fields map[string]any
var expectedUnits map[string]string
var tags map[string]string
var md pmetric.Metrics
now := time.Now()
timestamp := strconv.FormatInt(now.UnixNano(), 10)

fields = map[string]any{
"unschedulable_pending_replacement": 0,
"unschedulable_pending_reboot": 0,
"schedulable": 1,
"unschedulable": 0,
}
expectedUnits = map[string]string{
"unschedulable_pending_replacement": UnitCount,
"unschedulable_pending_reboot": UnitCount,
"schedulable": UnitCount,
"unschedulable": UnitCount,
}
tags = map[string]string{
"ClusterName": "eks-aoc",
"InstanceId": "i-01bf9fb097cbf3205",
"InstanceType": "t2.xlarge",
"Namespace": "amazon-cloudwatch",
"NodeName": "hyperpod-ip-192-168-12-170.ec2.internal",
"PodName": "cloudwatch-agent",
"ContainerName": "cloudwatch-agent",
"Type": "HyperPodNode",
"Version": "0",
"Timestamp": timestamp,
}
md = ConvertToOTLPMetrics(fields, tags, zap.NewNop())
checkMetricsAreExpected(t, md, fields, tags, expectedUnits)
}
19 changes: 16 additions & 3 deletions internal/aws/k8s/k8sclient/clientset.go
@@ -138,6 +138,17 @@ func CaptureNodeLevelInfo(captureNodeLevelInfo bool) Option {
}
}

// CaptureOnlyNodeLabelsInfo allows one to specify whether only node label
// information should be captured and retained in memory
func CaptureOnlyNodeLabelsInfo(captureOnlyNodeLabelInfo bool) Option {
return Option{
name: "captureOnlyNodeLabelInfo:" + strconv.FormatBool(captureOnlyNodeLabelInfo),
set: func(kc *K8sClient) {
kc.captureOnlyNodeLabelInfo = captureOnlyNodeLabelInfo
},
}
}

func getStringifiedOptions(options ...Option) string {
opts := make([]string, len(options))
for i, option := range options {
@@ -225,8 +236,9 @@ type K8sClient struct {
nodeMu sync.Mutex
node nodeClientWithStopper

nodeSelector fields.Selector
captureNodeLevelInfo bool
nodeSelector fields.Selector
captureNodeLevelInfo bool
captureOnlyNodeLabelInfo bool

jobMu sync.Mutex
job jobClientWithStopper
@@ -326,7 +338,8 @@ func (c *K8sClient) ShutdownPodClient() {
func (c *K8sClient) GetNodeClient() NodeClient {
c.nodeMu.Lock()
if c.node == nil {
opts := []nodeClientOption{nodeSyncCheckerOption(c.syncChecker), captureNodeLevelInfoOption(c.captureNodeLevelInfo)}
opts := []nodeClientOption{nodeSyncCheckerOption(c.syncChecker), captureNodeLevelInfoOption(c.captureNodeLevelInfo),
captureOnlyNodeLabelInfoOption(c.captureOnlyNodeLabelInfo)}
if c.nodeSelector != nil {
opts = append(opts, nodeSelectorOption(c.nodeSelector))
}
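The new option threads captureOnlyNodeLabelInfo from the clientset into the node client it constructs. A hedged usage sketch, assuming the package's Get constructor (its signature sits outside this diff, so verify before relying on it):

import (
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
)

// nodeHealthLabels enables label-only capture and reads the per-node
// HyperPod health-status labels.
func nodeHealthLabels(logger *zap.Logger) map[string]map[k8sclient.Label]int8 {
	client := k8sclient.Get(logger, k8sclient.CaptureOnlyNodeLabelsInfo(true))
	return client.GetNodeClient().NodeToLabelsMap()
}
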
95 changes: 82 additions & 13 deletions internal/aws/k8s/k8sclient/node.go
@@ -16,6 +16,13 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil"
)

const (
instanceTypeLabelKey = "node.kubernetes.io/instance-type"
instanceTypeLabelKeyBeta = "beta.kubernetes.io/instance-type"
)

// This needs to be reviewed for newer versions of k8s.
@@ -27,13 +34,15 @@ var failedNodeConditions = map[v1.NodeConditionType]bool{
}

type NodeClient interface {
NodeInfos() map[string]*NodeInfo
// Get the number of failed nodes for current cluster
ClusterFailedNodeCount() int
// Get the number of nodes for current cluster
ClusterNodeCount() int
NodeToCapacityMap() map[string]v1.ResourceList
NodeToAllocatableMap() map[string]v1.ResourceList
NodeToConditionsMap() map[string]map[v1.NodeConditionType]v1.ConditionStatus
NodeToLabelsMap() map[string]map[Label]int8
}

type nodeClientOption func(*nodeClient)
@@ -56,6 +65,12 @@ func captureNodeLevelInfoOption(captureNodeLevelInfo bool) nodeClientOption {
}
}

func captureOnlyNodeLabelInfoOption(captureOnlyNodeLabelInfo bool) nodeClientOption {
return func(n *nodeClient) {
n.captureOnlyNodeLabelInfo = captureOnlyNodeLabelInfo
}
}

type nodeClient struct {
stopChan chan struct{}
store *ObjStore
@@ -69,14 +84,26 @@ type nodeClient struct {
// The node client can be used in several places, including code paths that execute on both leader and non-leader nodes.
// But for logic on the leader node (for ex in k8sapiserver.go), there is no need to obtain node level info since only cluster
// level info is needed there. Hence, this optimization allows us to save on memory by not capturing node level info when not needed.
captureNodeLevelInfo bool
captureNodeLevelInfo bool
captureOnlyNodeLabelInfo bool

mu sync.RWMutex
nodeInfos map[string]*NodeInfo
clusterFailedNodeCount int
clusterNodeCount int
nodeToCapacityMap map[string]v1.ResourceList
nodeToAllocatableMap map[string]v1.ResourceList
nodeToConditionsMap map[string]map[v1.NodeConditionType]v1.ConditionStatus
nodeToLabelsMap map[string]map[Label]int8
}

func (c *nodeClient) NodeInfos() map[string]*NodeInfo {
if c.store.GetResetRefreshStatus() {
c.refresh()
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.nodeInfos
}

func (c *nodeClient) ClusterFailedNodeCount() int {
@@ -97,6 +124,18 @@ func (c *nodeClient) ClusterNodeCount() int {
return c.clusterNodeCount
}

func (c *nodeClient) NodeToLabelsMap() map[string]map[Label]int8 {
if !c.captureOnlyNodeLabelInfo {
c.logger.Warn("trying to access node label info when captureOnlyNodeLabelInfo is not set, will return empty data")
}
if c.store.GetResetRefreshStatus() {
c.refresh()
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.nodeToLabelsMap
}

func (c *nodeClient) NodeToCapacityMap() map[string]v1.ResourceList {
if !c.captureNodeLevelInfo {
c.logger.Warn("trying to access node level info when captureNodeLevelInfo is not set, will return empty data")
@@ -144,25 +183,37 @@ func (c *nodeClient) refresh() {
nodeToCapacityMap := make(map[string]v1.ResourceList)
nodeToAllocatableMap := make(map[string]v1.ResourceList)
nodeToConditionsMap := make(map[string]map[v1.NodeConditionType]v1.ConditionStatus)
nodeToLabelsMap := make(map[string]map[Label]int8)

nodeInfos := map[string]*NodeInfo{}

for _, obj := range objsList {
node := obj.(*nodeInfo)
node := obj.(*NodeInfo)
nodeInfos[node.Name] = node

if c.captureNodeLevelInfo {
nodeToCapacityMap[node.name] = node.capacity
nodeToAllocatableMap[node.name] = node.allocatable
nodeToCapacityMap[node.Name] = node.Capacity
nodeToAllocatableMap[node.Name] = node.Allocatable
conditionsMap := make(map[v1.NodeConditionType]v1.ConditionStatus)
for _, condition := range node.conditions {
for _, condition := range node.Conditions {
conditionsMap[condition.Type] = condition.Status
}
nodeToConditionsMap[node.name] = conditionsMap
nodeToConditionsMap[node.Name] = conditionsMap
}

if c.captureOnlyNodeLabelInfo {
labelsMap := make(map[Label]int8)
if hyperPodLabel, ok := node.Labels[SageMakerNodeHealthStatus]; ok {
labelsMap[SageMakerNodeHealthStatus] = hyperPodLabel
nodeToLabelsMap[node.Name] = labelsMap
}
}
clusterNodeCountNew++

failed := false

Loop:
for _, condition := range node.conditions {
for _, condition := range node.Conditions {
if _, ok := failedNodeConditions[condition.Type]; ok {
// match the failedNodeConditions type we care about
if condition.Status != v1.ConditionFalse {
Expand All @@ -178,11 +229,13 @@ func (c *nodeClient) refresh() {
}
}

c.nodeInfos = nodeInfos
c.clusterFailedNodeCount = clusterFailedNodeCountNew
c.clusterNodeCount = clusterNodeCountNew
c.nodeToCapacityMap = nodeToCapacityMap
c.nodeToAllocatableMap = nodeToAllocatableMap
c.nodeToConditionsMap = nodeToConditionsMap
c.nodeToLabelsMap = nodeToLabelsMap
}

func newNodeClient(clientSet kubernetes.Interface, logger *zap.Logger, options ...nodeClientOption) *nodeClient {
@@ -222,17 +275,33 @@ func transformFuncNode(obj any) (any, error) {
if !ok {
return nil, fmt.Errorf("input obj %v is not Node type", obj)
}
info := new(nodeInfo)
info.name = node.Name
info.capacity = node.Status.Capacity
info.allocatable = node.Status.Allocatable
info.conditions = []*NodeCondition{}
info := new(NodeInfo)
info.Name = node.Name
info.Capacity = node.Status.Capacity
info.Allocatable = node.Status.Allocatable
if instanceType, ok := node.Labels[instanceTypeLabelKey]; ok {
info.InstanceType = instanceType
} else {
// fallback for compatibility with k8s versions older than v1.17
// https://kubernetes.io/docs/reference/labels-annotations-taints/#beta-kubernetes-io-instance-type-deprecated
if instanceType, ok := node.Labels[instanceTypeLabelKeyBeta]; ok {
info.InstanceType = instanceType
}
}
info.Conditions = []*NodeCondition{}
for _, condition := range node.Status.Conditions {
info.conditions = append(info.conditions, &NodeCondition{
info.Conditions = append(info.Conditions, &NodeCondition{
Type: condition.Type,
Status: condition.Status,
})
}

if sageMakerHealthStatus, ok := node.Labels[SageMakerNodeHealthStatus.String()]; ok {
info.Labels = make(map[Label]int8)
if condition, ok := k8sutil.ParseString(sageMakerHealthStatus); ok {
info.Labels[SageMakerNodeHealthStatus] = condition
}
}
return info, nil
}

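With NodeInfos and NodeToLabelsMap exported, callers can aggregate HyperPod health across the cluster. A sketch against the NodeClient interface above (illustrative, not part of this commit):

// countByHealthStatus tallies nodes per health-status value captured in
// NodeToLabelsMap; keys are the int8 condition values parsed from the
// sagemaker.amazonaws.com/node-health-status label.
func countByHealthStatus(nc k8sclient.NodeClient) map[int8]int {
	counts := make(map[int8]int)
	for _, labels := range nc.NodeToLabelsMap() {
		if status, ok := labels[k8sclient.SageMakerNodeHealthStatus]; ok {
			counts[status]++
		}
	}
	return counts
}
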
22 changes: 17 additions & 5 deletions internal/aws/k8s/k8sclient/node_info.go
@@ -7,14 +7,26 @@ import (
v1 "k8s.io/api/core/v1"
)

type nodeInfo struct {
name string
conditions []*NodeCondition
capacity v1.ResourceList
allocatable v1.ResourceList
type NodeInfo struct {
Name string
Conditions []*NodeCondition
Capacity v1.ResourceList
Allocatable v1.ResourceList
InstanceType string
Labels map[Label]int8
}

type NodeCondition struct {
Type v1.NodeConditionType
Status v1.ConditionStatus
}

type Label int8

const (
SageMakerNodeHealthStatus Label = iota
)

func (ct Label) String() string {
return [...]string{"sagemaker.amazonaws.com/node-health-status"}[ct]
}
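
String indexes a positional array, so every Label constant added to the iota block needs a matching key string at the same index. For the single value defined here:

// Illustration (same package): the Label enum doubles as the Kubernetes
// label key read in transformFuncNode.
var sageMakerLabelKey = SageMakerNodeHealthStatus.String()
// == "sagemaker.amazonaws.com/node-health-status"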