Skip to content

Commit

Permalink
feat: improve load balancer to support dynamically updating backends (#…
Browse files Browse the repository at this point in the history
…2314)

Signed-off-by: rambohe-ch <[email protected]>
  • Loading branch information
rambohe-ch authored Feb 13, 2025
1 parent 6ab412d commit 2a40449
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 372 deletions.
2 changes: 1 addition & 1 deletion cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&o.YurtHubCertOrganizations, "hub-cert-organizations", o.YurtHubCertOrganizations, "Organizations that will be added into hub's apiserver client certificate, the format is: certOrg1,certOrg2,...")
fs.IntVar(&o.GCFrequency, "gc-frequency", o.GCFrequency, "the frequency to gc cache in storage(unit: minute).")
fs.StringVar(&o.NodeName, "node-name", o.NodeName, "the name of node that runs hub agent")
fs.StringVar(&o.LBMode, "lb-mode", o.LBMode, "the mode of load balancer to connect remote servers(rr, priority)")
fs.StringVar(&o.LBMode, "lb-mode", o.LBMode, "the mode of load balancer to connect remote servers(round-robin, priority)")
fs.IntVar(&o.HeartbeatFailedRetry, "heartbeat-failed-retry", o.HeartbeatFailedRetry, "number of heartbeat request retry after having failed.")
fs.IntVar(&o.HeartbeatHealthyThreshold, "heartbeat-healthy-threshold", o.HeartbeatHealthyThreshold, "minimum consecutive successes for the heartbeat to be considered healthy after having failed.")
fs.IntVar(&o.HeartbeatTimeoutSeconds, "heartbeat-timeout-seconds", o.HeartbeatTimeoutSeconds, " number of seconds after which the heartbeat times out.")
Expand Down
44 changes: 19 additions & 25 deletions pkg/yurthub/healthchecker/cloudapiserver/fake_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,42 +23,37 @@ import (
)

type fakeChecker struct {
healthy bool
settings map[string]int
servers map[*url.URL]bool
}

// BackendHealthyStatus returns healthy status of server
func (fc *fakeChecker) BackendIsHealthy(server *url.URL) bool {
s := server.String()
if _, ok := fc.settings[s]; !ok {
return fc.healthy
}

if fc.settings[s] < 0 {
return fc.healthy
}

if fc.settings[s] == 0 {
return !fc.healthy
if server != nil {
for s, healthy := range fc.servers {
if s.Host == server.Host {
return healthy
}
}
}

fc.settings[s] = fc.settings[s] - 1
return fc.healthy
return false
}

func (fc *fakeChecker) IsHealthy() bool {
return fc.healthy
for _, isHealthy := range fc.servers {
if isHealthy {
return true
}
}
return false
}

func (fc *fakeChecker) RenewKubeletLeaseTime() {
}

func (fc *fakeChecker) PickOneHealthyBackend() *url.URL {
for server := range fc.settings {
if fc.healthy {
if u, err := url.Parse(server); err == nil {
return u
}
for u, isHealthy := range fc.servers {
if isHealthy {
return u
}
}

Expand All @@ -70,9 +65,8 @@ func (fc *fakeChecker) UpdateServers(servers []*url.URL) {
}

// NewFakeChecker creates a fake checker
func NewFakeChecker(healthy bool, settings map[string]int) healthchecker.Interface {
func NewFakeChecker(servers map[*url.URL]bool) healthchecker.Interface {
return &fakeChecker{
settings: settings,
healthy: healthy,
servers: servers,
}
}
6 changes: 4 additions & 2 deletions pkg/yurthub/otaupdate/ota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,12 @@ func TestHealthyCheck(t *testing.T) {
t.Fatalf("failed to make temp dir, %v", err)
}
nodeName := "foo"
servers := map[string]int{"https://10.10.10.113:6443": 2}
servers := map[*url.URL]bool{
{Host: "10.10.10.113:6443"}: false,
}
u, _ := url.Parse("https://10.10.10.113:6443")
remoteServers := []*url.URL{u}
fakeHealthchecker := cloudapiserver.NewFakeChecker(false, servers)
fakeHealthchecker := cloudapiserver.NewFakeChecker(servers)

client, err := testdata.CreateCertFakeClient("../certificate/testdata")
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/yurthub/proxy/nonresourcerequest/nonresource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ func TestLocalCacheHandler(t *testing.T) {

sw := cachemanager.NewStorageWrapper(dStorage)
//u, _ := url.Parse("https://10.10.10.113:6443")
fakeHealthChecker := cloudapiserver.NewFakeChecker(false, nil)
servers := map[*url.URL]bool{
{Host: "10.10.10.113:6443"}: false,
}
fakeHealthChecker := cloudapiserver.NewFakeChecker(servers)

u, _ := url.Parse("https://10.10.10.113:6443")
remoteServers := []*url.URL{u}
Expand Down
5 changes: 1 addition & 4 deletions pkg/yurthub/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,14 @@ func NewYurtReverseProxyHandler(
}
resolver := server.NewRequestInfoResolver(cfg)

lb, err := remote.NewLoadBalancer(
lb := remote.NewLoadBalancer(
yurtHubCfg.LBMode,
yurtHubCfg.RemoteServers,
localCacheMgr,
yurtHubCfg.TransportAndDirectClientManager,
cloudHealthChecker,
yurtHubCfg.FilterFinder,
stopCh)
if err != nil {
return nil, err
}

var localProxy, autonomyProxy http.Handler
if !yurtutil.IsNil(cloudHealthChecker) && !yurtutil.IsNil(localCacheMgr) {
Expand Down
Loading

0 comments on commit 2a40449

Please sign in to comment.