From 41c5a841eb25494d134bed1ebd992246db5d5f84 Mon Sep 17 00:00:00 2001 From: payall4u Date: Sun, 21 Jan 2024 10:47:51 +0800 Subject: [PATCH] Bugfix: avoid ext resource less then allocated --- pkg/agent/agent.go | 2 +- pkg/resource/node_resource_manager.go | 58 +++++++++++++++---- ...urce_manger.go => pod_resource_manager.go} | 0 3 files changed, 49 insertions(+), 11 deletions(-) rename pkg/resource/{pod_resource_manger.go => pod_resource_manager.go} (100%) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index fa236c6a2..62ae7cdab 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -120,7 +120,7 @@ func NewAgent(ctx context.Context, if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource { tspName := agent.CreateNodeResourceTsp() - nodeResourceManager, err := resource.NewNodeResourceManager(kubeClient, nodeName, nodeResourceReserved, tspName, nodeInformer, tspInformer, stateCollector.NodeResourceChann) + nodeResourceManager, err := resource.NewNodeResourceManager(kubeClient, nodeName, nodeResourceReserved, tspName, nodeInformer, podInformer, tspInformer, stateCollector.NodeResourceChann) if err != nil { return agent, err } diff --git a/pkg/resource/node_resource_manager.go b/pkg/resource/node_resource_manager.go index 130b3a5cb..644740f72 100644 --- a/pkg/resource/node_resource_manager.go +++ b/pkg/resource/node_resource_manager.go @@ -10,6 +10,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/json" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" @@ -55,6 +56,9 @@ type NodeResourceManager struct { nodeLister corelisters.NodeLister nodeSynced cache.InformerSynced + podLister corelisters.PodLister + podSynced cache.InformerSynced + tspLister predictionlisters.TimeSeriesPredictionLister tspSynced cache.InformerSynced @@ -71,7 +75,7 @@ type NodeResourceManager struct { tspName string } -func NewNodeResourceManager(client clientset.Interface, nodeName string, nodeResourceReserved map[string]string, tspName string, nodeInformer coreinformers.NodeInformer, +func NewNodeResourceManager(client clientset.Interface, nodeName string, nodeResourceReserved map[string]string, tspName string, nodeInformer coreinformers.NodeInformer, podInformer coreinformers.PodInformer, tspInformer predictionv1.TimeSeriesPredictionInformer, stateChann chan map[string][]common.TimeSeries) (*NodeResourceManager, error) { reserveCpuPercent, err := utils.ParsePercentage(nodeResourceReserved[v1.ResourceCPU.String()]) if err != nil { @@ -92,6 +96,8 @@ func NewNodeResourceManager(client clientset.Interface, nodeName string, nodeRes client: client, nodeLister: nodeInformer.Lister(), nodeSynced: nodeInformer.Informer().HasSynced, + podLister: podInformer.Lister(), + podSynced: podInformer.Informer().HasSynced, tspLister: tspInformer.Lister(), tspSynced: tspInformer.Informer().HasSynced, recorder: recorder, @@ -117,6 +123,7 @@ func (o *NodeResourceManager) Run(stop <-chan struct{}) { stop, o.tspSynced, o.nodeSynced, + o.podSynced, ) { return } @@ -144,7 +151,11 @@ func (o *NodeResourceManager) Run(stop <-chan struct{}) { } func (o *NodeResourceManager) UpdateNodeResource() { - node := o.getNode() + node, err := o.getNode() + if err != nil { + klog.ErrorS(err, "Get node failed") + return + } if len(node.Status.Addresses) == 0 { klog.Error("Node addresses is empty") return @@ -168,13 +179,36 @@ func (o *NodeResourceManager) UpdateNodeResource() { } } -func (o *NodeResourceManager) getNode() *v1.Node { - node, err := o.nodeLister.Get(o.nodeName) +func (o *NodeResourceManager) getNode() (*v1.Node, error) { + return o.nodeLister.Get(o.nodeName) +} + +func (o *NodeResourceManager) getExtResourceAllocated(extResource string) (float64, error) { + pods, err := o.podLister.List(labels.Everything()) if err != nil { - klog.Errorf("Failed to get node: %v", err) - return nil + return 0, err + } + allocated := 0.0 + allocatedFromContainer := func(container *v1.Container) float64 { + if res, exist := container.Resources.Requests[v1.ResourceName(extResource)]; exist { + return float64(res.Value()) + } + return 0.0 } - return node + for _, pod := range pods { + if pod.Status.Phase != v1.PodRunning { + continue + } + var one = 0.0 + for _, container := range pod.Spec.Containers { + one += allocatedFromContainer(&container) + } + for _, container := range pod.Spec.Containers { + one = math.Max(one, allocatedFromContainer(&container)) + } + allocated += one + } + return allocated, nil } func (o *NodeResourceManager) FindTargetNode(tsp *predictionapi.TimeSeriesPrediction, addresses []v1.NodeAddress) (bool, error) { @@ -238,11 +272,15 @@ func (o *NodeResourceManager) BuildNodeStatus(node *v1.Node) map[string]int64 { default: continue } - if nextRecommendation < 0 { - nextRecommendation = 0 + extResourceName := fmt.Sprintf(utils.ExtResourcePrefixFormat, string(resourceName)) + extResourceAllocated, err := o.getExtResourceAllocated(extResourceName) + if err != nil { + klog.Warningf("Get allocated ext resources %s failed: %s", extResourceName, err.Error()) + } + if nextRecommendation < extResourceAllocated { + nextRecommendation = extResourceAllocated } metrics.UpdateNodeResourceRecommendedValue(metrics.SubComponentNodeResource, metrics.StepGetExtResourceRecommended, string(resourceName), resourceFrom, nextRecommendation) - extResourceName := fmt.Sprintf(utils.ExtResourcePrefixFormat, string(resourceName)) resValue, exists := node.Status.Capacity[v1.ResourceName(extResourceName)] if exists && resValue.Value() != 0 && math.Abs(float64(resValue.Value())- diff --git a/pkg/resource/pod_resource_manger.go b/pkg/resource/pod_resource_manager.go similarity index 100% rename from pkg/resource/pod_resource_manger.go rename to pkg/resource/pod_resource_manager.go