Skip to content

Commit

Permalink
feat: add kubelet http2 support (#180)
Browse files Browse the repository at this point in the history
Signed-off-by: 黄金 <[email protected]>

Co-authored-by: 黄金 <[email protected]>
  • Loading branch information
LambdaHJ and 黄金 authored Jun 8, 2022
1 parent fc8db7c commit e6d2498
Show file tree
Hide file tree
Showing 10 changed files with 645 additions and 95 deletions.
27 changes: 14 additions & 13 deletions pkg/koordlet/statesinformer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,28 @@ limitations under the License.

package statesinformer

import "flag"
import (
"flag"

corev1 "k8s.io/api/core/v1"
)

type Config struct {
KubeletIPAddr string
KubeletHTTPPort int
KubeletSyncIntervalSeconds int
KubeletSyncTimeoutSeconds int
KubeletPreferredAddressType string
KubeletSyncIntervalSeconds int
KubeletSyncTimeoutSeconds int
}

func NewDefaultConfig() *Config {
return &Config{
KubeletIPAddr: "localhost",
KubeletHTTPPort: 10255,
KubeletSyncIntervalSeconds: 1,
KubeletSyncTimeoutSeconds: 3,
KubeletPreferredAddressType: string(corev1.NodeInternalIP),
KubeletSyncIntervalSeconds: 30,
KubeletSyncTimeoutSeconds: 3,
}
}

func (c *Config) InitFlags(fs *flag.FlagSet) {
fs.StringVar(&c.KubeletIPAddr, "KubeletIPAddr", c.KubeletIPAddr, "Kubelet IP Address.")
fs.IntVar(&c.KubeletHTTPPort, "KubeletHTTPPort", c.KubeletHTTPPort, "Kubelet HTTP httpPort.")
fs.IntVar(&c.KubeletSyncIntervalSeconds, "KubeletSyncIntervalSeconds", c.KubeletSyncIntervalSeconds, "Kubelet sync interval by seconds.")
fs.IntVar(&c.KubeletSyncTimeoutSeconds, "KubeletSyncTimeoutSeconds", c.KubeletSyncTimeoutSeconds, "Kubelet sync timeout by seconds.")
fs.StringVar(&c.KubeletPreferredAddressType, "KubeletPreferredAddressType", c.KubeletPreferredAddressType, "The node address types to use when determining which address to use to connect to a particular node.")
fs.IntVar(&c.KubeletSyncIntervalSeconds, "KubeletSyncIntervalSeconds", c.KubeletSyncIntervalSeconds, "The interval at which Koordlet will retain datas from Kubelet.")
fs.IntVar(&c.KubeletSyncTimeoutSeconds, "KubeletSyncTimeoutSeconds", c.KubeletSyncTimeoutSeconds, "The length of time to wait before giving up on a single request to Kubelet.")
}
100 changes: 100 additions & 0 deletions pkg/koordlet/statesinformer/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package statesinformer

import (
"flag"
"reflect"
"testing"

corev1 "k8s.io/api/core/v1"
)

func TestNewDefaultConfig(t *testing.T) {
tests := []struct {
name string
want *Config
}{
{
name: "config",
want: &Config{
KubeletPreferredAddressType: string(corev1.NodeInternalIP),
KubeletSyncIntervalSeconds: 30,
KubeletSyncTimeoutSeconds: 3,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewDefaultConfig(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewDefaultConfig() = %v, want %v", got, tt.want)
}
})
}
}

func TestConfig_InitFlags(t *testing.T) {
cmdArgs := []string{
"",
"--KubeletPreferredAddressType=Hostname",
"--KubeletSyncIntervalSeconds=10",
"--KubeletSyncTimeoutSeconds=30",
}
fs := flag.NewFlagSet(cmdArgs[0], flag.ExitOnError)

type fields struct {
KubeletPreferredAddressType string
KubeletSyncIntervalSeconds int
KubeletSyncTimeoutSeconds int
ApiServerSyncTimeoutSeconds int
}
type args struct {
fs *flag.FlagSet
}
tests := []struct {
name string
fields fields
args args
}{
{
name: "not default",
fields: fields{
KubeletPreferredAddressType: "Hostname",
KubeletSyncIntervalSeconds: 10,
KubeletSyncTimeoutSeconds: 30,
ApiServerSyncTimeoutSeconds: 20,
},
args: args{fs: fs},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

raw := &Config{
KubeletPreferredAddressType: tt.fields.KubeletPreferredAddressType,
KubeletSyncIntervalSeconds: tt.fields.KubeletSyncIntervalSeconds,
KubeletSyncTimeoutSeconds: tt.fields.KubeletSyncTimeoutSeconds,
}
c := NewDefaultConfig()
c.InitFlags(tt.args.fs)
tt.args.fs.Parse(cmdArgs[1:])
if !reflect.DeepEqual(raw, c) {
t.Fatalf("InitFlags got: %+v, want: %+v", c, raw)
}
})
}
}
66 changes: 55 additions & 11 deletions pkg/koordlet/statesinformer/kubelet_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,82 @@ package statesinformer
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"

corev1 "k8s.io/api/core/v1"

"github.com/koordinator-sh/koordinator/pkg/util"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/transport"
)

type KubeletStub interface {
GetAllPods() (corev1.PodList, error)
}

type kubeletStub struct {
ipAddr string
httpPort int
timeoutSeconds int
addr string
port int
httpClient *http.Client
}

func NewKubeletStub(ip string, port, timeoutSeconds int) KubeletStub {
func NewKubeletStub(addr string, port, timeoutSeconds int, token string) (KubeletStub, error) {
preTlsConfig := makeTransportConfig(token, true)
tlsConfig, err := transport.TLSConfigFor(preTlsConfig)
if err != nil {
return nil, err
}
rt := http.DefaultTransport
if tlsConfig != nil {
// If SSH Tunnel is turned on
rt = utilnet.SetOldTransportDefaults(&http.Transport{
TLSClientConfig: tlsConfig,
})
}
roundTripper, err := transport.HTTPWrappersForConfig(makeTransportConfig(token, true), rt)
if err != nil {
return nil, err
}
client := &http.Client{
Timeout: time.Duration(timeoutSeconds) * time.Second,
Transport: roundTripper,
}
return &kubeletStub{
ipAddr: ip,
httpPort: port,
timeoutSeconds: timeoutSeconds,
httpClient: client,
addr: addr,
port: port,
}, nil
}

func makeTransportConfig(token string, insecure bool) *transport.Config {
tlsConfig := &transport.Config{
BearerToken: token,
TLS: transport.TLSConfig{
Insecure: true,
},
}
return tlsConfig
}

func (k *kubeletStub) GetAllPods() (corev1.PodList, error) {
podList := corev1.PodList{}
result, err := util.DoHTTPGet("pods", k.ipAddr, k.httpPort, k.timeoutSeconds)
url := fmt.Sprintf("https://%v:%d/pods/", k.addr, k.port)
rsp, err := k.httpClient.Get(url)
if err != nil {
return podList, err
}
defer rsp.Body.Close()
if rsp.StatusCode != http.StatusOK {
return podList, fmt.Errorf("request %s failed, code %d", url, rsp.StatusCode)
}

body, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return podList, err
}

// parse json data
err = json.Unmarshal(result, &podList)
err = json.Unmarshal(body, &podList)
if err != nil {
return podList, fmt.Errorf("parse kubelet pod list failed, err: %v", err)
}
Expand Down
Loading

0 comments on commit e6d2498

Please sign in to comment.