diff --git a/pkg/koordlet/statesinformer/kubelet_stub.go b/pkg/koordlet/statesinformer/kubelet_stub.go index ddf71ef45..bbbcc49b7 100644 --- a/pkg/koordlet/statesinformer/kubelet_stub.go +++ b/pkg/koordlet/statesinformer/kubelet_stub.go @@ -28,10 +28,15 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/client-go/rest" + kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" + "k8s.io/kubernetes/cmd/kubelet/app/options" + kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" + kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme" ) type KubeletStub interface { GetAllPods() (corev1.PodList, error) + GetKubeletConfiguration() (*kubeletconfiginternal.KubeletConfiguration, error) } type kubeletStub struct { @@ -89,3 +94,48 @@ func (k *kubeletStub) GetAllPods() (corev1.PodList, error) { } return podList, nil } + +type kubeletConfigz struct { + ComponentConfig kubeletconfigv1beta1.KubeletConfiguration `json:"kubeletconfig"` +} + +func (k *kubeletStub) GetKubeletConfiguration() (*kubeletconfiginternal.KubeletConfiguration, error) { + configzURL := url.URL{ + Scheme: k.scheme, + Host: net.JoinHostPort(k.addr, strconv.Itoa(k.port)), + Path: "/configz", + } + rsp, err := k.httpClient.Get(configzURL.String()) + if err != nil { + return nil, err + } + defer rsp.Body.Close() + + if rsp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("request %s failed, code %d", configzURL.String(), rsp.StatusCode) + } + + body, err := io.ReadAll(rsp.Body) + if err != nil { + return nil, err + } + + var configz kubeletConfigz + if err = json.Unmarshal(body, &configz); err != nil { + return nil, fmt.Errorf("failed to unmarshal kubeletConfigz: %v", err) + } + + kubeletConfiguration, err := options.NewKubeletConfiguration() + if err != nil { + return nil, err + } + + scheme, _, err := kubeletscheme.NewSchemeAndCodecs() + if err != nil { + return nil, err + } + if err = scheme.Convert(&configz.ComponentConfig, kubeletConfiguration, nil); err != nil { + return nil, err + } + return kubeletConfiguration, nil +} diff --git a/pkg/koordlet/statesinformer/kubelet_stub_test.go b/pkg/koordlet/statesinformer/kubelet_stub_test.go index f2740a94e..af4e6601b 100644 --- a/pkg/koordlet/statesinformer/kubelet_stub_test.go +++ b/pkg/koordlet/statesinformer/kubelet_stub_test.go @@ -34,27 +34,56 @@ import ( ) var ( - token string + token string + kubeletConfigzData = []byte(` + { + "kubeletconfig": { + "enableServer": true, + "cpuManagerPolicy": "static", + "cpuManagerReconcilePeriod": "10s", + "evictionHard": { + "imagefs.available": "15%", + "memory.available": "222Mi", + "nodefs.available": "10%", + "nodefs.inodesFree": "5%" + }, + "systemReserved": { + "cpu": "200m", + "memory": "1111Mi", + "pid": "1000" + }, + "kubeReserved": { + "cpu": "200m", + "memory": "6666Mi", + "pid": "1000" + } + } + }`, + ) ) -func mockPodsList(w http.ResponseWriter, r *http.Request) { +func validateAuth(r *http.Request) bool { bear := r.Header.Get("Authorization") if bear == "" { - w.WriteHeader(http.StatusUnauthorized) - return + return false } parts := strings.Split(bear, "Bearer") if len(parts) != 2 { - w.WriteHeader(http.StatusUnauthorized) - return + return false } - http_token := strings.TrimSpace(parts[1]) - if len(http_token) < 1 { - w.WriteHeader(http.StatusUnauthorized) - return + httpToken := strings.TrimSpace(parts[1]) + if len(httpToken) < 1 { + return false + } + if httpToken != token { + return false } - if http_token != token { + return true +} + +func mockPodsList(w http.ResponseWriter, r *http.Request) { + if !validateAuth(r) { w.WriteHeader(http.StatusUnauthorized) return } @@ -70,6 +99,16 @@ func mockPodsList(w http.ResponseWriter, r *http.Request) { w.Write(b) } +func mockGetKubeletConfiguration(w http.ResponseWriter, r *http.Request) { + if !validateAuth(r) { + w.WriteHeader(http.StatusUnauthorized) + return + } + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + w.Write(kubeletConfigzData) +} + func parseHostAndPort(rawURL string) (string, string, error) { u, err := url.Parse(rawURL) if err != nil { @@ -111,6 +150,56 @@ func Test_kubeletStub_GetAllPods(t *testing.T) { }) } +func Test_kubeletStub_GetKubeletConfiguration(t *testing.T) { + token = "token" + + server := httptest.NewTLSServer(http.HandlerFunc(mockGetKubeletConfiguration)) + defer server.Close() + + address, portStr, err := parseHostAndPort(server.URL) + if err != nil { + t.Fatal(err) + } + + port, _ := strconv.Atoi(portStr) + cfg := &rest.Config{ + Host: net.JoinHostPort(address, portStr), + BearerToken: token, + TLSClientConfig: rest.TLSClientConfig{ + Insecure: true, + }, + } + + client, err := NewKubeletStub(address, port, "https", 10*time.Second, cfg) + if err != nil { + t.Fatal(err) + } + kubeletConfiguration, err := client.GetKubeletConfiguration() + if err != nil { + t.Fatal(err) + } + assert.Equal(t, "static", kubeletConfiguration.CPUManagerPolicy) + expectedEvictionHard := map[string]string{ + "imagefs.available": "15%", + "memory.available": "222Mi", + "nodefs.available": "10%", + "nodefs.inodesFree": "5%", + } + expectedSystemReserved := map[string]string{ + "cpu": "200m", + "memory": "1111Mi", + "pid": "1000", + } + expectedKubeReserved := map[string]string{ + "cpu": "200m", + "memory": "6666Mi", + "pid": "1000", + } + assert.Equal(t, expectedEvictionHard, kubeletConfiguration.EvictionHard) + assert.Equal(t, expectedSystemReserved, kubeletConfiguration.SystemReserved) + assert.Equal(t, expectedKubeReserved, kubeletConfiguration.KubeReserved) +} + func TestNewKubeletStub(t *testing.T) { type args struct { addr string diff --git a/pkg/koordlet/statesinformer/states_noderesourcetopology.go b/pkg/koordlet/statesinformer/states_noderesourcetopology.go index 5218a6764..f0a90e842 100644 --- a/pkg/koordlet/statesinformer/states_noderesourcetopology.go +++ b/pkg/koordlet/statesinformer/states_noderesourcetopology.go @@ -43,17 +43,12 @@ import ( "github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache" "github.com/koordinator-sh/koordinator/pkg/util" kubeletutil "github.com/koordinator-sh/koordinator/pkg/util/kubelet" - "github.com/koordinator-sh/koordinator/pkg/util/system" ) const ( nodeTopoInformerName pluginName = "nodeTopoInformer" ) -var ( - getKubeletCommandlineFn = system.GetKubeletCommandline -) - type nodeTopoInformer struct { config *Config topologyClient topologyclientset.Interface @@ -63,6 +58,7 @@ type nodeTopoInformer struct { metricCache metriccache.MetricCache callbackRunner *callbackRunner + kubelet KubeletStub nodeInformer *nodeInformer podsInformer *podsInformer } @@ -103,6 +99,14 @@ func (s *nodeTopoInformer) Start(stopCh <-chan struct{}) { if !cache.WaitForCacheSync(stopCh, s.nodeInformer.HasSynced, s.podsInformer.HasSynced) { klog.Fatalf("timed out waiting for pod caches to sync") } + if s.config.NodeTopologySyncInterval <= 0 { + return + } + stub, err := newKubeletStubFromConfig(s.nodeInformer.GetNode(), s.config) + if err != nil { + klog.Fatalf("create kubelet stub, %v", err) + } + s.kubelet = stub go wait.Until(s.reportNodeTopology, s.config.NodeTopologySyncInterval, stopCh) klog.V(2).Infof("node topo informer started") } @@ -139,28 +143,21 @@ func (s *nodeTopoInformer) calcNodeTopo() (map[string]string, error) { return nil, fmt.Errorf("failed to calculate cpu topology, err: %v", err) } - kubeletPort := int(s.nodeInformer.GetNode().Status.DaemonEndpoints.KubeletEndpoint.Port) - args, err := getKubeletCommandlineFn(kubeletPort) + kubeletConfiguration, err := s.kubelet.GetKubeletConfiguration() if err != nil { - return nil, fmt.Errorf("failed to get Kubelet commandline with kubeletPort %d, err: %v", kubeletPort, err) - } - - klog.V(5).Infof("kubelet args: %v", args) - - kubeletOptions, err := kubeletutil.NewKubeletOptions(args) - if err != nil { - return nil, fmt.Errorf("failed to NewKubeletOptions, err: %v", err) + return nil, fmt.Errorf("failed to GetKubeletConfiguration, err: %v", err) } + klog.V(5).Infof("kubelet args: %v", kubeletConfiguration) // default policy is none cpuManagerPolicy := extension.KubeletCPUManagerPolicy{ - Policy: kubeletOptions.CPUManagerPolicy, - Options: kubeletOptions.CPUManagerPolicyOptions, + Policy: kubeletConfiguration.CPUManagerPolicy, + Options: kubeletConfiguration.CPUManagerPolicyOptions, } - if kubeletOptions.CPUManagerPolicy == string(cpumanager.PolicyStatic) { + if kubeletConfiguration.CPUManagerPolicy == string(cpumanager.PolicyStatic) { topology := kubeletutil.NewCPUTopology((*util.LocalCPUInfo)(nodeCPUInfo)) - reservedCPUs, err := kubeletutil.GetStaticCPUManagerPolicyReservedCPUs(topology, kubeletOptions) + reservedCPUs, err := kubeletutil.GetStaticCPUManagerPolicyReservedCPUs(topology, kubeletConfiguration) if err != nil { klog.Errorf("Failed to GetStaticCPUManagerPolicyReservedCPUs, err: %v", err) } @@ -176,8 +173,9 @@ func (s *nodeTopoInformer) calcNodeTopo() (map[string]string, error) { return nil, fmt.Errorf("failed to marshal cpu manager policy, err: %v", err) } - var podAllocsJSON []byte - stateFilePath := kubeletutil.GetCPUManagerStateFilePath(kubeletOptions.RootDirectory) + // Users can specify the kubelet RootDirectory on the host in the koordlet DaemonSet, + // but inside koordlet it is always mounted to the path /var/lib/kubelet + stateFilePath := kubeletutil.GetCPUManagerStateFilePath("/var/lib/kubelet") data, err := os.ReadFile(stateFilePath) if err != nil { if !os.IsNotExist(err) { @@ -185,6 +183,7 @@ func (s *nodeTopoInformer) calcNodeTopo() (map[string]string, error) { } } // TODO: report lse/lsr pod from cgroup + var podAllocsJSON []byte if len(data) > 0 { podAllocs, err := s.calGuaranteedCpu(sharedPoolCPUs, string(data)) if err != nil { diff --git a/pkg/koordlet/statesinformer/states_noderesourcetopology_test.go b/pkg/koordlet/statesinformer/states_noderesourcetopology_test.go index 9e39a3c56..57a9adcc4 100644 --- a/pkg/koordlet/statesinformer/states_noderesourcetopology_test.go +++ b/pkg/koordlet/statesinformer/states_noderesourcetopology_test.go @@ -19,8 +19,6 @@ package statesinformer import ( "context" "encoding/json" - "fmt" - "os" "testing" "github.com/golang/mock/gomock" @@ -30,7 +28,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/pkg/features" @@ -249,11 +247,6 @@ func Test_reportNodeTopology(t *testing.T) { assert.NoError(t, err) }() - oldFn := getKubeletCommandlineFn - defer func() { - getKubeletCommandlineFn = oldFn - }() - client := topologyclientsetfake.NewSimpleClientset() testNode := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -322,12 +315,14 @@ func Test_reportNodeTopology(t *testing.T) { node: testNode, }, callbackRunner: NewCallbackRunner(), - } - - getKubeletCommandlineFn = func(port int) ([]string, error) { - tempDir := os.TempDir() - args := []string{"/usr/bin/kubelet", fmt.Sprintf("--root-dir=%s", tempDir), "--bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf", "--kubeconfig=/etc/kubernetes/kubelet.conf", "--container-log-max-files", "10", "--container-log-max-size=100Mi", "--max-pods", "213", "--pod-max-pids", "16384", "--pod-manifest-path=/etc/kubernetes/manifests", "--network-plugin=cni", "--cni-conf-dir=/etc/cni/net.d", "--cni-bin-dir=/opt/cni/bin", "--v=3", "--enable-controller-attach-detach=true", "--cluster-dns=192.168.0.10", "--pod-infra-container-image=registry-vpc.cn-hangzhou.aliyuncs.com/acs/pause:3.5", "--enable-load-reader", "--cluster-domain=cluster.local", "--cloud-provider=external", "--hostname-override=cn-hangzhou.10.0.4.18", "--provider-id=cn-hangzhou.i-bp1049apy5ggvw0qbuh6", "--authorization-mode=Webhook", "--authentication-token-webhook=true", "--anonymous-auth=false", "--client-ca-file=/etc/kubernetes/pki/ca.crt", "--cgroup-driver=systemd", "--tls-cipher-suites=TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_128_GCM_SHA256", "--tls-cert-file=/var/lib/kubelet/pki/kubelet.crt", "--tls-private-key-file=/var/lib/kubelet/pki/kubelet.key", "--rotate-certificates=true", "--cert-dir=/var/lib/kubelet/pki", "--node-labels=alibabacloud.com/nodepool-id=npab2a7b3f6ce84f5aacc55b08df6b8ecd,ack.aliyun.com=c5558876cbc06429782797388d4abe3e0", "--eviction-hard=imagefs.available<15%,memory.available<300Mi,nodefs.available<10%,nodefs.inodesFree<5%", "--system-reserved=cpu=200m,memory=2732Mi", "--kube-reserved=cpu=1800m,memory=2732Mi", "--kube-reserved=pid=1000", "--system-reserved=pid=1000", "--cpu-manager-policy=static", "--container-runtime=remote", "--container-runtime-endpoint=/var/run/containerd/containerd.sock"} - return args, nil + kubelet: &testKubeletStub{ + config: &kubeletconfiginternal.KubeletConfiguration{ + CPUManagerPolicy: "static", + KubeReserved: map[string]string{ + "cpu": "2000m", + }, + }, + }, } // reporting enabled @@ -337,7 +332,7 @@ func Test_reportNodeTopology(t *testing.T) { assert.Equal(t, nil, err) expectKubeletCPUManagerPolicy := extension.KubeletCPUManagerPolicy{ - Policy: string(cpumanager.PolicyStatic), + Policy: "static", ReservedCPUs: "0-1", } var kubeletCPUManagerPolicy extension.KubeletCPUManagerPolicy diff --git a/pkg/koordlet/statesinformer/states_pods_test.go b/pkg/koordlet/statesinformer/states_pods_test.go index a10056534..14ececf60 100644 --- a/pkg/koordlet/statesinformer/states_pods_test.go +++ b/pkg/koordlet/statesinformer/states_pods_test.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" "github.com/koordinator-sh/koordinator/pkg/util/system" ) @@ -90,13 +91,18 @@ func Test_genPodCgroupParentDirWithCgroupfsDriver(t *testing.T) { } type testKubeletStub struct { - pods corev1.PodList + pods corev1.PodList + config *kubeletconfiginternal.KubeletConfiguration } func (t *testKubeletStub) GetAllPods() (corev1.PodList, error) { return t.pods, nil } +func (t *testKubeletStub) GetKubeletConfiguration() (*kubeletconfiginternal.KubeletConfiguration, error) { + return t.config, nil +} + type testErrorKubeletStub struct { } @@ -104,6 +110,10 @@ func (t *testErrorKubeletStub) GetAllPods() (corev1.PodList, error) { return corev1.PodList{}, errors.New("test error") } +func (t *testErrorKubeletStub) GetKubeletConfiguration() (*kubeletconfiginternal.KubeletConfiguration, error) { + return nil, errors.New("test error") +} + func Test_statesInformer_syncPods(t *testing.T) { stopCh := make(chan struct{}, 1) defer close(stopCh) diff --git a/pkg/util/kubelet/kubelet.go b/pkg/util/kubelet/kubelet.go index 081a0d69e..3b68d12f2 100644 --- a/pkg/util/kubelet/kubelet.go +++ b/pkg/util/kubelet/kubelet.go @@ -19,144 +19,24 @@ package kubelet import ( "fmt" - "io" "math" "path/filepath" "strconv" - "github.com/spf13/pflag" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - cliflag "k8s.io/component-base/cli/flag" "k8s.io/klog/v2" - "k8s.io/kubernetes/cmd/kubelet/app/options" kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/eviction" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" - "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles" "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit" - utilfs "k8s.io/kubernetes/pkg/util/filesystem" "github.com/koordinator-sh/koordinator/pkg/util" ) -type KubeletOptions struct { - *options.KubeletFlags - *kubeletconfiginternal.KubeletConfiguration -} - -func NewKubeletOptions(args []string) (*KubeletOptions, error) { - kubeletConfig, err := options.NewKubeletConfiguration() - if err != nil { - return nil, err - } - - cleanFlagSet := pflag.NewFlagSet("kubelet", pflag.ContinueOnError) - cleanFlagSet.ParseErrorsWhitelist.UnknownFlags = true - cleanFlagSet.SetOutput(io.Discard) - cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc) - kubeletFlags := options.NewKubeletFlags() - kubeletFlags.AddFlags(cleanFlagSet) - options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig) - - err = cleanFlagSet.Parse(args) - if err != nil { - return nil, err - } - - // NOTE: options.ValidateKubeletFlags is not compatible with kubelet v1.18, now skip validation - // validate the initial KubeletFlags - // if err := options.ValidateKubeletFlags(kubeletFlags); err != nil { - // return nil, fmt.Errorf("failed to validate kubelet flags: %w", err) - // } - - // load kubelet config file, if provided - if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 { - kubeletConfig, err = loadConfigFile(configFile) - if err != nil { - return nil, fmt.Errorf("failed to load kubelet config file, error: %w, path: %s", err, configFile) - } - // We must enforce flag precedence by re-parsing the command line into the new object. - // This is necessary to preserve backwards-compatibility across binary upgrades. - // See issue #56171 for more details. - if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil { - return nil, fmt.Errorf("failed to precedence kubeletConfigFlag: %w", err) - } - } - - return &KubeletOptions{ - KubeletFlags: kubeletFlags, - KubeletConfiguration: kubeletConfig, - }, nil -} - -// newFlagSetWithGlobals constructs a new pflag.FlagSet with global flags registered -// on it. -func newFlagSetWithGlobals() *pflag.FlagSet { - fs := pflag.NewFlagSet("", pflag.ExitOnError) - // set the normalize func, similar to k8s.io/component-base/cli//flags.go:InitFlags - fs.SetNormalizeFunc(cliflag.WordSepNormalizeFunc) - // explicitly add flags from libs that register global flags - options.AddGlobalFlags(fs) - return fs -} - -// newFakeFlagSet constructs a pflag.FlagSet with the same flags as fs, but where -// all values have noop Set implementations -func newFakeFlagSet(fs *pflag.FlagSet) *pflag.FlagSet { - ret := pflag.NewFlagSet("", pflag.ExitOnError) - ret.SetNormalizeFunc(fs.GetNormalizeFunc()) - fs.VisitAll(func(f *pflag.Flag) { - ret.VarP(cliflag.NoOp{}, f.Name, f.Shorthand, f.Usage) - }) - return ret -} - -// kubeletConfigFlagPrecedence re-parses flags over the KubeletConfiguration object. -// We must enforce flag precedence by re-parsing the command line into the new object. -// This is necessary to preserve backwards-compatibility across binary upgrades. -// See issue #56171 for more details. -func kubeletConfigFlagPrecedence(kc *kubeletconfiginternal.KubeletConfiguration, args []string) error { - // We use a throwaway kubeletFlags and a fake global flagset to avoid double-parses, - // as some Set implementations accumulate values from multiple flag invocations. - fs := newFakeFlagSet(newFlagSetWithGlobals()) - fs.ParseErrorsWhitelist.UnknownFlags = true - fs.SetOutput(io.Discard) - // register throwaway KubeletFlags - options.NewKubeletFlags().AddFlags(fs) - // register new KubeletConfiguration - options.AddKubeletConfigFlags(fs, kc) - // Remember original feature gates, so we can merge with flag gates later - original := kc.FeatureGates - // re-parse flags - if err := fs.Parse(args); err != nil { - return err - } - // Add back feature gates that were set in the original kc, but not in flags - for k, v := range original { - if _, ok := kc.FeatureGates[k]; !ok { - kc.FeatureGates[k] = v - } - } - return nil -} - -func loadConfigFile(kubeletConfigFile string) (*kubeletconfiginternal.KubeletConfiguration, error) { - const errFmt = "failed to load Kubelet config file %s, error %v" - loader, err := configfiles.NewFsLoader(&utilfs.DefaultFs{}, kubeletConfigFile) - if err != nil { - return nil, fmt.Errorf(errFmt, kubeletConfigFile, err) - } - kc, err := loader.Load() - if err != nil { - return nil, fmt.Errorf(errFmt, kubeletConfigFile, err) - } - return kc, err -} - func NewCPUTopology(cpuInfo *util.LocalCPUInfo) *topology.CPUTopology { cpuTopology := &topology.CPUTopology{ NumCPUs: int(cpuInfo.TotalInfo.NumberCPUs), @@ -174,18 +54,18 @@ func NewCPUTopology(cpuInfo *util.LocalCPUInfo) *topology.CPUTopology { return cpuTopology } -func GetStaticCPUManagerPolicyReservedCPUs(topology *topology.CPUTopology, kubeletOptions *KubeletOptions) (cpuset.CPUSet, error) { - if kubeletOptions.CPUManagerPolicy != string(cpumanager.PolicyStatic) { +func GetStaticCPUManagerPolicyReservedCPUs(topology *topology.CPUTopology, kubeletConfiguration *kubeletconfiginternal.KubeletConfiguration) (cpuset.CPUSet, error) { + if kubeletConfiguration.CPUManagerPolicy != string(cpumanager.PolicyStatic) { return cpuset.CPUSet{}, nil } - reservedCPUs, kubeReserved, systemReserved, err := GetKubeletReservedOptions(kubeletOptions, topology) + reservedCPUs, kubeReserved, systemReserved, err := GetKubeletReservedOptions(kubeletConfiguration, topology) if err != nil { return cpuset.CPUSet{}, err } nodeAllocatableReservation, err := GetNodeAllocatableReservation(topology.NumCPUs, 0, - kubeletOptions.EvictionHard, systemReserved, kubeReserved, kubeletOptions.ExperimentalNodeAllocatableIgnoreEvictionThreshold) + kubeletConfiguration.EvictionHard, systemReserved, kubeReserved, false) if err != nil { return cpuset.CPUSet{}, err } @@ -238,29 +118,29 @@ func getNumReservedCPUs(nodeAllocatableReservation corev1.ResourceList) (int, er return numReservedCPUs, nil } -func GetKubeletReservedOptions(kubeletOptions *KubeletOptions, topology *topology.CPUTopology) (reservedSystemCPUs cpuset.CPUSet, kubeReserved, systemReserved corev1.ResourceList, err error) { - reservedSystemCPUs, err = getReservedCPUs(topology, kubeletOptions.ReservedSystemCPUs) +func GetKubeletReservedOptions(kubeletConfiguration *kubeletconfiginternal.KubeletConfiguration, topology *topology.CPUTopology) (reservedSystemCPUs cpuset.CPUSet, kubeReserved, systemReserved corev1.ResourceList, err error) { + reservedSystemCPUs, err = getReservedCPUs(topology, kubeletConfiguration.ReservedSystemCPUs) if err != nil { return } if reservedSystemCPUs.Size() > 0 { // at cmd option validation phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok - klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReservedCPUs", kubeletOptions.KubeReserved, "systemReservedCPUs", kubeletOptions.SystemReserved) - if kubeletOptions.KubeReserved != nil { - delete(kubeletOptions.KubeReserved, "cpu") + klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReservedCPUs", kubeletConfiguration.KubeReserved, "systemReservedCPUs", kubeletConfiguration.SystemReserved) + if kubeletConfiguration.KubeReserved != nil { + delete(kubeletConfiguration.KubeReserved, "cpu") } - if kubeletOptions.SystemReserved == nil { - kubeletOptions.SystemReserved = make(map[string]string) + if kubeletConfiguration.SystemReserved == nil { + kubeletConfiguration.SystemReserved = make(map[string]string) } - kubeletOptions.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size()) - klog.InfoS("After cpu setting is overwritten", "kubeReservedCPUs", kubeletOptions.KubeReserved, "systemReservedCPUs", kubeletOptions.SystemReserved) + kubeletConfiguration.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size()) + klog.InfoS("After cpu setting is overwritten", "kubeReservedCPUs", kubeletConfiguration.KubeReserved, "systemReservedCPUs", kubeletConfiguration.SystemReserved) } - kubeReserved, err = parseResourceList(kubeletOptions.KubeReserved) + kubeReserved, err = parseResourceList(kubeletConfiguration.KubeReserved) if err != nil { return } - systemReserved, err = parseResourceList(kubeletOptions.SystemReserved) + systemReserved, err = parseResourceList(kubeletConfiguration.SystemReserved) if err != nil { return } diff --git a/pkg/util/kubelet/kubelet_test.go b/pkg/util/kubelet/kubelet_test.go index 6c965ea73..c35c6b8bf 100644 --- a/pkg/util/kubelet/kubelet_test.go +++ b/pkg/util/kubelet/kubelet_test.go @@ -17,103 +17,56 @@ limitations under the License. package kubelet import ( - "fmt" - "os" "testing" "github.com/stretchr/testify/assert" - "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" -) - -var ( - kubeletArgsWithoutCPUManagerPolicy = []string{"/usr/bin/kubelet", "--bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf", "--kubeconfig=/etc/kubernetes/kubelet.conf", "--container-log-max-files", "10", "--container-log-max-size=100Mi", "--max-pods", "213", "--pod-max-pids", "16384", "--pod-manifest-path=/etc/kubernetes/manifests", "--network-plugin=cni", "--cni-conf-dir=/etc/cni/net.d", "--cni-bin-dir=/opt/cni/bin", "--v=3", "--enable-controller-attach-detach=true", "--cluster-dns=192.168.0.10", "--pod-infra-container-image=registry-vpc.cn-hangzhou.aliyuncs.com/acs/pause:3.5", "--enable-load-reader", "--cluster-domain=cluster.local", "--cloud-provider=external", "--hostname-override=cn-hangzhou.10.0.4.18", "--provider-id=cn-hangzhou.i-bp1049apy5ggvw0qbuh6", "--authorization-mode=Webhook", "--authentication-token-webhook=true", "--anonymous-auth=false", "--client-ca-file=/etc/kubernetes/pki/ca.crt", "--cgroup-driver=systemd", "--tls-cipher-suites=TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_128_GCM_SHA256", "--tls-cert-file=/var/lib/kubelet/pki/kubelet.crt", "--tls-private-key-file=/var/lib/kubelet/pki/kubelet.key", "--rotate-certificates=true", "--cert-dir=/var/lib/kubelet/pki", "--node-labels=alibabacloud.com/nodepool-id=npab2a7b3f6ce84f5aacc55b08df6b8ecd,ack.aliyun.com=c5558876cbc06429782797388d4abe3e0", "--eviction-hard=imagefs.available<15%,memory.available<300Mi,nodefs.available<10%,nodefs.inodesFree<5%", "--system-reserved=cpu=200m,memory=2732Mi", "--kube-reserved=cpu=1800m,memory=2732Mi", "--kube-reserved=pid=1000", "--system-reserved=pid=1000", "--container-runtime=remote", "--container-runtime-endpoint=/var/run/containerd/containerd.sock"} - kubeletArgsWithNoneCPUManagerPolicy = []string{"/usr/bin/kubelet", "--bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf", "--kubeconfig=/etc/kubernetes/kubelet.conf", "--container-log-max-files", "10", "--container-log-max-size=100Mi", "--max-pods", "213", "--pod-max-pids", "16384", "--pod-manifest-path=/etc/kubernetes/manifests", "--network-plugin=cni", "--cni-conf-dir=/etc/cni/net.d", "--cni-bin-dir=/opt/cni/bin", "--v=3", "--enable-controller-attach-detach=true", "--cluster-dns=192.168.0.10", "--pod-infra-container-image=registry-vpc.cn-hangzhou.aliyuncs.com/acs/pause:3.5", "--enable-load-reader", "--cluster-domain=cluster.local", "--cloud-provider=external", "--hostname-override=cn-hangzhou.10.0.4.18", "--provider-id=cn-hangzhou.i-bp1049apy5ggvw0qbuh6", "--authorization-mode=Webhook", "--authentication-token-webhook=true", "--anonymous-auth=false", "--client-ca-file=/etc/kubernetes/pki/ca.crt", "--cgroup-driver=systemd", "--tls-cipher-suites=TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_128_GCM_SHA256", "--tls-cert-file=/var/lib/kubelet/pki/kubelet.crt", "--tls-private-key-file=/var/lib/kubelet/pki/kubelet.key", "--rotate-certificates=true", "--cert-dir=/var/lib/kubelet/pki", "--node-labels=alibabacloud.com/nodepool-id=npab2a7b3f6ce84f5aacc55b08df6b8ecd,ack.aliyun.com=c5558876cbc06429782797388d4abe3e0", "--eviction-hard=imagefs.available<15%,memory.available<300Mi,nodefs.available<10%,nodefs.inodesFree<5%", "--system-reserved=cpu=200m,memory=2732Mi", "--kube-reserved=cpu=1800m,memory=2732Mi", "--kube-reserved=pid=1000", "--system-reserved=pid=1000", "--cpu-manager-policy=none", "--container-runtime=remote", "--container-runtime-endpoint=/var/run/containerd/containerd.sock"} - kubeletArgsWithStaticCPUManagerPolicy = []string{"/usr/bin/kubelet", "--bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf", "--kubeconfig=/etc/kubernetes/kubelet.conf", "--container-log-max-files", "10", "--container-log-max-size=100Mi", "--max-pods", "213", "--pod-max-pids", "16384", "--pod-manifest-path=/etc/kubernetes/manifests", "--network-plugin=cni", "--cni-conf-dir=/etc/cni/net.d", "--cni-bin-dir=/opt/cni/bin", "--v=3", "--enable-controller-attach-detach=true", "--cluster-dns=192.168.0.10", "--pod-infra-container-image=registry-vpc.cn-hangzhou.aliyuncs.com/acs/pause:3.5", "--enable-load-reader", "--cluster-domain=cluster.local", "--cloud-provider=external", "--hostname-override=cn-hangzhou.10.0.4.18", "--provider-id=cn-hangzhou.i-bp1049apy5ggvw0qbuh6", "--authorization-mode=Webhook", "--authentication-token-webhook=true", "--anonymous-auth=false", "--client-ca-file=/etc/kubernetes/pki/ca.crt", "--cgroup-driver=systemd", "--tls-cipher-suites=TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_128_GCM_SHA256", "--tls-cert-file=/var/lib/kubelet/pki/kubelet.crt", "--tls-private-key-file=/var/lib/kubelet/pki/kubelet.key", "--rotate-certificates=true", "--cert-dir=/var/lib/kubelet/pki", "--node-labels=alibabacloud.com/nodepool-id=npab2a7b3f6ce84f5aacc55b08df6b8ecd,ack.aliyun.com=c5558876cbc06429782797388d4abe3e0", "--eviction-hard=imagefs.available<15%,memory.available<300Mi,nodefs.available<10%,nodefs.inodesFree<5%", "--system-reserved=cpu=200m,memory=2732Mi", "--kube-reserved=cpu=1800m,memory=2732Mi", "--kube-reserved=pid=1000", "--system-reserved=pid=1000", "--cpu-manager-policy=static", "--container-runtime=remote", "--container-runtime-endpoint=/var/run/containerd/containerd.sock"} - kubeletArgsWithStaticCPUManagerPolicyAndReservedCPUs = []string{"/usr/bin/kubelet", "--bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf", "--kubeconfig=/etc/kubernetes/kubelet.conf", "--container-log-max-files", "10", "--container-log-max-size=100Mi", "--max-pods", "213", "--pod-max-pids", "16384", "--pod-manifest-path=/etc/kubernetes/manifests", "--network-plugin=cni", "--cni-conf-dir=/etc/cni/net.d", "--cni-bin-dir=/opt/cni/bin", "--v=3", "--enable-controller-attach-detach=true", "--cluster-dns=192.168.0.10", "--pod-infra-container-image=registry-vpc.cn-hangzhou.aliyuncs.com/acs/pause:3.5", "--enable-load-reader", "--cluster-domain=cluster.local", "--cloud-provider=external", "--hostname-override=cn-hangzhou.10.0.4.18", "--provider-id=cn-hangzhou.i-bp1049apy5ggvw0qbuh6", "--authorization-mode=Webhook", "--authentication-token-webhook=true", "--anonymous-auth=false", "--client-ca-file=/etc/kubernetes/pki/ca.crt", "--cgroup-driver=systemd", "--tls-cipher-suites=TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_128_GCM_SHA256", "--tls-cert-file=/var/lib/kubelet/pki/kubelet.crt", "--tls-private-key-file=/var/lib/kubelet/pki/kubelet.key", "--rotate-certificates=true", "--cert-dir=/var/lib/kubelet/pki", "--node-labels=alibabacloud.com/nodepool-id=npab2a7b3f6ce84f5aacc55b08df6b8ecd,ack.aliyun.com=c5558876cbc06429782797388d4abe3e0", "--eviction-hard=imagefs.available<15%,memory.available<300Mi,nodefs.available<10%,nodefs.inodesFree<5%", "--system-reserved=cpu=200m,memory=2732Mi", "--kube-reserved=cpu=1800m,memory=2732Mi", "--kube-reserved=pid=1000", "--system-reserved=pid=1000", "--cpu-manager-policy=static", "--container-runtime=remote", "--container-runtime-endpoint=/var/run/containerd/containerd.sock", "--reserved-cpus=1,2,3,4"} + kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" ) func TestGetStaticCPUManagerPolicyReservedCPUs(t *testing.T) { tests := []struct { name string - args []string + config *kubeletconfiginternal.KubeletConfiguration wantReservedCPUs string }{ { - name: "none policy", - args: kubeletArgsWithNoneCPUManagerPolicy, + name: "none policy", + config: &kubeletconfiginternal.KubeletConfiguration{ + CPUManagerPolicy: "none", + }, wantReservedCPUs: "", }, { - name: "static policy", - args: kubeletArgsWithStaticCPUManagerPolicy, + name: "static policy", + config: &kubeletconfiginternal.KubeletConfiguration{ + CPUManagerPolicy: "static", + EvictionHard: map[string]string{ + "imagefs.available": "15%", "memory.available": "300Mi", + }, + SystemReserved: map[string]string{ + "cpu": "200m", "memory": "2732Mi", + }, + KubeReserved: map[string]string{ + "cpu": "1800m", "memory": "2732Mi", + }, + }, wantReservedCPUs: "0,6", }, { - name: "static policy with specified reserved cpus", - args: kubeletArgsWithStaticCPUManagerPolicyAndReservedCPUs, + name: "static policy with specified reserved cpus", + config: &kubeletconfiginternal.KubeletConfiguration{ + CPUManagerPolicy: "static", + ReservedSystemCPUs: "1-4", + }, wantReservedCPUs: "1-4", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - options, err := NewKubeletOptions(tt.args) - assert.NoError(t, err) - assert.NotNil(t, options) - - reservedCPUS, err := GetStaticCPUManagerPolicyReservedCPUs(topoDualSocketHT, options) + reservedCPUS, err := GetStaticCPUManagerPolicyReservedCPUs(topoDualSocketHT, tt.config) assert.NoError(t, err) assert.Equal(t, tt.wantReservedCPUs, reservedCPUS.String()) }) } } - -func TestKubeletConfigWithFile(t *testing.T) { - staticPolicyFile := `kind: KubeletConfiguration -apiVersion: kubelet.config.k8s.io/v1beta1 -cpuManagerPolicy: static` - tests := []struct { - name string - args []string - configContent string - wantPolicy string - }{ - { - name: "use file config", - args: kubeletArgsWithoutCPUManagerPolicy, - configContent: staticPolicyFile, - wantPolicy: string(cpumanager.PolicyStatic), - }, - { - name: "override config file by flags", - args: kubeletArgsWithNoneCPUManagerPolicy, - configContent: staticPolicyFile, - wantPolicy: string(cpumanager.PolicyNone), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tempFile, err := os.CreateTemp("", "koordlet-ut-") - assert.NoError(t, err) - assert.NotNil(t, tempFile) - fileName := tempFile.Name() - defer func() { - tempFile.Close() - os.Remove(fileName) - }() - - _, err = tempFile.WriteString(tt.configContent) - assert.NoError(t, err) - - args := make([]string, len(tt.args)) - copy(args, tt.args) - args = append(args, fmt.Sprintf("--config=%s", fileName)) - - options, err := NewKubeletOptions(args) - assert.NoError(t, err) - assert.NotNil(t, options) - assert.Equal(t, tt.wantPolicy, options.CPUManagerPolicy) - }) - } -}