
Commit

DONOTSUBMIT
towca committed Nov 20, 2024
1 parent a01276e commit 0a00f6b
Showing 72 changed files with 2,472 additions and 2,148 deletions.
11 changes: 5 additions & 6 deletions cluster-autoscaler/context/autoscaling_context.go
@@ -27,7 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
@@ -44,9 +44,8 @@ type AutoscalingContext struct {
AutoscalingKubeClients
// CloudProvider used in CA.
CloudProvider cloudprovider.CloudProvider
// TODO(kgolab) - move away too as it's not config
// PredicateChecker to check if a pod can fit into a node.
PredicateChecker predicatechecker.PredicateChecker
// FrameworkHandle can be used to interact with the scheduler framework.
FrameworkHandle *framework.Handle
// ClusterSnapshot denotes cluster snapshot used for predicate checking.
ClusterSnapshot clustersnapshot.ClusterSnapshot
// ExpanderStrategy is the strategy used to choose which node group to expand when scaling up
@@ -100,7 +99,7 @@ func NewResourceLimiterFromAutoscalingOptions(options config.AutoscalingOptions)
// NewAutoscalingContext returns an autoscaling context from all the necessary parameters passed via arguments
func NewAutoscalingContext(
options config.AutoscalingOptions,
predicateChecker predicatechecker.PredicateChecker,
fwHandle *framework.Handle,
clusterSnapshot clustersnapshot.ClusterSnapshot,
autoscalingKubeClients *AutoscalingKubeClients,
cloudProvider cloudprovider.CloudProvider,
@@ -114,7 +113,7 @@ func NewAutoscalingContext(
AutoscalingOptions: options,
CloudProvider: cloudProvider,
AutoscalingKubeClients: *autoscalingKubeClients,
PredicateChecker: predicateChecker,
FrameworkHandle: fwHandle,
ClusterSnapshot: clusterSnapshot,
ExpanderStrategy: expanderStrategy,
ProcessorCallbacks: processorCallbacks,
17 changes: 13 additions & 4 deletions cluster-autoscaler/core/autoscaler.go
@@ -33,9 +33,11 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/base"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/client-go/informers"
@@ -49,7 +51,7 @@ type AutoscalerOptions struct {
InformerFactory informers.SharedInformerFactory
AutoscalingKubeClients *context.AutoscalingKubeClients
CloudProvider cloudprovider.CloudProvider
PredicateChecker predicatechecker.PredicateChecker
FrameworkHandle *framework.Handle
ClusterSnapshot clustersnapshot.ClusterSnapshot
ExpanderStrategy expander.Strategy
EstimatorBuilder estimator.EstimatorBuilder
@@ -86,7 +88,7 @@ func NewAutoscaler(opts AutoscalerOptions, informerFactory informers.SharedInfor
}
return NewStaticAutoscaler(
opts.AutoscalingOptions,
opts.PredicateChecker,
opts.FrameworkHandle,
opts.ClusterSnapshot,
opts.AutoscalingKubeClients,
opts.Processors,
@@ -114,8 +116,15 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
if opts.AutoscalingKubeClients == nil {
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
}
if opts.FrameworkHandle == nil {
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig)
if err != nil {
return err
}
opts.FrameworkHandle = fwHandle
}
if opts.ClusterSnapshot == nil {
opts.ClusterSnapshot = clustersnapshot.NewBasicClusterSnapshot()
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(base.NewBasicSnapshotBase(), opts.FrameworkHandle)
}
if opts.RemainingPdbTracker == nil {
opts.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
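The two defaulting blocks added above are order-dependent: the scheduler-framework handle has to be constructed before the default ClusterSnapshot, because the predicate-checking snapshot is layered on top of it. A minimal sketch of that layering, using only the constructors visible in this diff (the helper function and package are hypothetical):

```go
package example

import (
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/base"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// newDefaultPredicateSnapshot is a hypothetical helper, not part of this commit:
// it restates the wiring from initializeDefaultOptions. The snapshot combines a
// plain in-memory snapshot base with the scheduler-framework handle, which is
// why FrameworkHandle must be defaulted before ClusterSnapshot.
func newDefaultPredicateSnapshot(fwHandle *framework.Handle) clustersnapshot.ClusterSnapshot {
	return predicate.NewPredicateSnapshot(base.NewBasicSnapshotBase(), fwHandle)
}
```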
@@ -26,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
@@ -194,7 +195,7 @@ func TestCurrentlyDrainedNodesPodListProcessor(t *testing.T) {
name: "single node, non-recreatable pods filtered out",
drainedNodes: []string{"n"},
nodes: []*apiv1.Node{
BuildTestNode("n", 1000, 10),
BuildTestNode("n", 2000, 10),
},
pods: []*apiv1.Pod{
BuildScheduledTestPod("p1", 100, 1, "n"),
@@ -229,11 +230,11 @@ func TestCurrentlyDrainedNodesPodListProcessor(t *testing.T) {
name: "everything works together",
drainedNodes: []string{"n1", "n3", "n5"},
nodes: []*apiv1.Node{
BuildTestNode("n1", 1000, 10),
BuildTestNode("n2", 1000, 10),
BuildTestNode("n3", 1000, 10),
BuildTestNode("n4", 1000, 10),
BuildTestNode("n5", 1000, 10),
BuildTestNode("n1", 3000, 10),
BuildTestNode("n2", 3000, 10),
BuildTestNode("n3", 3000, 10),
BuildTestNode("n4", 3000, 10),
BuildTestNode("n5", 3000, 10),
},
pods: []*apiv1.Pod{
BuildScheduledTestPod("p1", 100, 1, "n1"),
@@ -267,7 +268,7 @@ func TestCurrentlyDrainedNodesPodListProcessor(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx := context.AutoscalingContext{
ScaleDownActuator: &mockActuator{&mockActuationStatus{tc.drainedNodes}},
ClusterSnapshot: clustersnapshot.NewBasicClusterSnapshot(),
ClusterSnapshot: testsnapshot.NewTestSnapshotOrDie(t),
}
clustersnapshot.InitializeClusterSnapshotOrDie(t, ctx.ClusterSnapshot, tc.nodes, tc.pods)

@@ -23,7 +23,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

type filterOutExpendable struct {
@@ -56,7 +56,8 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods
// CA logic from before migration to scheduler framework. So let's keep it for now
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
// TODO(DRA): Figure out if/how to use the predicate-checking SchedulePod() here instead - otherwise this doesn't work with DRA pods.
if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
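For context on the new ForceAddPod call: a pod waiting for preemption already carries a nominated node in its status, so the processor injects it into the snapshot at that node without re-running scheduler predicates; per the TODO, that is presumably why this path does not yet cover DRA pods. A hedged sketch of the pattern follows; the helper and the empty-name guard are illustrative additions, while the ForceAddPod signature is as used above:

```go
package example

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
)

// addPreemptingPods is a hypothetical helper mirroring the loop above: each pod
// waiting for preemption is force-added to the snapshot on its nominated node,
// bypassing predicate checks.
func addPreemptingPods(snapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod) error {
	for _, p := range pods {
		if p.Status.NominatedNodeName == "" {
			// Nothing to place yet; the scheduler has not nominated a node.
			continue
		}
		if err := snapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
			return err
		}
	}
	return nil
}
```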
@@ -21,10 +21,11 @@ import (
"testing"

"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
@@ -108,8 +109,9 @@ func TestFilterOutExpendable(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
processor := NewFilterOutExpendablePodListProcessor()
snapshot := clustersnapshot.NewBasicClusterSnapshot()
snapshot.AddNodes(tc.nodes)
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
err := snapshot.SetClusterState(tc.nodes, nil)
assert.NoError(t, err)

pods, err := processor.Process(&context.AutoscalingContext{
ClusterSnapshot: snapshot,
@@ -26,7 +26,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
klog "k8s.io/klog/v2"
@@ -38,9 +37,9 @@ type filterOutSchedulablePodListProcessor struct {
}

// NewFilterOutSchedulablePodListProcessor creates a PodListProcessor filtering out schedulable pods
func NewFilterOutSchedulablePodListProcessor(predicateChecker predicatechecker.PredicateChecker, nodeFilter func(*framework.NodeInfo) bool) *filterOutSchedulablePodListProcessor {
func NewFilterOutSchedulablePodListProcessor(nodeFilter func(*framework.NodeInfo) bool) *filterOutSchedulablePodListProcessor {
return &filterOutSchedulablePodListProcessor{
schedulingSimulator: scheduling.NewHintingSimulator(predicateChecker),
schedulingSimulator: scheduling.NewHintingSimulator(),
nodeFilter: nodeFilter,
}
}
@@ -22,18 +22,17 @@ import (
"time"

"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/base"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics"
)

func TestFilterOutSchedulable(t *testing.T) {
schedulermetrics.Register()

node := buildReadyTestNode("node", 2000, 100)
matchesAllNodes := func(*framework.NodeInfo) bool { return true }
matchesNoNodes := func(*framework.NodeInfo) bool { return false }
@@ -175,29 +174,23 @@ func TestFilterOutSchedulable(t *testing.T) {

for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(t, err)
clusterSnapshot := testsnapshot.NewTestSnapshotOrDie(t)

var allExpectedScheduledPods []*apiv1.Pod
allExpectedScheduledPods = append(allExpectedScheduledPods, tc.expectedScheduledPods...)

for node, pods := range tc.nodesWithPods {
err := clusterSnapshot.AddNode(node)
assert.NoError(t, err)

for _, pod := range pods {
pod.Spec.NodeName = node.Name
err = clusterSnapshot.AddPod(pod, node.Name)
assert.NoError(t, err)

allExpectedScheduledPods = append(allExpectedScheduledPods, pod)
}
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...))
assert.NoError(t, err)
}

clusterSnapshot.Fork()

processor := NewFilterOutSchedulablePodListProcessor(predicateChecker, tc.nodeFilter)
processor := NewFilterOutSchedulablePodListProcessor(tc.nodeFilter)
unschedulablePods, err := processor.filterOutSchedulableByPacking(tc.unschedulableCandidates, clusterSnapshot)

assert.NoError(t, err)
@@ -256,8 +249,12 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
},
}
snapshots := map[string]func() clustersnapshot.ClusterSnapshot{
"basic": func() clustersnapshot.ClusterSnapshot { return clustersnapshot.NewBasicClusterSnapshot() },
"delta": func() clustersnapshot.ClusterSnapshot { return clustersnapshot.NewDeltaClusterSnapshot() },
"basic": func() clustersnapshot.ClusterSnapshot {
return testsnapshot.NewCustomTestSnapshotOrDie(b, base.NewBasicSnapshotBase())
},
"delta": func() clustersnapshot.ClusterSnapshot {
return testsnapshot.NewCustomTestSnapshotOrDie(b, base.NewDeltaSnapshotBase())
},
}
for snapshotName, snapshotFactory := range snapshots {
for _, tc := range tests {
@@ -282,23 +279,15 @@
}
}

predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(b, err)

clusterSnapshot := snapshotFactory()
if err := clusterSnapshot.AddNodes(nodes); err != nil {
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods); err != nil {
assert.NoError(b, err)
}

for _, pod := range scheduledPods {
if err := clusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
assert.NoError(b, err)
}
}
b.ResetTimer()

for i := 0; i < b.N; i++ {
processor := NewFilterOutSchedulablePodListProcessor(predicateChecker, scheduling.ScheduleAnywhere)
processor := NewFilterOutSchedulablePodListProcessor(scheduling.ScheduleAnywhere)
if stillPending, err := processor.filterOutSchedulableByPacking(pendingPods, clusterSnapshot); err != nil {
assert.NoError(b, err)
} else if len(stillPending) < tc.pendingPods {
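The test changes above replace the old AddNode/AddPod loops with two population styles: AddNodeInfo with a test NodeInfo that bundles a node together with its pods, and SetClusterState for loading the whole cluster in one call. A sketch of the per-node style, assuming the signatures exactly as they are used in this diff (the helper itself is hypothetical):

```go
package example

import (
	"testing"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// populateTestSnapshot is a hypothetical test helper: each node goes into the
// snapshot as one NodeInfo together with its scheduled pods, replacing the
// separate AddNode and AddPod calls used before this commit.
func populateTestSnapshot(t *testing.T, nodes []*apiv1.Node, podsByNode map[string][]*apiv1.Pod) clustersnapshot.ClusterSnapshot {
	snapshot := testsnapshot.NewTestSnapshotOrDie(t)
	for _, node := range nodes {
		if err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, podsByNode[node.Name]...)); err != nil {
			t.Fatalf("AddNodeInfo(%q): %v", node.Name, err)
		}
	}
	return snapshot
}
```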
@@ -19,17 +19,16 @@ package podlistprocessor
import (
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
)

// NewDefaultPodListProcessor returns a default implementation of the pod list
// processor, which wraps and sequentially runs other sub-processors.
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker, nodeFilter func(*framework.NodeInfo) bool) *pods.CombinedPodListProcessor {
func NewDefaultPodListProcessor(nodeFilter func(*framework.NodeInfo) bool) *pods.CombinedPodListProcessor {
return pods.NewCombinedPodListProcessor([]pods.PodListProcessor{
NewClearTPURequestsPodListProcessor(),
NewFilterOutExpendablePodListProcessor(),
NewCurrentlyDrainedNodesPodListProcessor(),
NewFilterOutSchedulablePodListProcessor(predicateChecker, nodeFilter),
NewFilterOutSchedulablePodListProcessor(nodeFilter),
NewFilterOutDaemonSetPodListProcessor(),
})
}
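With the predicate checker gone from the constructor, a call site only supplies the node filter. A hypothetical example that admits every node (not part of this commit):

```go
package podlistprocessor

import (
	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// newProcessorForAllNodes is a hypothetical call site: the filter-out-schedulable
// step will consider every node currently in the snapshot.
func newProcessorForAllNodes() *pods.CombinedPodListProcessor {
	allNodes := func(*framework.NodeInfo) bool { return true }
	return NewDefaultPodListProcessor(allNodes)
}
```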
23 changes: 6 additions & 17 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -33,6 +33,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/base"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
@@ -356,8 +358,7 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
}

func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
knownNodes := make(map[string]bool)
snapshot := clustersnapshot.NewBasicClusterSnapshot()
snapshot := predicate.NewPredicateSnapshot(base.NewBasicSnapshotBase(), a.ctx.FrameworkHandle)
pods, err := a.ctx.AllPodLister().List()
if err != nil {
return nil, err
@@ -366,22 +367,10 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
scheduledPods := kube_util.ScheduledPods(pods)
nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)

for _, node := range nodes {
if err := snapshot.AddNode(node); err != nil {
return nil, err
}

knownNodes[node.Name] = true
}

for _, pod := range nonExpendableScheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
return nil, err
}
}
err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods)
if err != nil {
return nil, err
}

return snapshot, nil
}

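Condensed, the new createSnapshot comes down to two steps: build a predicate-checking snapshot from the context's framework handle, then load nodes and non-expendable scheduled pods with a single SetClusterState call (the removed code added nodes and pods one by one and had to track known nodes itself). A sketch under the signatures visible in this diff; the helper name and package are hypothetical:

```go
package example

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/base"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// buildScaleDownSnapshot is a hypothetical condensation of createSnapshot above:
// the snapshot state is set in one call rather than via per-node and per-pod adds.
func buildScaleDownSnapshot(fwHandle *framework.Handle, nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) (clustersnapshot.ClusterSnapshot, error) {
	snapshot := predicate.NewPredicateSnapshot(base.NewBasicSnapshotBase(), fwHandle)
	if err := snapshot.SetClusterState(nodes, scheduledPods); err != nil {
		return nil, err
	}
	return snapshot, nil
}
```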