From e6d24984d7da9bce9f53624a5b07032c10eb07d6 Mon Sep 17 00:00:00 2001 From: LambdaHJ <huangjin_henan@outlook.com> Date: Wed, 8 Jun 2022 16:27:55 +0800 Subject: [PATCH] feat: add kubelet http2 support (#180) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 黄金 <heikkihuang@kugou.net> Co-authored-by: 黄金 <heikkihuang@kugou.net> --- pkg/koordlet/statesinformer/config.go | 27 +-- pkg/koordlet/statesinformer/config_test.go | 100 ++++++++++ pkg/koordlet/statesinformer/kubelet_stub.go | 66 +++++-- .../statesinformer/kubelet_stub_test.go | 172 ++++++++++++++++++ .../statesinformer/states_informer.go | 50 ++++- .../statesinformer/states_informer_test.go | 136 +++++++++++++- pkg/koordlet/statesinformer/states_pod.go | 12 +- pkg/util/http.go | 55 ------ pkg/util/node.go | 22 +++ pkg/util/node_test.go | 100 +++++++++- 10 files changed, 645 insertions(+), 95 deletions(-) create mode 100644 pkg/koordlet/statesinformer/config_test.go create mode 100644 pkg/koordlet/statesinformer/kubelet_stub_test.go delete mode 100644 pkg/util/http.go diff --git a/pkg/koordlet/statesinformer/config.go b/pkg/koordlet/statesinformer/config.go index 24ab8e173..2a6c593dd 100644 --- a/pkg/koordlet/statesinformer/config.go +++ b/pkg/koordlet/statesinformer/config.go @@ -16,27 +16,28 @@ limitations under the License. package statesinformer -import "flag" +import ( + "flag" + + corev1 "k8s.io/api/core/v1" +) type Config struct { - KubeletIPAddr string - KubeletHTTPPort int - KubeletSyncIntervalSeconds int - KubeletSyncTimeoutSeconds int + KubeletPreferredAddressType string + KubeletSyncIntervalSeconds int + KubeletSyncTimeoutSeconds int } func NewDefaultConfig() *Config { return &Config{ - KubeletIPAddr: "localhost", - KubeletHTTPPort: 10255, - KubeletSyncIntervalSeconds: 1, - KubeletSyncTimeoutSeconds: 3, + KubeletPreferredAddressType: string(corev1.NodeInternalIP), + KubeletSyncIntervalSeconds: 30, + KubeletSyncTimeoutSeconds: 3, } } func (c *Config) InitFlags(fs *flag.FlagSet) { - fs.StringVar(&c.KubeletIPAddr, "KubeletIPAddr", c.KubeletIPAddr, "Kubelet IP Address.") - fs.IntVar(&c.KubeletHTTPPort, "KubeletHTTPPort", c.KubeletHTTPPort, "Kubelet HTTP httpPort.") - fs.IntVar(&c.KubeletSyncIntervalSeconds, "KubeletSyncIntervalSeconds", c.KubeletSyncIntervalSeconds, "Kubelet sync interval by seconds.") - fs.IntVar(&c.KubeletSyncTimeoutSeconds, "KubeletSyncTimeoutSeconds", c.KubeletSyncTimeoutSeconds, "Kubelet sync timeout by seconds.") + fs.StringVar(&c.KubeletPreferredAddressType, "KubeletPreferredAddressType", c.KubeletPreferredAddressType, "The node address types to use when determining which address to use to connect to a particular node.") + fs.IntVar(&c.KubeletSyncIntervalSeconds, "KubeletSyncIntervalSeconds", c.KubeletSyncIntervalSeconds, "The interval at which Koordlet will retain datas from Kubelet.") + fs.IntVar(&c.KubeletSyncTimeoutSeconds, "KubeletSyncTimeoutSeconds", c.KubeletSyncTimeoutSeconds, "The length of time to wait before giving up on a single request to Kubelet.") } diff --git a/pkg/koordlet/statesinformer/config_test.go b/pkg/koordlet/statesinformer/config_test.go new file mode 100644 index 000000000..95e8792d6 --- /dev/null +++ b/pkg/koordlet/statesinformer/config_test.go @@ -0,0 +1,100 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statesinformer + +import ( + "flag" + "reflect" + "testing" + + corev1 "k8s.io/api/core/v1" +) + +func TestNewDefaultConfig(t *testing.T) { + tests := []struct { + name string + want *Config + }{ + { + name: "config", + want: &Config{ + KubeletPreferredAddressType: string(corev1.NodeInternalIP), + KubeletSyncIntervalSeconds: 30, + KubeletSyncTimeoutSeconds: 3, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := NewDefaultConfig(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewDefaultConfig() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestConfig_InitFlags(t *testing.T) { + cmdArgs := []string{ + "", + "--KubeletPreferredAddressType=Hostname", + "--KubeletSyncIntervalSeconds=10", + "--KubeletSyncTimeoutSeconds=30", + } + fs := flag.NewFlagSet(cmdArgs[0], flag.ExitOnError) + + type fields struct { + KubeletPreferredAddressType string + KubeletSyncIntervalSeconds int + KubeletSyncTimeoutSeconds int + ApiServerSyncTimeoutSeconds int + } + type args struct { + fs *flag.FlagSet + } + tests := []struct { + name string + fields fields + args args + }{ + { + name: "not default", + fields: fields{ + KubeletPreferredAddressType: "Hostname", + KubeletSyncIntervalSeconds: 10, + KubeletSyncTimeoutSeconds: 30, + ApiServerSyncTimeoutSeconds: 20, + }, + args: args{fs: fs}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + raw := &Config{ + KubeletPreferredAddressType: tt.fields.KubeletPreferredAddressType, + KubeletSyncIntervalSeconds: tt.fields.KubeletSyncIntervalSeconds, + KubeletSyncTimeoutSeconds: tt.fields.KubeletSyncTimeoutSeconds, + } + c := NewDefaultConfig() + c.InitFlags(tt.args.fs) + tt.args.fs.Parse(cmdArgs[1:]) + if !reflect.DeepEqual(raw, c) { + t.Fatalf("InitFlags got: %+v, want: %+v", c, raw) + } + }) + } +} diff --git a/pkg/koordlet/statesinformer/kubelet_stub.go b/pkg/koordlet/statesinformer/kubelet_stub.go index de6d4553f..0c70d39e2 100644 --- a/pkg/koordlet/statesinformer/kubelet_stub.go +++ b/pkg/koordlet/statesinformer/kubelet_stub.go @@ -19,10 +19,13 @@ package statesinformer import ( "encoding/json" "fmt" + "io/ioutil" + "net/http" + "time" corev1 "k8s.io/api/core/v1" - - "github.com/koordinator-sh/koordinator/pkg/util" + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/client-go/transport" ) type KubeletStub interface { @@ -30,27 +33,68 @@ type KubeletStub interface { } type kubeletStub struct { - ipAddr string - httpPort int - timeoutSeconds int + addr string + port int + httpClient *http.Client } -func NewKubeletStub(ip string, port, timeoutSeconds int) KubeletStub { +func NewKubeletStub(addr string, port, timeoutSeconds int, token string) (KubeletStub, error) { + preTlsConfig := makeTransportConfig(token, true) + tlsConfig, err := transport.TLSConfigFor(preTlsConfig) + if err != nil { + return nil, err + } + rt := http.DefaultTransport + if tlsConfig != nil { + // If SSH Tunnel is turned on + rt = utilnet.SetOldTransportDefaults(&http.Transport{ + TLSClientConfig: tlsConfig, + }) + } + roundTripper, err := transport.HTTPWrappersForConfig(makeTransportConfig(token, true), rt) + if err != nil { + return nil, err + } + client := &http.Client{ + Timeout: time.Duration(timeoutSeconds) * time.Second, + Transport: roundTripper, + } return &kubeletStub{ - ipAddr: ip, - httpPort: port, - timeoutSeconds: timeoutSeconds, + httpClient: client, + addr: addr, + port: port, + }, nil +} + +func makeTransportConfig(token string, insecure bool) *transport.Config { + tlsConfig := &transport.Config{ + BearerToken: token, + TLS: transport.TLSConfig{ + Insecure: true, + }, } + return tlsConfig } func (k *kubeletStub) GetAllPods() (corev1.PodList, error) { podList := corev1.PodList{} - result, err := util.DoHTTPGet("pods", k.ipAddr, k.httpPort, k.timeoutSeconds) + url := fmt.Sprintf("https://%v:%d/pods/", k.addr, k.port) + rsp, err := k.httpClient.Get(url) if err != nil { return podList, err } + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + return podList, fmt.Errorf("request %s failed, code %d", url, rsp.StatusCode) + } + + body, err := ioutil.ReadAll(rsp.Body) + if err != nil { + return podList, err + } + // parse json data - err = json.Unmarshal(result, &podList) + err = json.Unmarshal(body, &podList) if err != nil { return podList, fmt.Errorf("parse kubelet pod list failed, err: %v", err) } diff --git a/pkg/koordlet/statesinformer/kubelet_stub_test.go b/pkg/koordlet/statesinformer/kubelet_stub_test.go new file mode 100644 index 000000000..91f57ee8e --- /dev/null +++ b/pkg/koordlet/statesinformer/kubelet_stub_test.go @@ -0,0 +1,172 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statesinformer + +import ( + "encoding/json" + "flag" + "log" + "net" + "net/http" + "net/http/httptest" + "net/url" + "reflect" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/transport" +) + +var ( + token string +) + +func mockPodsList(w http.ResponseWriter, r *http.Request) { + bear := r.Header.Get("Authorization") + if bear == "" { + w.WriteHeader(http.StatusUnauthorized) + return + } + parts := strings.Split(bear, "Bearer") + if len(parts) != 2 { + w.WriteHeader(http.StatusUnauthorized) + return + } + + http_token := strings.TrimSpace(parts[1]) + if len(http_token) < 1 { + w.WriteHeader(http.StatusUnauthorized) + return + } + if http_token != token { + w.WriteHeader(http.StatusUnauthorized) + return + } + podList := new(corev1.PodList) + b, err := json.Marshal(podList) + if err != nil { + log.Printf("codec error %+v", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + w.Write(b) +} + +func parseHostAndPort(rawURL string) (string, string, error) { + u, err := url.Parse(rawURL) + if err != nil { + return "", "0", err + } + return net.SplitHostPort(u.Host) +} + +func Test_kubeletStub_GetAllPods(t *testing.T) { + flag.StringVar(&token, "token", "mockTest", "") + flag.Parse() + + server := httptest.NewTLSServer(http.HandlerFunc(mockPodsList)) + defer server.Close() + + address, portStr, err := parseHostAndPort(server.URL) + if err != nil { + t.Fatal(err) + } + port, _ := strconv.Atoi(portStr) + + client, err := NewKubeletStub(address, port, 10, token) + if err != nil { + t.Fatal(err) + } + ps, err := client.GetAllPods() + if err != nil { + t.Fatal(err) + } + t.Logf("podList %+v\n", ps) +} + +func TestNewKubeletStub(t *testing.T) { + type args struct { + addr string + port int + timeoutSeconds int + token string + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "127.0.0.1", + args: args{ + addr: "127.0.0.1", + port: 10250, + timeoutSeconds: 10, + token: "test_token", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewKubeletStub(tt.args.addr, tt.args.port, tt.args.timeoutSeconds, tt.args.token) + if (err != nil) != tt.wantErr { + t.Errorf("NewKubeletStub() error = %v, wantErr %v", err, tt.wantErr) + return + } + assert.NotNil(t, got) + }) + } +} + +func Test_makeTransportConfig(t *testing.T) { + inToken := "test_token" + ts := &transport.Config{ + BearerToken: inToken, + TLS: transport.TLSConfig{ + Insecure: true, + }, + } + type args struct { + token string + insecure bool + } + tests := []struct { + name string + args args + want *transport.Config + }{ + { + name: "transport", + args: args{ + token: inToken, + }, + want: ts, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := makeTransportConfig(tt.args.token, tt.args.insecure); !reflect.DeepEqual(got, tt.want) { + t.Errorf("makeTransportConfig() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/koordlet/statesinformer/states_informer.go b/pkg/koordlet/statesinformer/states_informer.go index 46c3b6ff9..ed135f38c 100644 --- a/pkg/koordlet/statesinformer/states_informer.go +++ b/pkg/koordlet/statesinformer/states_informer.go @@ -19,6 +19,7 @@ package statesinformer import ( "context" "fmt" + "io/ioutil" "reflect" "sync" "time" @@ -36,6 +37,11 @@ import ( slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" koordclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned" "github.com/koordinator-sh/koordinator/pkg/koordlet/pleg" + "github.com/koordinator-sh/koordinator/pkg/util" +) + +const ( + tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token" ) type StatesInformer interface { @@ -80,7 +86,6 @@ func NewStatesInformer(config *Config, kubeClient clientset.Interface, crdClient return &statesInformer{ config: config, - kubelet: NewKubeletStub(config.KubeletIPAddr, config.KubeletHTTPPort, config.KubeletSyncTimeoutSeconds), podHasSynced: atomic.NewBool(false), pleg: pleg, @@ -108,6 +113,20 @@ func (s *statesInformer) Run(stopCh <-chan struct{}) error { go s.nodeInformer.Run(stopCh) go s.nodeSLOInformer.Run(stopCh) + // waiting for node synced. + waitInformersSynced := []cache.InformerSynced{ + s.nodeInformer.HasSynced, s.nodeSLOInformer.HasSynced} + if !cache.WaitForCacheSync(stopCh, waitInformersSynced...) { + return fmt.Errorf("timed out waiting for states informer caches to sync") + } + + stub, err := newKubeletStub(s.GetNode(), s.config.KubeletPreferredAddressType, s.config.KubeletSyncTimeoutSeconds, tokenPath) + if err != nil { + klog.ErrorS(err, "create kubelet stub") + return err + } + s.kubelet = stub + if s.config.KubeletSyncIntervalSeconds > 0 { hdlID := s.pleg.AddHandler(pleg.PodLifeCycleHandlerFuncs{ PodAddedFunc: func(podID string) { @@ -125,10 +144,10 @@ func (s *statesInformer) Run(stopCh <-chan struct{}) error { s.config.KubeletSyncIntervalSeconds) } - waitInformersSynced := []cache.InformerSynced{ - s.nodeInformer.HasSynced, s.nodeSLOInformer.HasSynced, s.podHasSynced.Load} - if !cache.WaitForCacheSync(stopCh, waitInformersSynced...) { - return fmt.Errorf("timed out waiting for states informer caches to sync") + // waiting for pods synced. + waitPodSynced := []cache.InformerSynced{s.podHasSynced.Load} + if !cache.WaitForCacheSync(stopCh, waitPodSynced...) { + return fmt.Errorf("timed out waiting for pod caches to sync") } go s.startCallbackRunners(stopCh) @@ -209,3 +228,24 @@ func (s *statesInformer) setupInformers() { s.setupNodeInformer() s.setupNodeSLOInformer() } + +func newKubeletStub(node *corev1.Node, addressPreferred string, timeout int, tokenPath string) (KubeletStub, error) { + var address string + var err error + addressPreferredType := corev1.NodeAddressType(addressPreferred) + // if the address of the specified type has not been set or error type, InternalIP will be used. + if !util.IsNodeAddressTypeSupported(addressPreferredType) { + klog.Warningf("Wrong address type or empty type, InternalIP will be used, error: (%+v).", addressPreferredType) + addressPreferredType = corev1.NodeInternalIP + } + address, err = util.GetNodeAddress(node, addressPreferredType) + if err != nil { + klog.Fatalf("Get node address error: %v type(%s) ", err, addressPreferred) + } + token, err := ioutil.ReadFile(tokenPath) + if err != nil { + return nil, err + } + kubeletEndpointPort := node.Status.DaemonEndpoints.KubeletEndpoint.Port + return NewKubeletStub(address, int(kubeletEndpointPort), timeout, string(token)) +} diff --git a/pkg/koordlet/statesinformer/states_informer_test.go b/pkg/koordlet/statesinformer/states_informer_test.go index f79e2798b..5c58d3b1a 100644 --- a/pkg/koordlet/statesinformer/states_informer_test.go +++ b/pkg/koordlet/statesinformer/states_informer_test.go @@ -17,8 +17,12 @@ limitations under the License. package statesinformer import ( + "errors" + "io/ioutil" + "os" "path/filepath" "testing" + "time" koordclientfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake" "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" @@ -162,25 +166,149 @@ func (t *testKubeletStub) GetAllPods() (corev1.PodList, error) { return t.pods, nil } +type testErrorKubeletStub struct { +} + +func (t *testErrorKubeletStub) GetAllPods() (corev1.PodList, error) { + return corev1.PodList{}, errors.New("test error") +} + func Test_statesInformer_syncPods(t *testing.T) { client := clientsetfake.NewSimpleClientset() crdClient := koordclientfake.NewSimpleClientset() pleg, _ := pleg.NewPLEG(system.Conf.CgroupRootDir) stopCh := make(chan struct{}, 1) defer close(stopCh) - + testingNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Labels: map[string]string{}, + }, + } c := NewDefaultConfig() c.KubeletSyncIntervalSeconds = 60 m := NewStatesInformer(c, client, crdClient, pleg, "localhost") + m.(*statesInformer).node = testingNode m.(*statesInformer).kubelet = &testKubeletStub{pods: corev1.PodList{ Items: []corev1.Pod{ {}, }, }} - m.(*statesInformer).syncKubelet() - + m.(*statesInformer).syncPods() if len(m.(*statesInformer).GetAllPods()) != 1 { - t.Errorf("failed to update pods") + t.Fatal("failed to update pods") + } + + m.(*statesInformer).kubelet = &testErrorKubeletStub{} + + err := m.(*statesInformer).syncPods() + if err == nil { + t.Fatalf("need not nil error, but get error %+v", err) + } +} + +func Test_newKubeletStub(t *testing.T) { + testingNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Labels: map[string]string{}, + }, + Status: corev1.NodeStatus{ + DaemonEndpoints: corev1.NodeDaemonEndpoints{ + KubeletEndpoint: corev1.DaemonEndpoint{ + Port: 10250, + }, + }, + Addresses: []corev1.NodeAddress{ + {Type: corev1.NodeInternalIP, Address: "127.0.0.1"}, + }, + }, + } + tokenContent := "test_token" + f, err := ioutil.TempFile("", "token") + if err != nil { + t.Fatal(err) + } + defer os.Remove(f.Name()) + f.WriteString(tokenContent) + kubeStub, _ := NewKubeletStub("127.0.0.7", 10250, 10, tokenContent) + type args struct { + node *corev1.Node + addressPreferred string + timeout int + tokenPath string + } + tests := []struct { + name string + args args + want KubeletStub + wantErr bool + }{ + { + name: "NodeInternalIP", + args: args{ + node: testingNode, + addressPreferred: string(corev1.NodeInternalIP), + timeout: 10, + tokenPath: f.Name(), + }, + want: kubeStub, + wantErr: false, + }, + { + name: "Empty IP", + args: args{ + node: testingNode, + addressPreferred: "", + timeout: 10, + tokenPath: f.Name(), + }, + want: kubeStub, + wantErr: false, + }, + { + name: "Error Path", + args: args{ + node: testingNode, + addressPreferred: "", + timeout: 10, + tokenPath: "", + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := newKubeletStub(tt.args.node, tt.args.addressPreferred, tt.args.timeout, tt.args.tokenPath) + if (err != nil) != tt.wantErr { + t.Errorf("newKubeletStub() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.wantErr && got != nil { + t.Errorf("newKubeletStub() = %v, want %v", got, tt.want) + } + }) } } + +func Test_statesInformer_syncKubeletLoop(t *testing.T) { + client := clientsetfake.NewSimpleClientset() + crdClient := koordclientfake.NewSimpleClientset() + pleg, _ := pleg.NewPLEG(system.Conf.CgroupRootDir) + stopCh := make(chan struct{}, 1) + + c := NewDefaultConfig() + c.KubeletSyncIntervalSeconds = 3 + + m := NewStatesInformer(c, client, crdClient, pleg, "localhost") + m.(*statesInformer).kubelet = &testKubeletStub{pods: corev1.PodList{ + Items: []corev1.Pod{ + {}, + }, + }} + go m.(*statesInformer).syncKubeletLoop(time.Second*time.Duration(c.KubeletSyncIntervalSeconds), stopCh) + time.Sleep(5 * time.Second) + close(stopCh) +} diff --git a/pkg/koordlet/statesinformer/states_pod.go b/pkg/koordlet/statesinformer/states_pod.go index e50e755ab..4141a45eb 100644 --- a/pkg/koordlet/statesinformer/states_pod.go +++ b/pkg/koordlet/statesinformer/states_pod.go @@ -26,9 +26,11 @@ import ( "github.com/koordinator-sh/koordinator/pkg/util" ) -func (s *statesInformer) syncKubelet() error { +func (s *statesInformer) syncPods() error { podList, err := s.kubelet.GetAllPods() - if err != nil { + + // when kubelet recovers from crash, podList may be empty. + if err != nil || len(podList.Items) == 0 { klog.Warningf("get pods from kubelet failed, err: %v", err) return err } @@ -42,7 +44,7 @@ func (s *statesInformer) syncKubelet() error { s.podMap = newPodMap s.podHasSynced.Store(true) s.podUpdatedTime = time.Now() - klog.Infof("get pods from kubelet success, len %d", len(s.podMap)) + klog.Infof("get pods success, len %d", len(s.podMap)) return nil } @@ -56,7 +58,7 @@ func (s *statesInformer) syncKubeletLoop(duration time.Duration, stopCh <-chan s case <-s.podCreated: if rateLimiter.Allow() { // sync kubelet triggered immediately when the Pod is created - s.syncKubelet() + s.syncPods() // reset timer to if !timer.Stop() { <-timer.C @@ -65,7 +67,7 @@ func (s *statesInformer) syncKubeletLoop(duration time.Duration, stopCh <-chan s } case <-timer.C: timer.Reset(duration) - s.syncKubelet() + s.syncPods() case <-stopCh: klog.Infof("sync kubelet loop is exited") return diff --git a/pkg/util/http.go b/pkg/util/http.go deleted file mode 100644 index 539aa95d6..000000000 --- a/pkg/util/http.go +++ /dev/null @@ -1,55 +0,0 @@ -/* -Copyright 2022 The Koordinator Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "fmt" - "io/ioutil" - "net/http" - "time" -) - -func DoHTTPGet(methodName string, ip string, port int, timeout int) ([]byte, error) { - httpClient := &http.Client{ - Timeout: time.Duration(timeout) * time.Second, - } - - result := []byte{} - - httpURL := fmt.Sprintf("http://%s:%d/%s", ip, port, methodName) - request, err := http.NewRequest(http.MethodGet, httpURL, nil) - if err != nil { - return result, fmt.Errorf("failed to create http request, url: %v, err: %v", httpURL, err) - } - - res, err := httpClient.Do(request) - if err != nil { - return result, fmt.Errorf("failed to connect url: %v, err: %v", httpURL, err) - } - defer res.Body.Close() - - result, err = ioutil.ReadAll(res.Body) - if err != nil { - return result, fmt.Errorf("failed to read from response, url: %v, http code: %v, err: %v", - httpURL, res.StatusCode, err) - } - - if res.StatusCode != http.StatusOK { - return result, fmt.Errorf("http response statue code %v, body %v", res.StatusCode, string(result)) - } - return result, nil -} diff --git a/pkg/util/node.go b/pkg/util/node.go index 76feee4e0..8d3c11676 100644 --- a/pkg/util/node.go +++ b/pkg/util/node.go @@ -75,3 +75,25 @@ func GetRootCgroupCurCFSPeriod(qosClass corev1.PodQOSClass) (int64, error) { } return strconv.ParseInt(strings.TrimSpace(rawContent), 10, 64) } + +// GetNodeAddress get node specified type address. +func GetNodeAddress(node *corev1.Node, addrType corev1.NodeAddressType) (string, error) { + for _, address := range node.Status.Addresses { + if address.Type == addrType { + return address.Address, nil + } + } + return "", fmt.Errorf("no address matched types %v", addrType) +} + +// IsNodeAddressTypeSupported determine whether addrType is a supported type. +func IsNodeAddressTypeSupported(addrType corev1.NodeAddressType) bool { + if addrType == corev1.NodeHostName || + addrType == corev1.NodeExternalIP || + addrType == corev1.NodeExternalDNS || + addrType == corev1.NodeInternalIP || + addrType == corev1.NodeInternalDNS { + return true + } + return false +} diff --git a/pkg/util/node_test.go b/pkg/util/node_test.go index 609b8c8a8..5fba7598f 100644 --- a/pkg/util/node_test.go +++ b/pkg/util/node_test.go @@ -20,10 +20,9 @@ import ( "path" "testing" + "github.com/koordinator-sh/koordinator/pkg/util/system" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" - - "github.com/koordinator-sh/koordinator/pkg/util/system" ) func Test_GetKubeQosRelativePath(t *testing.T) { @@ -47,3 +46,100 @@ func Test_GetKubeQosRelativePath(t *testing.T) { besteffortPathCgroupfs := GetKubeQosRelativePath(corev1.PodQOSBestEffort) assert.Equal(t, path.Join(system.KubeRootNameCgroupfs, system.KubeBesteffortNameCgroupfs), besteffortPathCgroupfs) } + +func TestGetNodeAddress(t *testing.T) { + type args struct { + node *corev1.Node + addrType corev1.NodeAddressType + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "InternalIP", + args: args{ + node: &corev1.Node{ + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + {Type: corev1.NodeInternalIP, Address: "192.168.1.1"}, + }, + }, + }, + addrType: corev1.NodeInternalIP, + }, + want: "192.168.1.1", + wantErr: false, + }, + { + name: "Hostname", + args: args{ + node: &corev1.Node{ + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + {Type: corev1.NodeHostName, Address: "node1"}, + }, + }, + }, + addrType: corev1.NodeHostName, + }, + want: "node1", + wantErr: false, + }, + { + name: "Empty", + args: args{ + node: &corev1.Node{ + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + {Type: corev1.NodeInternalIP, Address: "192.168.1.1"}, + {Type: corev1.NodeHostName, Address: "node1"}, + }, + }, + }, + addrType: corev1.NodeExternalDNS, + }, + want: "", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := GetNodeAddress(tt.args.node, tt.args.addrType) + if (err != nil) != tt.wantErr { + t.Errorf("GetNodeAddress() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("GetNodeAddress() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestIsNodeAddressTypeSupported(t *testing.T) { + type args struct { + addrType corev1.NodeAddressType + } + tests := []struct { + name string + args args + want bool + }{ + {name: "Hostname", args: args{addrType: corev1.NodeHostName}, want: true}, + {name: "InternalIP", args: args{addrType: corev1.NodeInternalIP}, want: true}, + {name: "InternalDNS", args: args{addrType: corev1.NodeInternalDNS}, want: true}, + {name: "ExternalIP", args: args{addrType: corev1.NodeExternalIP}, want: true}, + {name: "ExternalDNS", args: args{addrType: corev1.NodeExternalDNS}, want: true}, + {name: "EmptyAddress", args: args{addrType: corev1.NodeAddressType("EmptyAddress")}, want: false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := IsNodeAddressTypeSupported(tt.args.addrType); got != tt.want { + t.Errorf("IsAddressTypeSupported() = %v, want %v", got, tt.want) + } + }) + } +}