Skip to content

Commit

Permalink
koordlet: query kubelet config via HTTP API (#776)
Browse files Browse the repository at this point in the history
Signed-off-by: Joseph <[email protected]>
  • Loading branch information
eahydra authored Nov 2, 2022
1 parent bf60609 commit 2d5506c
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 255 deletions.
50 changes: 50 additions & 0 deletions pkg/koordlet/statesinformer/kubelet_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
"k8s.io/kubernetes/cmd/kubelet/app/options"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
)

// KubeletStub is the client-side view of the kubelet used by the states
// informer. It is implemented in this file by kubeletStub.
type KubeletStub interface {
// GetAllPods returns a PodList, or an error if it could not be obtained.
GetAllPods() (corev1.PodList, error)
// GetKubeletConfiguration returns the kubelet's configuration as the internal
// KubeletConfiguration type. The kubeletStub implementation obtains it from
// the kubelet's /configz HTTP endpoint.
GetKubeletConfiguration() (*kubeletconfiginternal.KubeletConfiguration, error)
}

type kubeletStub struct {
Expand Down Expand Up @@ -89,3 +94,48 @@ func (k *kubeletStub) GetAllPods() (corev1.PodList, error) {
}
return podList, nil
}

// kubeletConfigz is the JSON envelope decoded from the kubelet's /configz
// response by GetKubeletConfiguration.
type kubeletConfigz struct {
// ComponentConfig is the v1beta1 kubelet configuration, carried under the
// "kubeletconfig" key of the response document.
ComponentConfig kubeletconfigv1beta1.KubeletConfiguration `json:"kubeletconfig"`
}

// GetKubeletConfiguration queries the kubelet's /configz endpoint and returns
// the reported configuration converted to the internal KubeletConfiguration
// type.
//
// The request is sent with k.httpClient to k.scheme://k.addr:k.port/configz.
// The response must have status 200 and a JSON body carrying the v1beta1
// configuration under the "kubeletconfig" key. The decoded v1beta1 object is
// converted onto the object returned by options.NewKubeletConfiguration using
// the kubelet scheme.
//
// Every failure is wrapped with %w together with the step that failed, so
// callers can still match the underlying cause with errors.Is / errors.As.
func (k *kubeletStub) GetKubeletConfiguration() (*kubeletconfiginternal.KubeletConfiguration, error) {
	configzURL := url.URL{
		Scheme: k.scheme,
		Host:   net.JoinHostPort(k.addr, strconv.Itoa(k.port)),
		Path:   "/configz",
	}
	endpoint := configzURL.String()

	rsp, err := k.httpClient.Get(endpoint)
	if err != nil {
		// The client's error already names the method and URL.
		return nil, fmt.Errorf("query kubelet configz: %w", err)
	}
	defer rsp.Body.Close()

	if rsp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("request %s failed, code %d", endpoint, rsp.StatusCode)
	}

	body, err := io.ReadAll(rsp.Body)
	if err != nil {
		return nil, fmt.Errorf("read response of %s: %w", endpoint, err)
	}

	var configz kubeletConfigz
	if err = json.Unmarshal(body, &configz); err != nil {
		return nil, fmt.Errorf("failed to unmarshal kubeletConfigz: %w", err)
	}

	// Conversion target; fields present in the response overwrite whatever
	// NewKubeletConfiguration pre-populated.
	kubeletConfiguration, err := options.NewKubeletConfiguration()
	if err != nil {
		return nil, fmt.Errorf("create kubelet configuration: %w", err)
	}

	scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
	if err != nil {
		return nil, fmt.Errorf("create kubelet scheme: %w", err)
	}
	if err = scheme.Convert(&configz.ComponentConfig, kubeletConfiguration, nil); err != nil {
		return nil, fmt.Errorf("convert v1beta1 kubelet configuration to internal type: %w", err)
	}
	return kubeletConfiguration, nil
}
111 changes: 100 additions & 11 deletions pkg/koordlet/statesinformer/kubelet_stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,56 @@ import (
)

var (
token string
token string
kubeletConfigzData = []byte(`
{
"kubeletconfig": {
"enableServer": true,
"cpuManagerPolicy": "static",
"cpuManagerReconcilePeriod": "10s",
"evictionHard": {
"imagefs.available": "15%",
"memory.available": "222Mi",
"nodefs.available": "10%",
"nodefs.inodesFree": "5%"
},
"systemReserved": {
"cpu": "200m",
"memory": "1111Mi",
"pid": "1000"
},
"kubeReserved": {
"cpu": "200m",
"memory": "6666Mi",
"pid": "1000"
}
}
}`,
)
)

func mockPodsList(w http.ResponseWriter, r *http.Request) {
func validateAuth(r *http.Request) bool {
bear := r.Header.Get("Authorization")
if bear == "" {
w.WriteHeader(http.StatusUnauthorized)
return
return false
}
parts := strings.Split(bear, "Bearer")
if len(parts) != 2 {
w.WriteHeader(http.StatusUnauthorized)
return
return false
}

http_token := strings.TrimSpace(parts[1])
if len(http_token) < 1 {
w.WriteHeader(http.StatusUnauthorized)
return
httpToken := strings.TrimSpace(parts[1])
if len(httpToken) < 1 {
return false
}
if httpToken != token {
return false
}
if http_token != token {
return true
}

func mockPodsList(w http.ResponseWriter, r *http.Request) {
if !validateAuth(r) {
w.WriteHeader(http.StatusUnauthorized)
return
}
Expand All @@ -70,6 +99,16 @@ func mockPodsList(w http.ResponseWriter, r *http.Request) {
w.Write(b)
}

// mockGetKubeletConfiguration is a test HTTP handler standing in for the
// kubelet's /configz endpoint. Requests that fail validateAuth get a 401 with
// no body; all others get a 200 JSON response whose body is
// kubeletConfigzData.
func mockGetKubeletConfiguration(w http.ResponseWriter, r *http.Request) {
	if !validateAuth(r) {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}
	// Headers must be set before WriteHeader: changes to the header map made
	// after the status line has been written are not sent to the client.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write(kubeletConfigzData)
}

func parseHostAndPort(rawURL string) (string, string, error) {
u, err := url.Parse(rawURL)
if err != nil {
Expand Down Expand Up @@ -111,6 +150,56 @@ func Test_kubeletStub_GetAllPods(t *testing.T) {
})
}

// Test_kubeletStub_GetKubeletConfiguration starts a TLS test server backed by
// mockGetKubeletConfiguration, points a kubelet stub at it using a bearer
// token, and checks that the CPU manager policy, eviction thresholds and
// reserved resources from kubeletConfigzData appear in the returned
// configuration.
func Test_kubeletStub_GetKubeletConfiguration(t *testing.T) {
	token = "token"

	server := httptest.NewTLSServer(http.HandlerFunc(mockGetKubeletConfiguration))
	defer server.Close()

	address, portStr, err := parseHostAndPort(server.URL)
	if err != nil {
		t.Fatal(err)
	}

	// Fail loudly on a malformed port instead of silently dialing port 0.
	port, err := strconv.Atoi(portStr)
	if err != nil {
		t.Fatalf("invalid port %q: %v", portStr, err)
	}
	cfg := &rest.Config{
		Host:        net.JoinHostPort(address, portStr),
		BearerToken: token,
		TLSClientConfig: rest.TLSClientConfig{
			// The httptest server uses a self-signed certificate.
			Insecure: true,
		},
	}

	client, err := NewKubeletStub(address, port, "https", 10*time.Second, cfg)
	if err != nil {
		t.Fatal(err)
	}
	kubeletConfiguration, err := client.GetKubeletConfiguration()
	if err != nil {
		t.Fatal(err)
	}
	assert.Equal(t, "static", kubeletConfiguration.CPUManagerPolicy)
	expectedEvictionHard := map[string]string{
		"imagefs.available": "15%",
		"memory.available":  "222Mi",
		"nodefs.available":  "10%",
		"nodefs.inodesFree": "5%",
	}
	expectedSystemReserved := map[string]string{
		"cpu":    "200m",
		"memory": "1111Mi",
		"pid":    "1000",
	}
	expectedKubeReserved := map[string]string{
		"cpu":    "200m",
		"memory": "6666Mi",
		"pid":    "1000",
	}
	assert.Equal(t, expectedEvictionHard, kubeletConfiguration.EvictionHard)
	assert.Equal(t, expectedSystemReserved, kubeletConfiguration.SystemReserved)
	assert.Equal(t, expectedKubeReserved, kubeletConfiguration.KubeReserved)
}

func TestNewKubeletStub(t *testing.T) {
type args struct {
addr string
Expand Down
41 changes: 20 additions & 21 deletions pkg/koordlet/statesinformer/states_noderesourcetopology.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,12 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
"github.com/koordinator-sh/koordinator/pkg/util"
kubeletutil "github.com/koordinator-sh/koordinator/pkg/util/kubelet"
"github.com/koordinator-sh/koordinator/pkg/util/system"
)

const (
nodeTopoInformerName pluginName = "nodeTopoInformer"
)

var (
getKubeletCommandlineFn = system.GetKubeletCommandline
)

type nodeTopoInformer struct {
config *Config
topologyClient topologyclientset.Interface
Expand All @@ -63,6 +58,7 @@ type nodeTopoInformer struct {
metricCache metriccache.MetricCache
callbackRunner *callbackRunner

kubelet KubeletStub
nodeInformer *nodeInformer
podsInformer *podsInformer
}
Expand Down Expand Up @@ -103,6 +99,14 @@ func (s *nodeTopoInformer) Start(stopCh <-chan struct{}) {
if !cache.WaitForCacheSync(stopCh, s.nodeInformer.HasSynced, s.podsInformer.HasSynced) {
klog.Fatalf("timed out waiting for pod caches to sync")
}
if s.config.NodeTopologySyncInterval <= 0 {
return
}
stub, err := newKubeletStubFromConfig(s.nodeInformer.GetNode(), s.config)
if err != nil {
klog.Fatalf("create kubelet stub, %v", err)
}
s.kubelet = stub
go wait.Until(s.reportNodeTopology, s.config.NodeTopologySyncInterval, stopCh)
klog.V(2).Infof("node topo informer started")
}
Expand Down Expand Up @@ -139,28 +143,21 @@ func (s *nodeTopoInformer) calcNodeTopo() (map[string]string, error) {
return nil, fmt.Errorf("failed to calculate cpu topology, err: %v", err)
}

kubeletPort := int(s.nodeInformer.GetNode().Status.DaemonEndpoints.KubeletEndpoint.Port)
args, err := getKubeletCommandlineFn(kubeletPort)
kubeletConfiguration, err := s.kubelet.GetKubeletConfiguration()
if err != nil {
return nil, fmt.Errorf("failed to get Kubelet commandline with kubeletPort %d, err: %v", kubeletPort, err)
}

klog.V(5).Infof("kubelet args: %v", args)

kubeletOptions, err := kubeletutil.NewKubeletOptions(args)
if err != nil {
return nil, fmt.Errorf("failed to NewKubeletOptions, err: %v", err)
return nil, fmt.Errorf("failed to GetKubeletConfiguration, err: %v", err)
}
klog.V(5).Infof("kubelet args: %v", kubeletConfiguration)

// default policy is none
cpuManagerPolicy := extension.KubeletCPUManagerPolicy{
Policy: kubeletOptions.CPUManagerPolicy,
Options: kubeletOptions.CPUManagerPolicyOptions,
Policy: kubeletConfiguration.CPUManagerPolicy,
Options: kubeletConfiguration.CPUManagerPolicyOptions,
}

if kubeletOptions.CPUManagerPolicy == string(cpumanager.PolicyStatic) {
if kubeletConfiguration.CPUManagerPolicy == string(cpumanager.PolicyStatic) {
topology := kubeletutil.NewCPUTopology((*util.LocalCPUInfo)(nodeCPUInfo))
reservedCPUs, err := kubeletutil.GetStaticCPUManagerPolicyReservedCPUs(topology, kubeletOptions)
reservedCPUs, err := kubeletutil.GetStaticCPUManagerPolicyReservedCPUs(topology, kubeletConfiguration)
if err != nil {
klog.Errorf("Failed to GetStaticCPUManagerPolicyReservedCPUs, err: %v", err)
}
Expand All @@ -176,15 +173,17 @@ func (s *nodeTopoInformer) calcNodeTopo() (map[string]string, error) {
return nil, fmt.Errorf("failed to marshal cpu manager policy, err: %v", err)
}

var podAllocsJSON []byte
stateFilePath := kubeletutil.GetCPUManagerStateFilePath(kubeletOptions.RootDirectory)
// Users can specify the kubelet RootDirectory on the host in the koordlet DaemonSet,
// but inside koordlet it is always mounted to the path /var/lib/kubelet
stateFilePath := kubeletutil.GetCPUManagerStateFilePath("/var/lib/kubelet")
data, err := os.ReadFile(stateFilePath)
if err != nil {
if !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to read state file, err: %v", err)
}
}
// TODO: report lse/lsr pod from cgroup
var podAllocsJSON []byte
if len(data) > 0 {
podAllocs, err := s.calGuaranteedCpu(sharedPoolCPUs, string(data))
if err != nil {
Expand Down
25 changes: 10 additions & 15 deletions pkg/koordlet/statesinformer/states_noderesourcetopology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package statesinformer
import (
"context"
"encoding/json"
"fmt"
"os"
"testing"

"github.com/golang/mock/gomock"
Expand All @@ -30,7 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"

"github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/features"
Expand Down Expand Up @@ -249,11 +247,6 @@ func Test_reportNodeTopology(t *testing.T) {
assert.NoError(t, err)
}()

oldFn := getKubeletCommandlineFn
defer func() {
getKubeletCommandlineFn = oldFn
}()

client := topologyclientsetfake.NewSimpleClientset()
testNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -322,12 +315,14 @@ func Test_reportNodeTopology(t *testing.T) {
node: testNode,
},
callbackRunner: NewCallbackRunner(),
}

getKubeletCommandlineFn = func(port int) ([]string, error) {
tempDir := os.TempDir()
args := []string{"/usr/bin/kubelet", fmt.Sprintf("--root-dir=%s", tempDir), "--bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf", "--kubeconfig=/etc/kubernetes/kubelet.conf", "--container-log-max-files", "10", "--container-log-max-size=100Mi", "--max-pods", "213", "--pod-max-pids", "16384", "--pod-manifest-path=/etc/kubernetes/manifests", "--network-plugin=cni", "--cni-conf-dir=/etc/cni/net.d", "--cni-bin-dir=/opt/cni/bin", "--v=3", "--enable-controller-attach-detach=true", "--cluster-dns=192.168.0.10", "--pod-infra-container-image=registry-vpc.cn-hangzhou.aliyuncs.com/acs/pause:3.5", "--enable-load-reader", "--cluster-domain=cluster.local", "--cloud-provider=external", "--hostname-override=cn-hangzhou.10.0.4.18", "--provider-id=cn-hangzhou.i-bp1049apy5ggvw0qbuh6", "--authorization-mode=Webhook", "--authentication-token-webhook=true", "--anonymous-auth=false", "--client-ca-file=/etc/kubernetes/pki/ca.crt", "--cgroup-driver=systemd", "--tls-cipher-suites=TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_128_GCM_SHA256", "--tls-cert-file=/var/lib/kubelet/pki/kubelet.crt", "--tls-private-key-file=/var/lib/kubelet/pki/kubelet.key", "--rotate-certificates=true", "--cert-dir=/var/lib/kubelet/pki", "--node-labels=alibabacloud.com/nodepool-id=npab2a7b3f6ce84f5aacc55b08df6b8ecd,ack.aliyun.com=c5558876cbc06429782797388d4abe3e0", "--eviction-hard=imagefs.available<15%,memory.available<300Mi,nodefs.available<10%,nodefs.inodesFree<5%", "--system-reserved=cpu=200m,memory=2732Mi", "--kube-reserved=cpu=1800m,memory=2732Mi", "--kube-reserved=pid=1000", "--system-reserved=pid=1000", "--cpu-manager-policy=static", "--container-runtime=remote", "--container-runtime-endpoint=/var/run/containerd/containerd.sock"}
return args, nil
kubelet: &testKubeletStub{
config: &kubeletconfiginternal.KubeletConfiguration{
CPUManagerPolicy: "static",
KubeReserved: map[string]string{
"cpu": "2000m",
},
},
},
}

// reporting enabled
Expand All @@ -337,7 +332,7 @@ func Test_reportNodeTopology(t *testing.T) {
assert.Equal(t, nil, err)

expectKubeletCPUManagerPolicy := extension.KubeletCPUManagerPolicy{
Policy: string(cpumanager.PolicyStatic),
Policy: "static",
ReservedCPUs: "0-1",
}
var kubeletCPUManagerPolicy extension.KubeletCPUManagerPolicy
Expand Down
Loading

0 comments on commit 2d5506c

Please sign in to comment.