Skip to content

Commit

Permalink
Upgrade to cluster-autoscaler 1.29 - take two
Browse files Browse the repository at this point in the history
  • Loading branch information
edude03 committed Nov 19, 2024
1 parent 3fab71f commit 726f4f1
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 123 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# NOTE: This must match CA's builder/Dockerfile:
# https://github.com/kubernetes/autoscaler/blob/<GIT_TAG>/builder/Dockerfile
FROM golang:1.20.12 AS builder
FROM golang:1.21.6 AS builder

WORKDIR /workspace

Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/ca.branch
Original file line number Diff line number Diff line change
@@ -1 +1 @@
cluster-autoscaler-release-1.28
cluster-autoscaler-release-1.29
2 changes: 1 addition & 1 deletion cluster-autoscaler/ca.commit
Original file line number Diff line number Diff line change
@@ -1 +1 @@
10a229ac17ea8049248d1c3ce2923b94a4f9085c
d4bbc686ac02a77a6ad1362fe7bbda387e8f074a
169 changes: 49 additions & 120 deletions cluster-autoscaler/ca.patch
Original file line number Diff line number Diff line change
@@ -1,137 +1,66 @@
diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go
--- a/cluster-autoscaler/utils/kubernetes/listers.go (revision 6a3cf228856724a44fb66f0e2274ebf75e89e6e9)
+++ b/cluster-autoscaler/utils/kubernetes/listers.go (date 1730896304084)
@@ -17,6 +17,11 @@
index b9be94b6e..403644b71 100644
--- a/cluster-autoscaler/utils/kubernetes/listers.go
+++ b/cluster-autoscaler/utils/kubernetes/listers.go
@@ -17,10 +17,12 @@ limitations under the License.
package kubernetes

import (
+ "encoding/json"
+ "k8s.io/apimachinery/pkg/api/resource"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
"time"

appsv1 "k8s.io/api/apps/v1"
@@ -169,6 +174,7 @@
selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" +
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
+ podListWatch = wrapListWatchWithNeonVMUsage(podListWatch)
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
podLister := v1lister.NewPodLister(store)
go reflector.Run(stopchannel)
@@ -212,6 +218,7 @@
selector := fields.ParseSelectorOrDie("status.phase!=" +
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
+ podListWatch = wrapListWatchWithNeonVMUsage(podListWatch)
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
podLister := v1lister.NewPodLister(store)
go reflector.Run(stopchannel)
@@ -221,6 +228,105 @@
}

apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
+ "k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
@@ -46,6 +48,11 @@ type ListerRegistry interface {
StatefulSetLister() v1appslister.StatefulSetLister
}

+// copied from github.com/neondatabase/autoscaling, neonvm/apis/neonvm/v1/virtualmachine_types.go.
+//
+// this is duplicated so we're not *also* managing an additional dependency.
+type virtualMachineUsage struct {
+ CPU resource.Quantity `json:"cpu"`
+ Memory resource.Quantity `json:"memory"`
+}
+
+func wrapListWatchWithNeonVMUsage(lw *cache.ListWatch) *cache.ListWatch {
+ updatePodRequestsFromNeonVMAnnotation := func(pod *apiv1.Pod) {
+ annotation, ok := pod.Annotations["vm.neon.tech/usage"]
+ if !ok {
+ return
+ }
+
+ var usage virtualMachineUsage
+ if err := json.Unmarshal([]byte(annotation), &usage); err != nil {
+ return
+ }
+
+ pod.Spec.Containers[0].Resources.Requests = apiv1.ResourceList(map[apiv1.ResourceName]resource.Quantity{
+ apiv1.ResourceCPU: usage.CPU,
+ apiv1.ResourceMemory: usage.Memory,
+ })
type listerRegistryImpl struct {
allNodeLister NodeLister
readyNodeLister NodeLister
@@ -221,6 +228,22 @@ type AllPodLister struct {
podLister v1lister.PodLister
}

+func updatePodRequestsFromNeonVMAnnotation(pod *apiv1.Pod) {
+ annotation, ok := pod.Annotations["vm.neon.tech/usage"]
+ if !ok {
+ return
+ }
+
+ return &cache.ListWatch{
+ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+ obj, err := lw.List(options)
+ if err != nil {
+ return obj, err
+ }
+
+ list := obj.(*apiv1.PodList)
+ for i := range list.Items {
+ updatePodRequestsFromNeonVMAnnotation(&list.Items[i])
+ }
+ return obj, nil
+ },
+ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+ iface, err := lw.Watch(options)
+ if err != nil {
+ return iface, err
+ }
+
+ // Wrap the channel to update the pods as they come through
+ wrappedEvents := make(chan watch.Event)
+ proxyIface := watch.NewProxyWatcher(wrappedEvents)
+
+ go func() {
+ events := iface.ResultChan()
+
+ for {
+ var ok bool
+ var ev watch.Event
+
+ select {
+ case <-proxyIface.StopChan():
+ return
+ case ev, ok = <-events:
+ if !ok {
+ close(wrappedEvents)
+ return
+ }
+ }
+
+ // Quoting the docs on watch.Event.Object:
+ //
+ // > Object is:
+ // > * If Type is Added or Modified: the new state of the object
+ // > * If type is Deleted: the state of the object immediately before deletion.
+ // > * If Type is Bookmark: the object [ ... ] where only ResourceVersion field
+ // > is set.
+ // > * If Type is Error: *api.Status is recommended; other types may make sense
+ // > depending on context.
+ //
+ // So basically, we want to process the object only if ev.Type is Added,
+ // Modified, or Deleted.
+ if ev.Type == watch.Added || ev.Type == watch.Modified || ev.Type == watch.Deleted {
+ pod := ev.Object.(*apiv1.Pod)
+ updatePodRequestsFromNeonVMAnnotation(pod)
+ }
+
+ // Pass along the maybe-updated event
+ select {
+ case <-proxyIface.StopChan():
+ return
+ case wrappedEvents <- ev:
+ // continue on to next event
+ }
+ }
+ }()
+
+ return proxyIface, nil
+ },
+ DisableChunking: lw.DisableChunking,
+ var usage virtualMachineUsage
+ if err := json.Unmarshal([]byte(annotation), &usage); err != nil {
+ return
+ }
+ pod.Spec.Containers[0].Resources.Requests = apiv1.ResourceList(map[apiv1.ResourceName]resource.Quantity{
+ apiv1.ResourceCPU: usage.CPU,
+ apiv1.ResourceMemory: usage.Memory,
+ })
+}
+
// NodeLister lists nodes.
type NodeLister interface {
List() ([]*apiv1.Node, error)
// List returns all scheduled pods.
func (lister *AllPodLister) List() ([]*apiv1.Pod, error) {
var pods []*apiv1.Pod
@@ -229,9 +252,12 @@ func (lister *AllPodLister) List() ([]*apiv1.Pod, error) {
if err != nil {
return pods, err
}
+
for _, p := range allPods {
if p.Status.Phase != apiv1.PodSucceeded && p.Status.Phase != apiv1.PodFailed {
- pods = append(pods, p)
+ podCopy := p.DeepCopy()
+ updatePodRequestsFromNeonVMAnnotation(podCopy)
+ pods = append(pods, podCopy)
}
}
return pods, nil

0 comments on commit 726f4f1

Please sign in to comment.