From 2a404493e6531da7f3337a3e79830143aa70df03 Mon Sep 17 00:00:00 2001 From: rambohe Date: Thu, 13 Feb 2025 14:52:59 +1100 Subject: [PATCH] feat: improve load balancer to support dynamically updating backends (#2314) Signed-off-by: rambohe-ch --- cmd/yurthub/app/options/options.go | 2 +- .../cloudapiserver/fake_checker.go | 44 +-- pkg/yurthub/otaupdate/ota_test.go | 6 +- .../nonresourcerequest/nonresource_test.go | 5 +- pkg/yurthub/proxy/proxy.go | 5 +- pkg/yurthub/proxy/remote/loadbalancer.go | 225 ++++++----- pkg/yurthub/proxy/remote/loadbalancer_test.go | 371 ++++++------------ 7 files changed, 286 insertions(+), 372 deletions(-) diff --git a/cmd/yurthub/app/options/options.go b/cmd/yurthub/app/options/options.go index b6ff0c47c81..f33c31741da 100644 --- a/cmd/yurthub/app/options/options.go +++ b/cmd/yurthub/app/options/options.go @@ -184,7 +184,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) { fs.StringSliceVar(&o.YurtHubCertOrganizations, "hub-cert-organizations", o.YurtHubCertOrganizations, "Organizations that will be added into hub's apiserver client certificate, the format is: certOrg1,certOrg2,...") fs.IntVar(&o.GCFrequency, "gc-frequency", o.GCFrequency, "the frequency to gc cache in storage(unit: minute).") fs.StringVar(&o.NodeName, "node-name", o.NodeName, "the name of node that runs hub agent") - fs.StringVar(&o.LBMode, "lb-mode", o.LBMode, "the mode of load balancer to connect remote servers(rr, priority)") + fs.StringVar(&o.LBMode, "lb-mode", o.LBMode, "the mode of load balancer to connect remote servers(round-robin, priority)") fs.IntVar(&o.HeartbeatFailedRetry, "heartbeat-failed-retry", o.HeartbeatFailedRetry, "number of heartbeat request retry after having failed.") fs.IntVar(&o.HeartbeatHealthyThreshold, "heartbeat-healthy-threshold", o.HeartbeatHealthyThreshold, "minimum consecutive successes for the heartbeat to be considered healthy after having failed.") fs.IntVar(&o.HeartbeatTimeoutSeconds, "heartbeat-timeout-seconds", o.HeartbeatTimeoutSeconds, " number of seconds after which the heartbeat times out.") diff --git a/pkg/yurthub/healthchecker/cloudapiserver/fake_checker.go b/pkg/yurthub/healthchecker/cloudapiserver/fake_checker.go index e3e55a0e01c..88d6851c89e 100644 --- a/pkg/yurthub/healthchecker/cloudapiserver/fake_checker.go +++ b/pkg/yurthub/healthchecker/cloudapiserver/fake_checker.go @@ -23,42 +23,37 @@ import ( ) type fakeChecker struct { - healthy bool - settings map[string]int + servers map[*url.URL]bool } // BackendHealthyStatus returns healthy status of server func (fc *fakeChecker) BackendIsHealthy(server *url.URL) bool { - s := server.String() - if _, ok := fc.settings[s]; !ok { - return fc.healthy - } - - if fc.settings[s] < 0 { - return fc.healthy - } - - if fc.settings[s] == 0 { - return !fc.healthy + if server != nil { + for s, healthy := range fc.servers { + if s.Host == server.Host { + return healthy + } + } } - - fc.settings[s] = fc.settings[s] - 1 - return fc.healthy + return false } func (fc *fakeChecker) IsHealthy() bool { - return fc.healthy + for _, isHealthy := range fc.servers { + if isHealthy { + return true + } + } + return false } func (fc *fakeChecker) RenewKubeletLeaseTime() { } func (fc *fakeChecker) PickOneHealthyBackend() *url.URL { - for server := range fc.settings { - if fc.healthy { - if u, err := url.Parse(server); err == nil { - return u - } + for u, isHealthy := range fc.servers { + if isHealthy { + return u } } @@ -70,9 +65,8 @@ func (fc *fakeChecker) UpdateServers(servers []*url.URL) { } // NewFakeChecker creates a fake checker -func NewFakeChecker(healthy bool, settings map[string]int) healthchecker.Interface { +func NewFakeChecker(servers map[*url.URL]bool) healthchecker.Interface { return &fakeChecker{ - settings: settings, - healthy: healthy, + servers: servers, } } diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index 9fe9ecd50bf..7330f342506 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -111,10 +111,12 @@ func TestHealthyCheck(t *testing.T) { t.Fatalf("failed to make temp dir, %v", err) } nodeName := "foo" - servers := map[string]int{"https://10.10.10.113:6443": 2} + servers := map[*url.URL]bool{ + {Host: "10.10.10.113:6443"}: false, + } u, _ := url.Parse("https://10.10.10.113:6443") remoteServers := []*url.URL{u} - fakeHealthchecker := cloudapiserver.NewFakeChecker(false, servers) + fakeHealthchecker := cloudapiserver.NewFakeChecker(servers) client, err := testdata.CreateCertFakeClient("../certificate/testdata") if err != nil { diff --git a/pkg/yurthub/proxy/nonresourcerequest/nonresource_test.go b/pkg/yurthub/proxy/nonresourcerequest/nonresource_test.go index 7111518eb77..9bc038e69f5 100644 --- a/pkg/yurthub/proxy/nonresourcerequest/nonresource_test.go +++ b/pkg/yurthub/proxy/nonresourcerequest/nonresource_test.go @@ -64,7 +64,10 @@ func TestLocalCacheHandler(t *testing.T) { sw := cachemanager.NewStorageWrapper(dStorage) //u, _ := url.Parse("https://10.10.10.113:6443") - fakeHealthChecker := cloudapiserver.NewFakeChecker(false, nil) + servers := map[*url.URL]bool{ + {Host: "10.10.10.113:6443"}: false, + } + fakeHealthChecker := cloudapiserver.NewFakeChecker(servers) u, _ := url.Parse("https://10.10.10.113:6443") remoteServers := []*url.URL{u} diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index b0376daf567..c5fb1c3e950 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -70,7 +70,7 @@ func NewYurtReverseProxyHandler( } resolver := server.NewRequestInfoResolver(cfg) - lb, err := remote.NewLoadBalancer( + lb := remote.NewLoadBalancer( yurtHubCfg.LBMode, yurtHubCfg.RemoteServers, localCacheMgr, @@ -78,9 +78,6 @@ func NewYurtReverseProxyHandler( cloudHealthChecker, yurtHubCfg.FilterFinder, stopCh) - if err != nil { - return nil, err - } var localProxy, autonomyProxy http.Handler if !yurtutil.IsNil(cloudHealthChecker) && !yurtutil.IsNil(localCacheMgr) { diff --git a/pkg/yurthub/proxy/remote/loadbalancer.go b/pkg/yurthub/proxy/remote/loadbalancer.go index 176845f3f07..cdbcc50120e 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer.go +++ b/pkg/yurthub/proxy/remote/loadbalancer.go @@ -24,6 +24,7 @@ import ( "net/http" "net/url" "sync" + "sync/atomic" "k8s.io/apimachinery/pkg/runtime/schema" apirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -38,106 +39,141 @@ import ( hubutil "github.com/openyurtio/openyurt/pkg/yurthub/util" ) -type loadBalancerAlgo interface { - PickOne() *util.RemoteProxy +// Backend defines the interface of proxy for a remote backend server. +type Backend interface { Name() string + RemoteServer() *url.URL + ServeHTTP(rw http.ResponseWriter, req *http.Request) } -type rrLoadBalancerAlgo struct { - sync.Mutex +// LoadBalancingStrategy defines the interface for different load balancing strategies. +type LoadBalancingStrategy interface { + Name() string + PickOne() Backend + UpdateBackends(backends []Backend) +} + +// BaseLoadBalancingStrategy provides common logic for load balancing strategies. +type BaseLoadBalancingStrategy struct { + sync.RWMutex checker healthchecker.Interface - backends []*util.RemoteProxy - next int + backends []Backend +} + +// UpdateBackends updates the list of backends in a thread-safe manner. +func (b *BaseLoadBalancingStrategy) UpdateBackends(backends []Backend) { + b.Lock() + defer b.Unlock() + b.backends = backends +} + +// checkAndReturnHealthyBackend checks if a backend is healthy before returning it. +func (b *BaseLoadBalancingStrategy) checkAndReturnHealthyBackend(index int) Backend { + if len(b.backends) == 0 { + return nil + } + + backend := b.backends[index] + if !yurtutil.IsNil(b.checker) && !b.checker.BackendIsHealthy(backend.RemoteServer()) { + return nil + } + return backend +} + +// RoundRobinStrategy implements round-robin load balancing. +type RoundRobinStrategy struct { + BaseLoadBalancingStrategy + next uint64 } -func (rr *rrLoadBalancerAlgo) Name() string { - return "rr algorithm" +// Name returns the name of the strategy. +func (rr *RoundRobinStrategy) Name() string { + return "round-robin" } -func (rr *rrLoadBalancerAlgo) PickOne() *util.RemoteProxy { +// PickOne selects a backend using a round-robin approach. +func (rr *RoundRobinStrategy) PickOne() Backend { + rr.RLock() + defer rr.RUnlock() + if len(rr.backends) == 0 { return nil - } else if len(rr.backends) == 1 { - if !yurtutil.IsNil(rr.checker) { - if !rr.checker.BackendIsHealthy(rr.backends[0].RemoteServer()) { - return nil - } - } - return rr.backends[0] - } else { - // round robin - rr.Lock() - defer rr.Unlock() - hasFound := false - selected := rr.next - for i := 0; i < len(rr.backends); i++ { - selected = (rr.next + i) % len(rr.backends) - if !yurtutil.IsNil(rr.checker) { - if !rr.checker.BackendIsHealthy(rr.backends[selected].RemoteServer()) { - // use checker until get a healthy backend - continue + } + + totalBackends := len(rr.backends) + // Infinite loop to handle CAS failures and ensure fair selection under high concurrency. + for { + // load the current round-robin index. + startIndex := int(atomic.LoadUint64(&rr.next)) + for i := 0; i < totalBackends; i++ { + index := (startIndex + i) % totalBackends + if backend := rr.checkAndReturnHealthyBackend(index); backend != nil { + // attempt to update next atomically using CAS(Compare-And-Swap) + // if another go routine has already updated next, CAS operation will fail. + // if successful, next is updated to index+1 to maintain round-robin fairness. + if atomic.CompareAndSwapUint64(&rr.next, uint64(startIndex), uint64(index+1)) { + return backend } + // CAS operation failed, meaning another go routine modified next, so break to retry the selection process. + break } - hasFound = true - break } - if hasFound { - rr.next = (selected + 1) % len(rr.backends) - return rr.backends[selected] + // if no healthy backend is found, exit the loop and return nil. + if !rr.hasHealthyBackend() { + return nil } } +} - return nil +// hasHealthyBackend checks if there is at least one healthy backend available. +func (rr *RoundRobinStrategy) hasHealthyBackend() bool { + for i := range rr.backends { + if rr.checkAndReturnHealthyBackend(i) != nil { + return true + } + } + return false } -type priorityLoadBalancerAlgo struct { - sync.Mutex - checker healthchecker.Interface - backends []*util.RemoteProxy +// PriorityStrategy implements priority-based load balancing. +type PriorityStrategy struct { + BaseLoadBalancingStrategy } -func (prio *priorityLoadBalancerAlgo) Name() string { - return "priority algorithm" +// Name returns the name of the strategy. +func (prio *PriorityStrategy) Name() string { + return "priority" } -func (prio *priorityLoadBalancerAlgo) PickOne() *util.RemoteProxy { - if len(prio.backends) == 0 { - return nil - } else if len(prio.backends) == 1 { - if !yurtutil.IsNil(prio.checker) { - if !prio.checker.BackendIsHealthy(prio.backends[0].RemoteServer()) { - return nil - } - } - return prio.backends[0] - } else { - prio.Lock() - defer prio.Unlock() - for i := 0; i < len(prio.backends); i++ { - if !yurtutil.IsNil(prio.checker) { - if prio.checker.BackendIsHealthy(prio.backends[i].RemoteServer()) { - return prio.backends[i] - } - } else { - return prio.backends[i] - } +// PickOne selects the first available healthy backend. +func (prio *PriorityStrategy) PickOne() Backend { + prio.RLock() + defer prio.RUnlock() + for i := 0; i < len(prio.backends); i++ { + if backend := prio.checkAndReturnHealthyBackend(i); backend != nil { + return backend } - return nil } + + return nil } // LoadBalancer is an interface for proxying http request to remote server // based on the load balance mode(round-robin or priority) type LoadBalancer interface { ServeHTTP(rw http.ResponseWriter, req *http.Request) + UpdateBackends(remoteServers []*url.URL) + CurrentStrategy() LoadBalancingStrategy } type loadBalancer struct { - backends []*util.RemoteProxy - algo loadBalancerAlgo + strategy LoadBalancingStrategy localCacheMgr cachemanager.CacheManager filterFinder filter.FilterFinder + transportMgr transport.Interface + healthChecker healthchecker.Interface + mode string stopCh <-chan struct{} } @@ -149,55 +185,64 @@ func NewLoadBalancer( transportMgr transport.Interface, healthChecker healthchecker.Interface, filterFinder filter.FilterFinder, - stopCh <-chan struct{}) (LoadBalancer, error) { + stopCh <-chan struct{}) LoadBalancer { lb := &loadBalancer{ + mode: lbMode, localCacheMgr: localCacheMgr, filterFinder: filterFinder, + transportMgr: transportMgr, + healthChecker: healthChecker, stopCh: stopCh, } - backends := make([]*util.RemoteProxy, 0, len(remoteServers)) - for i := range remoteServers { - b, err := util.NewRemoteProxy(remoteServers[i], lb.modifyResponse, lb.errorHandler, transportMgr, stopCh) + + // initialize backends + lb.UpdateBackends(remoteServers) + + return lb +} + +// UpdateBackends dynamically updates the list of remote servers. +func (lb *loadBalancer) UpdateBackends(remoteServers []*url.URL) { + newBackends := make([]Backend, 0, len(remoteServers)) + for _, server := range remoteServers { + proxy, err := util.NewRemoteProxy(server, lb.modifyResponse, lb.errorHandler, lb.transportMgr, lb.stopCh) if err != nil { - klog.Errorf("could not new proxy backend(%s), %v", remoteServers[i].String(), err) + klog.Errorf("could not create proxy for backend %s, %v", server.String(), err) continue } - backends = append(backends, b) - } - if len(backends) == 0 { - return nil, fmt.Errorf("no backends can be used by lb") + newBackends = append(newBackends, proxy) } - var algo loadBalancerAlgo - switch lbMode { - case "rr": - algo = &rrLoadBalancerAlgo{backends: backends, checker: healthChecker} - case "priority": - algo = &priorityLoadBalancerAlgo{backends: backends, checker: healthChecker} - default: - algo = &rrLoadBalancerAlgo{backends: backends, checker: healthChecker} + if lb.strategy == nil { + switch lb.mode { + case "priority": + lb.strategy = &PriorityStrategy{BaseLoadBalancingStrategy{checker: lb.healthChecker}} + default: + lb.strategy = &RoundRobinStrategy{BaseLoadBalancingStrategy{checker: lb.healthChecker}, 0} + } } - lb.backends = backends - lb.algo = algo + lb.strategy.UpdateBackends(newBackends) +} - return lb, nil +func (lb *loadBalancer) CurrentStrategy() LoadBalancingStrategy { + return lb.strategy } +// ServeHTTP forwards the request to a backend. func (lb *loadBalancer) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - // pick a remote proxy based on the load balancing algorithm. - rp := lb.algo.PickOne() + rp := lb.strategy.PickOne() if rp == nil { - // exceptional case - klog.Errorf("could not pick one healthy backends by %s for request %s", lb.algo.Name(), hubutil.ReqString(req)) - http.Error(rw, "could not pick one healthy backends, try again to go through local proxy.", http.StatusInternalServerError) + klog.Errorf("no healthy backend avialbale for request %s", hubutil.ReqString(req)) + http.Error(rw, "no healthy backends available.", http.StatusBadGateway) return } - klog.V(3).Infof("picked backend %s by %s for request %s", rp.Name(), lb.algo.Name(), hubutil.ReqString(req)) + klog.V(3).Infof("forwarding request %s to backend %s", hubutil.ReqString(req), rp.Name()) rp.ServeHTTP(rw, req) } +// errorHandler handles errors and tries to serve from local cache. func (lb *loadBalancer) errorHandler(rw http.ResponseWriter, req *http.Request, err error) { klog.Errorf("remote proxy error handler: %s, %v", hubutil.ReqString(req), err) if lb.localCacheMgr == nil || !lb.localCacheMgr.CanCacheFor(req) { diff --git a/pkg/yurthub/proxy/remote/loadbalancer_test.go b/pkg/yurthub/proxy/remote/loadbalancer_test.go index b60aba0b470..bf8fc3339b1 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer_test.go +++ b/pkg/yurthub/proxy/remote/loadbalancer_test.go @@ -20,12 +20,13 @@ import ( "context" "net/http" "net/url" + "sort" "testing" "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker/cloudapiserver" - "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" "github.com/openyurtio/openyurt/pkg/yurthub/transport" ) @@ -34,279 +35,151 @@ var ( transportMgr transport.Interface = transport.NewFakeTransportManager(http.StatusOK, map[string]kubernetes.Interface{}) ) -type PickBackend struct { - DeltaRequestsCnt int - ReturnServer string +func sortURLs(urls []*url.URL) { + sort.Slice(urls, func(i, j int) bool { + return urls[i].Host < urls[j].Host + }) } -func TestRrLoadBalancerAlgo(t *testing.T) { +func TestLoadBalancingStrategy(t *testing.T) { testcases := map[string]struct { - Servers []string - PickBackends []PickBackend + lbMode string + servers map[*url.URL]bool + results []string }{ - "no backend servers": { - Servers: []string{}, - PickBackends: []PickBackend{ - {DeltaRequestsCnt: 1, ReturnServer: ""}, + "round-robin: no backend server": { + lbMode: "round-robin", + servers: map[*url.URL]bool{}, + results: []string{""}, + }, + "round-robin: one backend server": { + lbMode: "round-robin", + servers: map[*url.URL]bool{ + {Host: "127.0.0.1:8080"}: true, }, + results: []string{"127.0.0.1:8080", "127.0.0.1:8080"}, }, - - "one backend server": { - Servers: []string{"http://127.0.0.1:8080"}, - PickBackends: []PickBackend{ - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8080"}, + "round-robin: multiple backend servers": { + lbMode: "round-robin", + servers: map[*url.URL]bool{ + {Host: "127.0.0.1:8080"}: true, + {Host: "127.0.0.1:8081"}: true, + {Host: "127.0.0.1:8082"}: true, + {Host: "127.0.0.1:8083"}: true, + }, + results: []string{ + "127.0.0.1:8080", + "127.0.0.1:8081", + "127.0.0.1:8082", + "127.0.0.1:8083", + "127.0.0.1:8080", }, }, - - "multi backend server": { - Servers: []string{"http://127.0.0.1:8080", "http://127.0.0.1:8081", "http://127.0.0.1:8082"}, - PickBackends: []PickBackend{ - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 2, ReturnServer: "http://127.0.0.1:8082"}, - {DeltaRequestsCnt: 3, ReturnServer: "http://127.0.0.1:8082"}, - {DeltaRequestsCnt: 4, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 4, ReturnServer: "http://127.0.0.1:8081"}, - {DeltaRequestsCnt: 4, ReturnServer: "http://127.0.0.1:8082"}, - {DeltaRequestsCnt: 5, ReturnServer: "http://127.0.0.1:8081"}, - {DeltaRequestsCnt: 5, ReturnServer: "http://127.0.0.1:8080"}, + "round-robin: multiple backend servers with unhealthy server": { + lbMode: "round-robin", + servers: map[*url.URL]bool{ + {Host: "127.0.0.1:8080"}: true, + {Host: "127.0.0.1:8081"}: false, + {Host: "127.0.0.1:8082"}: true, + }, + results: []string{ + "127.0.0.1:8080", + "127.0.0.1:8082", + "127.0.0.1:8080", }, }, - } - - checker := cloudapiserver.NewFakeChecker(true, map[string]int{}) - for k, tc := range testcases { - backends := make([]*util.RemoteProxy, len(tc.Servers)) - for i := range tc.Servers { - var err error - u, _ := url.Parse(tc.Servers[i]) - backends[i], err = util.NewRemoteProxy(u, nil, nil, transportMgr, neverStop) - if err != nil { - t.Errorf("failed to create remote server for %s, %v", u.String(), err) - } - } - - rr := &rrLoadBalancerAlgo{ - backends: backends, - checker: checker, - } - - for i := range tc.PickBackends { - var b *util.RemoteProxy - for j := 0; j < tc.PickBackends[i].DeltaRequestsCnt; j++ { - b = rr.PickOne() - } - - if len(tc.PickBackends[i].ReturnServer) == 0 { - if b != nil { - t.Errorf("%s rr lb pick: expect no backend server, but got %s", k, b.RemoteServer().String()) - } - } else { - if b == nil { - t.Errorf("%s rr lb pick: expect backend server: %s, but got no backend server", k, tc.PickBackends[i].ReturnServer) - } else if b.RemoteServer().String() != tc.PickBackends[i].ReturnServer { - t.Errorf("%s rr lb pick(round %d): expect backend server: %s, but got %s", k, i+1, tc.PickBackends[i].ReturnServer, b.RemoteServer().String()) - } - } - } - } -} - -func TestRrLoadBalancerAlgoWithReverseHealthy(t *testing.T) { - testcases := map[string]struct { - Servers []string - PickBackends []PickBackend - }{ - "multi backend server": { - Servers: []string{"http://127.0.0.1:8080", "http://127.0.0.1:8081", "http://127.0.0.1:8082"}, - PickBackends: []PickBackend{ - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8081"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8082"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8081"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8082"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8082"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8082"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8082"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8082"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8082"}, + "round-robin: all of backend servers are unhealthy": { + lbMode: "round-robin", + servers: map[*url.URL]bool{ + {Host: "127.0.0.1:8080"}: false, + {Host: "127.0.0.1:8081"}: false, + {Host: "127.0.0.1:8082"}: false, + }, + results: []string{ + "", + "", + "", + "", }, }, - } - - checker := cloudapiserver.NewFakeChecker(true, map[string]int{ - "http://127.0.0.1:8080": 1, - "http://127.0.0.1:8081": 2, - }) - for k, tc := range testcases { - backends := make([]*util.RemoteProxy, len(tc.Servers)) - for i := range tc.Servers { - var err error - u, _ := url.Parse(tc.Servers[i]) - backends[i], err = util.NewRemoteProxy(u, nil, nil, transportMgr, neverStop) - if err != nil { - t.Errorf("failed to create remote server for %s, %v", u.String(), err) - } - } - - rr := &rrLoadBalancerAlgo{ - backends: backends, - checker: checker, - } - - for i := range tc.PickBackends { - var b *util.RemoteProxy - for j := 0; j < tc.PickBackends[i].DeltaRequestsCnt; j++ { - b = rr.PickOne() - } - - if len(tc.PickBackends[i].ReturnServer) == 0 { - if b != nil { - t.Errorf("%s rr lb pick: expect no backend server, but got %s", k, b.RemoteServer().String()) - } - } else { - if b == nil { - t.Errorf("%s rr lb pick(round %d): expect backend server: %s, but got no backend server", k, i+1, tc.PickBackends[i].ReturnServer) - } else if b.RemoteServer().String() != tc.PickBackends[i].ReturnServer { - t.Errorf("%s rr lb pick(round %d): expect backend server: %s, but got %s", k, i+1, tc.PickBackends[i].ReturnServer, b.RemoteServer().String()) - } - } - } - } -} - -func TestPriorityLoadBalancerAlgo(t *testing.T) { - testcases := map[string]struct { - Servers []string - PickBackends []PickBackend - }{ - "no backend servers": { - Servers: []string{}, - PickBackends: []PickBackend{ - {DeltaRequestsCnt: 1, ReturnServer: ""}, + "priority: no backend server": { + lbMode: "priority", + servers: map[*url.URL]bool{}, + results: []string{""}, + }, + "priority: one backend server": { + lbMode: "priority", + servers: map[*url.URL]bool{ + {Host: "127.0.0.1:8080"}: true, }, + results: []string{"127.0.0.1:8080", "127.0.0.1:8080"}, }, - - "one backend server": { - Servers: []string{"http://127.0.0.1:8080"}, - PickBackends: []PickBackend{ - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8080"}, + "priority: multiple backend servers": { + lbMode: "priority", + servers: map[*url.URL]bool{ + {Host: "127.0.0.1:8080"}: true, + {Host: "127.0.0.1:8081"}: true, + {Host: "127.0.0.1:8082"}: true, + }, + results: []string{ + "127.0.0.1:8080", + "127.0.0.1:8080", + "127.0.0.1:8080", + "127.0.0.1:8080", }, }, - - "multi backend server": { - Servers: []string{"http://127.0.0.1:8080", "http://127.0.0.1:8081", "http://127.0.0.1:8082"}, - PickBackends: []PickBackend{ - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 2, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 3, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 4, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 4, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 4, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 5, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 5, ReturnServer: "http://127.0.0.1:8080"}, + "priority: multiple backend servers with unhealthy server": { + lbMode: "priority", + servers: map[*url.URL]bool{ + {Host: "127.0.0.1:8080"}: false, + {Host: "127.0.0.1:8081"}: false, + {Host: "127.0.0.1:8082"}: true, + }, + results: []string{ + "127.0.0.1:8082", + "127.0.0.1:8082", + "127.0.0.1:8082", }, }, - } - - checker := cloudapiserver.NewFakeChecker(true, map[string]int{}) - for k, tc := range testcases { - backends := make([]*util.RemoteProxy, len(tc.Servers)) - for i := range tc.Servers { - var err error - u, _ := url.Parse(tc.Servers[i]) - backends[i], err = util.NewRemoteProxy(u, nil, nil, transportMgr, neverStop) - if err != nil { - t.Errorf("failed to create remote server for %s, %v", u.String(), err) - } - } - - rr := &priorityLoadBalancerAlgo{ - backends: backends, - checker: checker, - } - - for i := range tc.PickBackends { - var b *util.RemoteProxy - for j := 0; j < tc.PickBackends[i].DeltaRequestsCnt; j++ { - b = rr.PickOne() - } - - if len(tc.PickBackends[i].ReturnServer) == 0 { - if b != nil { - t.Errorf("%s priority lb pick: expect no backend server, but got %s", k, b.RemoteServer().String()) - } - } else { - if b == nil { - t.Errorf("%s priority lb pick: expect backend server: %s, but got no backend server", k, tc.PickBackends[i].ReturnServer) - } else if b.RemoteServer().String() != tc.PickBackends[i].ReturnServer { - t.Errorf("%s priority lb pick(round %d): expect backend server: %s, but got %s", k, i+1, tc.PickBackends[i].ReturnServer, b.RemoteServer().String()) - } - } - } - } -} - -func TestPriorityLoadBalancerAlgoWithReverseHealthy(t *testing.T) { - testcases := map[string]struct { - Servers []string - PickBackends []PickBackend - }{ - "multi backend server": { - Servers: []string{"http://127.0.0.1:8080", "http://127.0.0.1:8081", "http://127.0.0.1:8082"}, - PickBackends: []PickBackend{ - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8080"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8081"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8081"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8081"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8082"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8082"}, - {DeltaRequestsCnt: 1, ReturnServer: "http://127.0.0.1:8082"}, - {DeltaRequestsCnt: 2, ReturnServer: "http://127.0.0.1:8082"}, + "priority: all of backend servers are unhealthy": { + lbMode: "priority", + servers: map[*url.URL]bool{ + {Host: "127.0.0.1:8080"}: false, + {Host: "127.0.0.1:8081"}: false, + {Host: "127.0.0.1:8082"}: false, + }, + results: []string{ + "", + "", + "", + "", }, }, } - checker := cloudapiserver.NewFakeChecker(true, map[string]int{ - "http://127.0.0.1:8080": 2, - "http://127.0.0.1:8081": 3}) for k, tc := range testcases { - backends := make([]*util.RemoteProxy, len(tc.Servers)) - for i := range tc.Servers { - var err error - u, _ := url.Parse(tc.Servers[i]) - backends[i], err = util.NewRemoteProxy(u, nil, nil, transportMgr, neverStop) - if err != nil { - t.Errorf("failed to create remote server for %s, %v", u.String(), err) - } - } - - rr := &priorityLoadBalancerAlgo{ - backends: backends, - checker: checker, - } - - for i := range tc.PickBackends { - var b *util.RemoteProxy - for j := 0; j < tc.PickBackends[i].DeltaRequestsCnt; j++ { - b = rr.PickOne() + t.Run(k, func(t *testing.T) { + checker := cloudapiserver.NewFakeChecker(tc.servers) + servers := make([]*url.URL, 0, len(tc.servers)) + for server := range tc.servers { + servers = append(servers, server) } - - if len(tc.PickBackends[i].ReturnServer) == 0 { - if b != nil { - t.Errorf("%s priority lb pick: expect no backend server, but got %s", k, b.RemoteServer().String()) - } - } else { - if b == nil { - t.Errorf("%s priority lb pick: expect backend server: %s, but got no backend server", k, tc.PickBackends[i].ReturnServer) - } else if b.RemoteServer().String() != tc.PickBackends[i].ReturnServer { - t.Errorf("%s priority lb pick(round %d): expect backend server: %s, but got %s", k, i+1, tc.PickBackends[i].ReturnServer, b.RemoteServer().String()) + sortURLs(servers) + klog.Infof("servers: %+v", servers) + + lb := NewLoadBalancer(tc.lbMode, servers, nil, transportMgr, checker, nil, neverStop) + + for _, host := range tc.results { + strategy := lb.CurrentStrategy() + backend := strategy.PickOne() + if backend == nil { + if host != "" { + t.Errorf("expect %s, but got nil", host) + } + } else if backend.RemoteServer().Host != host { + t.Errorf("expect host %s, but got %s", host, backend.RemoteServer().Host) } } - } + }) } }