From c86a55a588f522deee521f60dcff2a88d56e254f Mon Sep 17 00:00:00 2001 From: rambohe-ch Date: Sun, 9 Feb 2025 00:38:27 +1100 Subject: [PATCH] feat: support forward requests to leader hub for sharing pool scope metadata in nodepool. Signed-off-by: rambohe-ch --- charts/yurthub/templates/yurthub-cfg.yaml | 1 + .../yurthub-cloud-yurtstaticset.yaml | 1 + .../templates/yurthub-yurtstaticset.yaml | 1 + charts/yurthub/values.yaml | 1 + cmd/yurthub/app/config/config.go | 54 ++- cmd/yurthub/app/config/config_test.go | 1 + cmd/yurthub/app/options/options.go | 20 + cmd/yurthub/app/options/options_test.go | 8 +- cmd/yurthub/app/start.go | 33 ++ pkg/node-servant/components/yurthub.go | 6 +- pkg/node-servant/constant.go | 2 +- pkg/node-servant/convert/convert.go | 4 +- pkg/node-servant/convert/options.go | 6 + pkg/node-servant/revert/revert.go | 2 +- pkg/yurthub/certificate/manager/manager.go | 1 + pkg/yurthub/configuration/manager.go | 39 +- .../cloudapiserver/health_checker.go | 2 +- .../{cloudapiserver => fake}/fake_checker.go | 34 +- pkg/yurthub/healthchecker/interfaces.go | 2 +- .../healthchecker/leaderhub/leader_hub.go | 2 +- .../leaderhub/leader_hub_test.go | 4 +- pkg/yurthub/multiplexer/multiplexer.go | 299 ++++++++++++--- pkg/yurthub/multiplexer/multiplexer_test.go | 355 +++++++++++++++++- pkg/yurthub/otaupdate/ota_test.go | 4 +- .../multiplexer/multiplexerproxy_test.go | 34 +- .../nonresourcerequest/nonresource_test.go | 4 +- pkg/yurthub/proxy/proxy.go | 124 +++--- pkg/yurthub/proxy/remote/loadbalancer.go | 56 ++- pkg/yurthub/proxy/remote/loadbalancer_test.go | 4 +- pkg/yurthub/proxy/{util => remote}/remote.go | 2 +- pkg/yurthub/proxy/util/util.go | 28 +- pkg/yurthub/proxy/util/util_test.go | 21 +- pkg/yurthub/server/server.go | 6 + pkg/yurthub/util/util.go | 1 + test/e2e/cmd/init/converter.go | 51 ++- test/e2e/cmd/init/init.go | 50 +++ test/e2e/e2e_test.go | 4 +- test/e2e/util/nodepool.go | 2 +- test/e2e/yurt/hubleader.go | 12 +- test/e2e/yurt/nodepool.go | 16 +- 40 files changed, 1051 insertions(+), 246 deletions(-) rename pkg/yurthub/healthchecker/{cloudapiserver => fake}/fake_checker.go (63%) rename pkg/yurthub/proxy/{util => remote}/remote.go (99%) diff --git a/charts/yurthub/templates/yurthub-cfg.yaml b/charts/yurthub/templates/yurthub-cfg.yaml index 478bebabb54..d95667fd25e 100644 --- a/charts/yurthub/templates/yurthub-cfg.yaml +++ b/charts/yurthub/templates/yurthub-cfg.yaml @@ -102,6 +102,7 @@ metadata: name: yurt-hub-cfg namespace: {{ .Release.Namespace }} labels: + openyurt.io/configmap-name: yurt-hub-cfg {{- include "yurthub.labels" . 
| nindent 4 }} data: cache_agents: {{ .Values.cacheAgents | quote }} diff --git a/charts/yurthub/templates/yurthub-cloud-yurtstaticset.yaml b/charts/yurthub/templates/yurthub-cloud-yurtstaticset.yaml index 02f744869d8..cacc692b269 100644 --- a/charts/yurthub/templates/yurthub-cloud-yurtstaticset.yaml +++ b/charts/yurthub/templates/yurthub-cloud-yurtstaticset.yaml @@ -36,6 +36,7 @@ spec: - --bind-address={{ .Values.yurthubBindingAddr }} - --server-addr={{ .Values.kubernetesServerAddr }} - --node-name=$(NODE_NAME) + - --nodepool-name={{ .Values.nodePoolName }} - --bootstrap-file={{ .Values.bootstrapFile }} - --working-mode=cloud - --namespace={{ .Release.Namespace }} diff --git a/charts/yurthub/templates/yurthub-yurtstaticset.yaml b/charts/yurthub/templates/yurthub-yurtstaticset.yaml index fa10c2865a0..994259c5127 100644 --- a/charts/yurthub/templates/yurthub-yurtstaticset.yaml +++ b/charts/yurthub/templates/yurthub-yurtstaticset.yaml @@ -36,6 +36,7 @@ spec: - --bind-address={{ .Values.yurthubBindingAddr }} - --server-addr={{ .Values.kubernetesServerAddr }} - --node-name=$(NODE_NAME) + - --nodepool-name={{ .Values.nodePoolName }} - --bootstrap-file={{ .Values.bootstrapFile }} - --working-mode=edge - --namespace={{ .Release.Namespace }} diff --git a/charts/yurthub/values.yaml b/charts/yurthub/values.yaml index bd72e56c0dd..dbe5911e9a4 100644 --- a/charts/yurthub/values.yaml +++ b/charts/yurthub/values.yaml @@ -10,6 +10,7 @@ yurthubBindingAddr: 127.0.0.1 kubernetesServerAddr: https://127.0.0.1:6443 bootstrapFile: "/var/lib/yurthub/bootstrap-hub.conf" organizations: "" +nodePoolName: "" image: registry: openyurt diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index c97e2f61a27..f958ff8eb5c 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -28,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" apiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" @@ -37,7 +38,6 @@ import ( "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" @@ -55,9 +55,8 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/filter/manager" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" - "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" - "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" "github.com/openyurtio/openyurt/pkg/yurthub/network" + "github.com/openyurtio/openyurt/pkg/yurthub/proxy/remote" "github.com/openyurtio/openyurt/pkg/yurthub/tenant" "github.com/openyurtio/openyurt/pkg/yurthub/transport" "github.com/openyurtio/openyurt/pkg/yurthub/util" @@ -90,11 +89,15 @@ type YurtHubConfiguration struct { YurtHubProxyServerServing *apiserver.DeprecatedInsecureServingInfo YurtHubDummyProxyServerServing *apiserver.DeprecatedInsecureServingInfo YurtHubSecureProxyServerServing *apiserver.SecureServingInfo + YurtHubMultiplexerServerServing *apiserver.SecureServingInfo DiskCachePath string - RequestMultiplexerManager *multiplexer.MultiplexerManager ConfigManager *configuration.Manager TenantManager tenant.Interface TransportAndDirectClientManager transport.Interface + LoadBalancerForLeaderHub 
remote.LoadBalancer + PoolScopeResources []schema.GroupVersionResource + PortForMultiplexer int + NodePoolName string } // Complete converts *options.YurtHubOptions to *YurtHubConfiguration @@ -116,7 +119,9 @@ func Complete(options *options.YurtHubOptions, stopCh <-chan struct{}) (*YurtHub // list/watch endpoints from host cluster in order to resolve tenant cluster address. newEndpointsInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { - return coreinformers.NewFilteredEndpointsInformer(client, "kube-public", resyncPeriod, nil, nil) + informer := coreinformers.NewFilteredEndpointsInformer(client, "kube-public", resyncPeriod, nil, nil) + informer.SetTransform(pkgutil.TransformStripManagedFields()) + return informer } sharedFactory.InformerFor(&corev1.Endpoints{}, newEndpointsInformer) cfg.SharedFactory = sharedFactory @@ -128,6 +133,9 @@ func Complete(options *options.YurtHubOptions, stopCh <-chan struct{}) (*YurtHub cfg.RemoteServers = us cfg.LBMode = options.LBMode tenantNamespce := util.ParseTenantNsFromOrgs(options.YurtHubCertOrganizations) + cfg.PoolScopeResources = options.PoolScopeResources + cfg.PortForMultiplexer = options.PortForMultiplexer + cfg.NodePoolName = options.NodePoolName // prepare some basic configurations as following: // - serializer manager: used for managing serializer for encoding or decoding response from kube-apiserver. @@ -145,7 +153,7 @@ func Complete(options *options.YurtHubOptions, stopCh <-chan struct{}) (*YurtHub if err != nil { return nil, err } - registerInformers(sharedFactory, options.YurtHubNamespace, options.NodeName, (cfg.WorkingMode == util.WorkingModeCloud), tenantNamespce) + registerInformers(sharedFactory, options.YurtHubNamespace, options.NodePoolName, options.NodeName, (cfg.WorkingMode == util.WorkingModeCloud), tenantNamespce) certMgr, err := certificatemgr.NewYurtHubCertManager(options, us) if err != nil { @@ -191,7 +199,6 @@ func Complete(options *options.YurtHubOptions, stopCh <-chan struct{}) (*YurtHub cfg.ConfigManager = configManager cfg.FilterFinder = filterFinder - cfg.RequestMultiplexerManager = newRequestMultiplexerManager(options, restMapperManager) if options.EnableDummyIf { klog.V(2).Infof("create dummy network interface %s(%s)", options.HubAgentDummyIfName, options.HubAgentDummyIfIP) @@ -284,15 +291,17 @@ func createClientAndSharedInformerFactories(serverAddr, nodePoolName string) (ku func registerInformers( informerFactory informers.SharedInformerFactory, namespace string, + poolName string, nodeName string, enablePodInformer bool, tenantNs string) { - // configmap informer is used for list/watching yurt-hub-cfg configmap which includes configurations about cache agents and filters. - // and is used by approver in filter and cache manager on cloud and edge working mode. + // configmap informer is used for list/watching yurt-hub-cfg configmap and leader-hub-{poolName} configmap. + // yurt-hub-cfg configmap includes configurations about cache agents and filters which are needed by approver in filter and cache manager on cloud and edge working mode. + // leader-hub-{nodePoolName} configmap includes leader election configurations which are used by multiplexer manager. 
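For reference, a minimal sketch of the leader-hub-{nodePoolName} configmap that the multiplexer manager consumes; the keys mirror the parsing code added in pkg/yurthub/multiplexer/multiplexer.go further down, while the pool name, node names and IPs are illustrative. Both configmaps are assumed to carry the openyurt.io/configmap-name label so that the label selector used by the informer below matches them (the chart change above stamps it onto yurt-hub-cfg).

	// Illustrative only (assumes corev1/metav1 imports); not part of this patch.
	leaderHubCM := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "leader-hub-hangzhou", // leader-hub-{nodePoolName}
			Namespace: "kube-system",         // namespace watched by yurthub
			Labels:    map[string]string{"openyurt.io/configmap-name": "leader-hub-hangzhou"},
		},
		Data: map[string]string{
			// comma-separated group/version/resource triples (core group left empty)
			"pool-scoped-metadata": "/v1/services,discovery.k8s.io/v1/endpointslices",
			// comma-separated nodeName/internalIP pairs of the elected leader hubs
			"leaders": "node-a/192.168.1.10,node-b/192.168.1.11",
			// when "true" and this node is not a leader, pool scope metadata is served from a leader hub
			"enable-leader-election": "true",
		},
	}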
newConfigmapInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { tweakListOptions := func(options *metav1.ListOptions) { - options.FieldSelector = fields.Set{"metadata.name": util.YurthubConfigMapName}.String() + options.LabelSelector = fmt.Sprintf("openyurt.io/configmap-name in (%s, %s)", util.YurthubConfigMapName, "leader-hub-"+poolName) } informer := coreinformers.NewFilteredConfigMapInformer(client, namespace, resyncPeriod, nil, tweakListOptions) informer.SetTransform(pkgutil.TransformStripManagedFields()) @@ -385,17 +394,22 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y cfg.YurtHubSecureProxyServerServing.ClientCA = caBundleProvider cfg.YurtHubSecureProxyServerServing.DisableHTTP2 = true - return nil -} - -func newRequestMultiplexerManager(options *options.YurtHubOptions, restMapperManager *meta.RESTMapperManager) *multiplexer.MultiplexerManager { - config := &rest.Config{ - Host: fmt.Sprintf("http://%s:%d", options.YurtHubProxyHost, options.YurtHubProxyPort), - UserAgent: util.MultiplexerProxyClientUserAgentPrefix + options.NodeName, + if err := (&apiserveroptions.SecureServingOptions{ + BindAddress: net.ParseIP(options.NodeIP), + BindPort: options.PortForMultiplexer, + BindNetwork: "tcp", + ServerCert: apiserveroptions.GeneratableKeyCert{ + CertKey: apiserveroptions.CertKey{ + CertFile: serverCertPath, + KeyFile: serverCertPath, + }, + }, + }).ApplyTo(&cfg.YurtHubMultiplexerServerServing); err != nil { + return err } - storageProvider := storage.NewStorageProvider(config) - - return multiplexer.NewRequestMultiplexerManager(storageProvider, restMapperManager, options.PoolScopeResources) + cfg.YurtHubMultiplexerServerServing.ClientCA = caBundleProvider + cfg.YurtHubMultiplexerServerServing.DisableHTTP2 = true + return nil } func ReadinessCheck(cfg *YurtHubConfiguration) error { diff --git a/cmd/yurthub/app/config/config_test.go b/cmd/yurthub/app/config/config_test.go index 2a7e7b60867..52f4a1362ea 100644 --- a/cmd/yurthub/app/config/config_test.go +++ b/cmd/yurthub/app/config/config_test.go @@ -38,6 +38,7 @@ func TestComplete(t *testing.T) { options.NodeName = "foo" options.EnableDummyIf = false options.HubAgentDummyIfIP = "169.254.2.1" + options.NodeIP = "127.0.0.1" cfg, err := Complete(options, nil) if err != nil { t.Errorf("expect no err, but got %v", err) diff --git a/cmd/yurthub/app/options/options.go b/cmd/yurthub/app/options/options.go index f33c31741da..942030b4c70 100644 --- a/cmd/yurthub/app/options/options.go +++ b/cmd/yurthub/app/options/options.go @@ -17,6 +17,7 @@ limitations under the License. package options import ( + "errors" "fmt" "net" "path/filepath" @@ -24,6 +25,7 @@ import ( "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/runtime/schema" + apinet "k8s.io/apimachinery/pkg/util/net" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" @@ -83,6 +85,8 @@ type YurtHubOptions struct { ClientForTest kubernetes.Interface EnablePoolServiceTopology bool PoolScopeResources PoolScopeMetadatas + PortForMultiplexer int + NodeIP string } // NewYurtHubOptions creates a new YurtHubOptions with a default config. 
@@ -93,6 +97,7 @@ func NewYurtHubOptions() *YurtHubOptions { YurtHubProxyPort: util.YurtHubProxyPort, YurtHubPort: util.YurtHubPort, YurtHubProxySecurePort: util.YurtHubProxySecurePort, + PortForMultiplexer: util.YurtHubMultiplexerPort, YurtHubNamespace: util.YurtHubNamespace, GCFrequency: 120, YurtHubCertOrganizations: make([]string, 0), @@ -167,6 +172,19 @@ func (options *YurtHubOptions) Validate() error { if len(options.CACertHashes) == 0 && !options.UnsafeSkipCAVerification { return fmt.Errorf("set --discovery-token-unsafe-skip-ca-verification flag as true or pass CACertHashes to continue") } + + if len(options.NodePoolName) == 0 { + return errors.New("node-pool-name is empty") + } + + if len(options.NodeIP) == 0 { + ipAddr, err := apinet.ResolveBindAddress(nil) + if err != nil { + return fmt.Errorf("couldn't get the node ip, %v", err) + } + options.NodeIP = ipAddr.String() + klog.Infof("node ip is configured as %s", options.NodeIP) + } } return nil @@ -179,11 +197,13 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.YurtHubProxyHost, "bind-proxy-address", o.YurtHubProxyHost, "the IP address of YurtHub Proxy Server") fs.IntVar(&o.YurtHubProxyPort, "proxy-port", o.YurtHubProxyPort, "the port on which to proxy HTTP requests to kube-apiserver") fs.IntVar(&o.YurtHubProxySecurePort, "proxy-secure-port", o.YurtHubProxySecurePort, "the port on which to proxy HTTPS requests to kube-apiserver") + fs.IntVar(&o.PortForMultiplexer, "multiplexer-port", o.PortForMultiplexer, "the port on which to proxy HTTPS requests to multiplexer in yurthub") fs.StringVar(&o.YurtHubNamespace, "namespace", o.YurtHubNamespace, "the namespace of YurtHub Server") fs.StringVar(&o.ServerAddr, "server-addr", o.ServerAddr, "the address of Kubernetes kube-apiserver, the format is: \"server1,server2,...\"; when yurthub is in local mode, server-addr represents the service address of apiservers, the format is: \"ip:port\".") fs.StringSliceVar(&o.YurtHubCertOrganizations, "hub-cert-organizations", o.YurtHubCertOrganizations, "Organizations that will be added into hub's apiserver client certificate, the format is: certOrg1,certOrg2,...") fs.IntVar(&o.GCFrequency, "gc-frequency", o.GCFrequency, "the frequency to gc cache in storage(unit: minute).") fs.StringVar(&o.NodeName, "node-name", o.NodeName, "the name of node that runs hub agent") + fs.StringVar(&o.NodeIP, "node-ip", o.NodeIP, "the same IP address of the node which used by kubelet. 
if unset, node's default IPv4 address will be used.") fs.StringVar(&o.LBMode, "lb-mode", o.LBMode, "the mode of load balancer to connect remote servers(round-robin, priority)") fs.IntVar(&o.HeartbeatFailedRetry, "heartbeat-failed-retry", o.HeartbeatFailedRetry, "number of heartbeat request retry after having failed.") fs.IntVar(&o.HeartbeatHealthyThreshold, "heartbeat-healthy-threshold", o.HeartbeatHealthyThreshold, "minimum consecutive successes for the heartbeat to be considered healthy after having failed.") diff --git a/cmd/yurthub/app/options/options_test.go b/cmd/yurthub/app/options/options_test.go index 009187851e3..8dcac5b91c6 100644 --- a/cmd/yurthub/app/options/options_test.go +++ b/cmd/yurthub/app/options/options_test.go @@ -38,6 +38,7 @@ func TestNewYurtHubOptions(t *testing.T) { YurtHubProxyPort: util.YurtHubProxyPort, YurtHubPort: util.YurtHubPort, YurtHubProxySecurePort: util.YurtHubProxySecurePort, + PortForMultiplexer: util.YurtHubMultiplexerPort, YurtHubNamespace: util.YurtHubNamespace, GCFrequency: 120, YurtHubCertOrganizations: make([]string, 0), @@ -179,10 +180,11 @@ func TestValidate(t *testing.T) { LBMode: "rr", WorkingMode: "cloud", UnsafeSkipCAVerification: true, + NodePoolName: "foo", }, isErr: false, }, - "normal options with ipv4": { + "normal options with ipv6": { options: &YurtHubOptions{ NodeName: "foo", ServerAddr: "1.2.3.4:56", @@ -191,10 +193,11 @@ func TestValidate(t *testing.T) { WorkingMode: "cloud", UnsafeSkipCAVerification: true, HubAgentDummyIfIP: "fd00::2:1", + NodePoolName: "foo", }, isErr: false, }, - "normal options with ipv6": { + "normal options with ipv4": { options: &YurtHubOptions{ NodeName: "foo", ServerAddr: "1.2.3.4:56", @@ -203,6 +206,7 @@ func TestValidate(t *testing.T) { WorkingMode: "cloud", UnsafeSkipCAVerification: true, HubAgentDummyIfIP: "169.254.2.1", + NodePoolName: "foo", }, isErr: false, }, diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index cd1be6b6edf..23f8065fea3 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -19,9 +19,12 @@ package app import ( "context" "fmt" + "net/url" + "time" "github.com/spf13/cobra" "github.com/spf13/pflag" + "k8s.io/client-go/rest" "k8s.io/component-base/cli/globalflag" "k8s.io/klog/v2" @@ -32,10 +35,15 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/gc" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker/cloudapiserver" + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker/leaderhub" "github.com/openyurtio/openyurt/pkg/yurthub/locallb" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" "github.com/openyurtio/openyurt/pkg/yurthub/proxy" + "github.com/openyurtio/openyurt/pkg/yurthub/proxy/remote" "github.com/openyurtio/openyurt/pkg/yurthub/server" "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" + "github.com/openyurtio/openyurt/pkg/yurthub/transport" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) @@ -137,6 +145,18 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { trace++ } + // no leader hub servers for transport manager at startup time. + // and don't filter response of request for pool scope metadata from leader hub. 
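The three components created below deliberately start with empty backend lists; they are populated only after the leader-hub-{nodePoolName} configmap reports elected leaders. A rough sketch of that later update path, mirroring resolveLeaderHubServers and updateLeaderHubConfiguration added further down in this patch (the IPs and the 10269 multiplexer port are the illustrative values used by the tests):

	// Sketch: refresh leader-hub backends once leader addresses are known.
	leaderIPs := []string{"192.168.1.10", "192.168.1.11"} // parsed from the "leaders" key (nodeName/IP pairs)
	servers := make([]*url.URL, 0, len(leaderIPs))
	for _, ip := range leaderIPs {
		if u, err := url.Parse(fmt.Sprintf("https://%s:%d", ip, 10269)); err == nil { // port from --multiplexer-port
			servers = append(servers, u)
		}
	}
	healthCheckerForLeaderHub.UpdateBackends(servers)
	loadBalancerForLeaderHub.UpdateBackends(servers)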
+ transportManagerForLeaderHub, err := transport.NewTransportAndClientManager([]*url.URL{}, 2, cfg.CertManager, ctx.Done()) + if err != nil { + return fmt.Errorf("could not new transport manager for leader hub, %w", err) + } + healthCheckerForLeaderHub := leaderhub.NewLeaderHubHealthChecker(20*time.Second, nil, ctx.Done()) + loadBalancerForLeaderHub := remote.NewLoadBalancer("round-robin", []*url.URL{}, cacheManager, transportManagerForLeaderHub, healthCheckerForLeaderHub, nil, ctx.Done()) + + cfg.LoadBalancerForLeaderHub = loadBalancerForLeaderHub + requestMultiplexerManager := newRequestMultiplexerManager(cfg, healthCheckerForLeaderHub) + if cfg.NetworkMgr != nil { klog.Infof("%d. start network manager for ensuing dummy interface", trace) cfg.NetworkMgr.Run(ctx.Done()) @@ -153,6 +173,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { cfg, cacheManager, cloudHealthChecker, + requestMultiplexerManager, ctx.Done()) if err != nil { return fmt.Errorf("could not create reverse proxy handler, %w", err) @@ -170,3 +191,15 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { klog.Info("hub agent exited") return nil } + +func newRequestMultiplexerManager(cfg *config.YurtHubConfiguration, healthCheckerForLeaderHub healthchecker.Interface) *multiplexer.MultiplexerManager { + insecureHubProxyAddress := cfg.YurtHubProxyServerServing.Listener.Addr().String() + klog.Infof("hub insecure proxy address: %s", insecureHubProxyAddress) + config := &rest.Config{ + Host: fmt.Sprintf("http://%s", insecureHubProxyAddress), + UserAgent: util.MultiplexerProxyClientUserAgentPrefix + cfg.NodeName, + } + storageProvider := storage.NewStorageProvider(config) + + return multiplexer.NewRequestMultiplexerManager(cfg, storageProvider, healthCheckerForLeaderHub) +} diff --git a/pkg/node-servant/components/yurthub.go b/pkg/node-servant/components/yurthub.go index e9a6c1d24c8..76901eb6cdf 100644 --- a/pkg/node-servant/components/yurthub.go +++ b/pkg/node-servant/components/yurthub.go @@ -51,14 +51,16 @@ const ( type yurtHubOperator struct { apiServerAddr string joinToken string + nodePoolName string yurthubHealthCheckTimeout time.Duration } // NewYurthubOperator new yurtHubOperator struct -func NewYurthubOperator(apiServerAddr string, joinToken string, yurthubHealthCheckTimeout time.Duration) *yurtHubOperator { +func NewYurthubOperator(apiServerAddr string, joinToken, nodePoolName string, yurthubHealthCheckTimeout time.Duration) *yurtHubOperator { return &yurtHubOperator{ apiServerAddr: apiServerAddr, joinToken: joinToken, + nodePoolName: nodePoolName, yurthubHealthCheckTimeout: yurthubHealthCheckTimeout, } } @@ -91,6 +93,8 @@ func (op *yurtHubOperator) Install() error { } klog.Infof("yurt-hub.yaml apiServerAddr: %+v", op.apiServerAddr) yssYurtHub := strings.ReplaceAll(string(content), "KUBERNETES_SERVER_ADDRESS", op.apiServerAddr) + klog.Infof("yurt-hub.yaml nodePoolName: %s", op.nodePoolName) + yssYurtHub = strings.ReplaceAll(string(yssYurtHub), "NODE_POOL_NAME", op.nodePoolName) if err = os.WriteFile(getYurthubYaml(podManifestPath), []byte(yssYurtHub), fileMode); err != nil { return err } diff --git a/pkg/node-servant/constant.go b/pkg/node-servant/constant.go index 67644c71f1b..f36492476d7 100644 --- a/pkg/node-servant/constant.go +++ b/pkg/node-servant/constant.go @@ -54,7 +54,7 @@ spec: - /bin/sh - -c args: - - "/usr/local/bin/entry.sh convert {{if .yurthub_healthcheck_timeout}}--yurthub-healthcheck-timeout={{.yurthub_healthcheck_timeout}} 
{{end}}--join-token={{.joinToken}}" + - "/usr/local/bin/entry.sh convert {{if .yurthub_healthcheck_timeout}}--yurthub-healthcheck-timeout={{.yurthub_healthcheck_timeout}} {{end}}--join-token={{.joinToken}} --nodepool-name={{.nodePoolName}}" securityContext: privileged: true volumeMounts: diff --git a/pkg/node-servant/convert/convert.go b/pkg/node-servant/convert/convert.go index 1894e99ef15..7696af3acdb 100644 --- a/pkg/node-servant/convert/convert.go +++ b/pkg/node-servant/convert/convert.go @@ -30,6 +30,7 @@ type Config struct { joinToken string kubeadmConfPaths []string openyurtDir string + nodePoolName string } // nodeConverter do the convert job @@ -45,6 +46,7 @@ func NewConverterWithOptions(o *Options) *nodeConverter { joinToken: o.joinToken, kubeadmConfPaths: strings.Split(o.kubeadmConfPaths, ","), openyurtDir: o.openyurtDir, + nodePoolName: o.nodePoolName, }, } } @@ -70,7 +72,7 @@ func (n *nodeConverter) installYurtHub() error { if apiServerAddress == "" { return fmt.Errorf("get apiServerAddress empty") } - op := components.NewYurthubOperator(apiServerAddress, n.joinToken, n.yurthubHealthCheckTimeout) + op := components.NewYurthubOperator(apiServerAddress, n.joinToken, n.nodePoolName, n.yurthubHealthCheckTimeout) return op.Install() } diff --git a/pkg/node-servant/convert/options.go b/pkg/node-servant/convert/options.go index 880d6a5dc66..e2536b4284e 100644 --- a/pkg/node-servant/convert/options.go +++ b/pkg/node-servant/convert/options.go @@ -39,6 +39,7 @@ type Options struct { kubeadmConfPaths string openyurtDir string Version bool + nodePoolName string } // NewConvertOptions creates a new Options @@ -56,6 +57,10 @@ func (o *Options) Validate() error { return fmt.Errorf("join token(bootstrap token) is empty") } + if len(o.nodePoolName) == 0 { + return fmt.Errorf("nodepool name is empty") + } + return nil } @@ -64,5 +69,6 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&o.yurthubHealthCheckTimeout, "yurthub-healthcheck-timeout", o.yurthubHealthCheckTimeout, "The timeout for yurthub health check.") fs.StringVarP(&o.kubeadmConfPaths, "kubeadm-conf-path", "k", o.kubeadmConfPaths, "The path to kubelet service conf that is used by kubelet component to join the cluster on the work node. 
Support multiple values, will search in order until get the file.(e.g -k kbcfg1,kbcfg2)") fs.StringVar(&o.joinToken, "join-token", o.joinToken, "The token used by yurthub for joining the cluster.") + fs.StringVar(&o.nodePoolName, "nodepool-name", o.nodePoolName, "The nodepool name which the node will be added") fs.BoolVar(&o.Version, "version", o.Version, "print the version information.") } diff --git a/pkg/node-servant/revert/revert.go b/pkg/node-servant/revert/revert.go index 8567f539d5f..264ba0d4f67 100644 --- a/pkg/node-servant/revert/revert.go +++ b/pkg/node-servant/revert/revert.go @@ -60,7 +60,7 @@ func (n *nodeReverter) revertKubelet() error { } func (n *nodeReverter) unInstallYurtHub() error { - op := components.NewYurthubOperator("", "", time.Duration(1)) // params is not important here + op := components.NewYurthubOperator("", "", "", time.Duration(1)) // params is not important here return op.UnInstall() } diff --git a/pkg/yurthub/certificate/manager/manager.go b/pkg/yurthub/certificate/manager/manager.go index a17f7f34aaf..690b4466596 100644 --- a/pkg/yurthub/certificate/manager/manager.go +++ b/pkg/yurthub/certificate/manager/manager.go @@ -92,6 +92,7 @@ func NewYurtHubCertManager(options *options.YurtHubOptions, remoteServers []*url net.ParseIP(options.HubAgentDummyIfIP), net.ParseIP(options.YurtHubHost), net.ParseIP(options.YurtHubProxyHost), + net.ParseIP(options.NodeIP), }) serverCertManager, err := hubServerCert.NewHubServerCertificateManager(options.ClientForTest, clientCertManager, options.NodeName, filepath.Join(workDir, "pki"), certIPs) if err != nil { diff --git a/pkg/yurthub/configuration/manager.go b/pkg/yurthub/configuration/manager.go index fac9a4d787b..8077ec505ef 100644 --- a/pkg/yurthub/configuration/manager.go +++ b/pkg/yurthub/configuration/manager.go @@ -86,11 +86,20 @@ func NewConfigurationManager(nodeName string, sharedFactory informers.SharedInfo // init filter settings m.updateFilterSettings(map[string]string{}, "init") - // prepare configmap event handler - configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: m.addConfigmap, - UpdateFunc: m.updateConfigmap, - DeleteFunc: m.deleteConfigmap, + // prepare yurt-hub-cfg configmap event handler + configmapInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + cfg, ok := obj.(*corev1.ConfigMap) + if ok && cfg.Name == util.YurthubConfigMapName { + return true + } + return false + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: m.addConfigmap, + UpdateFunc: m.updateConfigmap, + DeleteFunc: m.deleteConfigmap, + }, }) return m } @@ -139,25 +148,15 @@ func (m *Manager) FindFiltersFor(req *http.Request) []string { } func (m *Manager) addConfigmap(obj interface{}) { - cfg, ok := obj.(*corev1.ConfigMap) - if !ok { - return - } + cfg, _ := obj.(*corev1.ConfigMap) m.updateCacheAgents(cfg.Data[cacheUserAgentsKey], "add") m.updateFilterSettings(cfg.Data, "add") } func (m *Manager) updateConfigmap(oldObj, newObj interface{}) { - oldCfg, ok := oldObj.(*corev1.ConfigMap) - if !ok { - return - } - - newCfg, ok := newObj.(*corev1.ConfigMap) - if !ok { - return - } + oldCfg, _ := oldObj.(*corev1.ConfigMap) + newCfg, _ := newObj.(*corev1.ConfigMap) if oldCfg.Data[cacheUserAgentsKey] != newCfg.Data[cacheUserAgentsKey] { m.updateCacheAgents(newCfg.Data[cacheUserAgentsKey], "update") @@ -169,10 +168,6 @@ func (m *Manager) updateConfigmap(oldObj, newObj interface{}) { } func (m *Manager) deleteConfigmap(obj interface{}) { - _, ok := 
obj.(*corev1.ConfigMap) - if !ok { - return - } m.updateCacheAgents("", "delete") m.updateFilterSettings(map[string]string{}, "delete") } diff --git a/pkg/yurthub/healthchecker/cloudapiserver/health_checker.go b/pkg/yurthub/healthchecker/cloudapiserver/health_checker.go index 68a3135392a..02e66908cf6 100644 --- a/pkg/yurthub/healthchecker/cloudapiserver/health_checker.go +++ b/pkg/yurthub/healthchecker/cloudapiserver/health_checker.go @@ -112,7 +112,7 @@ func (hc *cloudAPIServerHealthChecker) BackendIsHealthy(server *url.URL) bool { return false } -func (hc *cloudAPIServerHealthChecker) UpdateServers(servers []*url.URL) { +func (hc *cloudAPIServerHealthChecker) UpdateBackends(servers []*url.URL) { // do nothing } diff --git a/pkg/yurthub/healthchecker/cloudapiserver/fake_checker.go b/pkg/yurthub/healthchecker/fake/fake_checker.go similarity index 63% rename from pkg/yurthub/healthchecker/cloudapiserver/fake_checker.go rename to pkg/yurthub/healthchecker/fake/fake_checker.go index 88d6851c89e..3fd2c23fab0 100644 --- a/pkg/yurthub/healthchecker/cloudapiserver/fake_checker.go +++ b/pkg/yurthub/healthchecker/fake/fake_checker.go @@ -14,20 +14,22 @@ See the License for the specific language governing permissions and limitations under the License. */ -package cloudapiserver +package fake import ( "net/url" + "k8s.io/apimachinery/pkg/util/sets" + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" ) -type fakeChecker struct { +type FakeChecker struct { servers map[*url.URL]bool } // BackendHealthyStatus returns healthy status of server -func (fc *fakeChecker) BackendIsHealthy(server *url.URL) bool { +func (fc *FakeChecker) BackendIsHealthy(server *url.URL) bool { if server != nil { for s, healthy := range fc.servers { if s.Host == server.Host { @@ -38,7 +40,7 @@ func (fc *fakeChecker) BackendIsHealthy(server *url.URL) bool { return false } -func (fc *fakeChecker) IsHealthy() bool { +func (fc *FakeChecker) IsHealthy() bool { for _, isHealthy := range fc.servers { if isHealthy { return true @@ -47,10 +49,10 @@ func (fc *fakeChecker) IsHealthy() bool { return false } -func (fc *fakeChecker) RenewKubeletLeaseTime() { +func (fc *FakeChecker) RenewKubeletLeaseTime() { } -func (fc *fakeChecker) PickOneHealthyBackend() *url.URL { +func (fc *FakeChecker) PickOneHealthyBackend() *url.URL { for u, isHealthy := range fc.servers { if isHealthy { return u @@ -60,13 +62,27 @@ func (fc *fakeChecker) PickOneHealthyBackend() *url.URL { return nil } -func (fc *fakeChecker) UpdateServers(servers []*url.URL) { - // do nothing +func (fc *FakeChecker) UpdateBackends(servers []*url.URL) { + serverMap := make(map[*url.URL]bool, len(servers)) + for i := range servers { + serverMap[servers[i]] = false + } + + fc.servers = serverMap +} + +func (fc *FakeChecker) ListServerHosts() sets.Set[string] { + hosts := sets.New[string]() + for server := range fc.servers { + hosts.Insert(server.Host) + } + + return hosts } // NewFakeChecker creates a fake checker func NewFakeChecker(servers map[*url.URL]bool) healthchecker.Interface { - return &fakeChecker{ + return &FakeChecker{ servers: servers, } } diff --git a/pkg/yurthub/healthchecker/interfaces.go b/pkg/yurthub/healthchecker/interfaces.go index 83faf431503..8e1ddaa4065 100644 --- a/pkg/yurthub/healthchecker/interfaces.go +++ b/pkg/yurthub/healthchecker/interfaces.go @@ -30,7 +30,7 @@ type Interface interface { IsHealthy() bool BackendIsHealthy(server *url.URL) bool PickOneHealthyBackend() *url.URL - UpdateServers(servers []*url.URL) + UpdateBackends(servers []*url.URL) } 
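The rename from UpdateServers to UpdateBackends goes with the backend set becoming mutable at runtime: leader hub endpoints are pushed in whenever the leader-hub configmap changes, instead of being fixed at startup. A minimal consumer-side sketch with hypothetical names (checker, servers, preferred):

	// Sketch: work against a dynamically updated backend set.
	checker.UpdateBackends(servers) // e.g. called when the leader-hub configmap changes
	if checker.BackendIsHealthy(preferred) {
		// forward the request to the preferred backend
	} else if backup := checker.PickOneHealthyBackend(); backup != nil {
		// fall back to any other healthy backend
	}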
// BackendProber is used to send heartbeat to backend and verify backend diff --git a/pkg/yurthub/healthchecker/leaderhub/leader_hub.go b/pkg/yurthub/healthchecker/leaderhub/leader_hub.go index 95e2de5b2c5..3a238c7426f 100644 --- a/pkg/yurthub/healthchecker/leaderhub/leader_hub.go +++ b/pkg/yurthub/healthchecker/leaderhub/leader_hub.go @@ -99,7 +99,7 @@ func (hc *leaderHubHealthChecker) PickOneHealthyBackend() *url.URL { return nil } -func (hc *leaderHubHealthChecker) UpdateServers(servers []*url.URL) { +func (hc *leaderHubHealthChecker) UpdateBackends(servers []*url.URL) { hc.serverMutex.Lock() defer hc.serverMutex.Unlock() newStatus := make(map[string]bool) diff --git a/pkg/yurthub/healthchecker/leaderhub/leader_hub_test.go b/pkg/yurthub/healthchecker/leaderhub/leader_hub_test.go index a1425b3afeb..e5f186454d2 100644 --- a/pkg/yurthub/healthchecker/leaderhub/leader_hub_test.go +++ b/pkg/yurthub/healthchecker/leaderhub/leader_hub_test.go @@ -135,7 +135,7 @@ func TestLeaderHubHealthChecker(t *testing.T) { t.Run(k, func(t *testing.T) { stopCh := make(chan struct{}) hc := NewLeaderHubHealthChecker(2*time.Second, tc.pingFunc, stopCh) - hc.UpdateServers(tc.servers) + hc.UpdateBackends(tc.servers) assert.Equal(t, hc.IsHealthy(), tc.expectedIsHealthy, "IsHealthy result is not equal") assert.Equal(t, hc.PickOneHealthyBackend() != nil, tc.healthyServerFound, "PickOneHealthyBackend result is not equal") @@ -145,7 +145,7 @@ func TestLeaderHubHealthChecker(t *testing.T) { if len(tc.updatedServers) != 0 { time.Sleep(5 * time.Second) - hc.UpdateServers(tc.updatedServers) + hc.UpdateBackends(tc.updatedServers) for u, isHealthy := range tc.expectedBackendIsHealthyAfterUpdated { assert.Equal(t, hc.BackendIsHealthy(u), isHealthy, "BackendIsHealthy result is not equal after updated") } diff --git a/pkg/yurthub/multiplexer/multiplexer.go b/pkg/yurthub/multiplexer/multiplexer.go index 7fbce25cbae..da149b5897a 100644 --- a/pkg/yurthub/multiplexer/multiplexer.go +++ b/pkg/yurthub/multiplexer/multiplexer.go @@ -18,9 +18,14 @@ package multiplexer import ( "fmt" + "maps" + "net/http" + "net/url" + "strings" "sync" "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/fields" @@ -28,85 +33,251 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "github.com/openyurtio/openyurt/cmd/yurthub/app/config" + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" ystorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" + "github.com/openyurtio/openyurt/pkg/yurthub/proxy/remote" + hubutil "github.com/openyurtio/openyurt/pkg/yurthub/util" ) -var KeyFunc = func(obj runtime.Object) (string, error) { - accessor, err := meta.Accessor(obj) - if err != nil { - return "", err - } +const ( + PoolScopeMetadataKey = "pool-scoped-metadata" + LeaderEndpointsKey = "leaders" + EnableLeaderElection = "enable-leader-election" - name := accessor.GetName() - if len(name) == 0 { - return "", apierrors.NewBadRequest("Name parameter required.") - } + PoolSourceForPoolScopeMetadata = "pool" + APIServerSourceForPoolScopeMetadata = "api" +) - ns := accessor.GetNamespace() - if len(ns) == 0 { - return "/" + name, nil - } - return "/" + ns + "/" + 
name, nil -} +var ( + KeyFunc = func(obj runtime.Object) (string, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return "", err + } -var AttrsFunc = func(obj runtime.Object) (labels.Set, fields.Set, error) { - metadata, err := meta.Accessor(obj) - if err != nil { - return nil, nil, err + name := accessor.GetName() + if len(name) == 0 { + return "", apierrors.NewBadRequest("Name parameter required.") + } + + ns := accessor.GetNamespace() + if len(ns) == 0 { + return "/" + name, nil + } + return "/" + ns + "/" + name, nil } - var fieldSet fields.Set - if len(metadata.GetNamespace()) > 0 { - fieldSet = fields.Set{ - "metadata.name": metadata.GetName(), - "metadata.namespace": metadata.GetNamespace(), + AttrsFunc = func(obj runtime.Object) (labels.Set, fields.Set, error) { + metadata, err := meta.Accessor(obj) + if err != nil { + return nil, nil, err } - } else { - fieldSet = fields.Set{ - "metadata.name": metadata.GetName(), + + var fieldSet fields.Set + if len(metadata.GetNamespace()) > 0 { + fieldSet = fields.Set{ + "metadata.name": metadata.GetName(), + "metadata.namespace": metadata.GetNamespace(), + } + } else { + fieldSet = fields.Set{ + "metadata.name": metadata.GetName(), + } } - } - return labels.Set(metadata.GetLabels()), fieldSet, nil -} + return labels.Set(metadata.GetLabels()), fieldSet, nil + } +) type MultiplexerManager struct { - restStoreProvider ystorage.StorageProvider - restMapper *hubmeta.RESTMapperManager - poolScopeMetadatas sets.Set[string] - - cacheLock sync.RWMutex + restStoreProvider ystorage.StorageProvider + restMapper *hubmeta.RESTMapperManager + healthCheckerForLeaders healthchecker.Interface + loadBalancerForLeaders remote.LoadBalancer + portForLeaderHub int + nodeName string + multiplexerUserAgent string + + sync.RWMutex lazyLoadedGVRCache map[string]Interface lazyLoadedGVRCacheDestroyFunc map[string]func() + sourceForPoolScopeMetadata string + poolScopeMetadata sets.Set[string] + leaderAddresses sets.Set[string] + configMapSynced cache.InformerSynced } func NewRequestMultiplexerManager( + cfg *config.YurtHubConfiguration, restStoreProvider ystorage.StorageProvider, - restMapperMgr *hubmeta.RESTMapperManager, - poolScopeResources []schema.GroupVersionResource) *MultiplexerManager { - - poolScopeMetadatas := sets.New[string]() - for i := range poolScopeResources { - poolScopeMetadatas.Insert(poolScopeResources[i].String()) + healthCheckerForLeaders healthchecker.Interface) *MultiplexerManager { + configmapInformer := cfg.SharedFactory.Core().V1().ConfigMaps().Informer() + poolScopeMetadata := sets.New[string]() + for i := range cfg.PoolScopeResources { + poolScopeMetadata.Insert(cfg.PoolScopeResources[i].String()) } - klog.Infof("pool scope resources: %v", poolScopeMetadatas) + klog.Infof("pool scope resources: %v", poolScopeMetadata) - return &MultiplexerManager{ + m := &MultiplexerManager{ restStoreProvider: restStoreProvider, - restMapper: restMapperMgr, - poolScopeMetadatas: poolScopeMetadatas, + restMapper: cfg.RESTMapperManager, + healthCheckerForLeaders: healthCheckerForLeaders, + loadBalancerForLeaders: cfg.LoadBalancerForLeaderHub, + poolScopeMetadata: poolScopeMetadata, lazyLoadedGVRCache: make(map[string]Interface), lazyLoadedGVRCacheDestroyFunc: make(map[string]func()), - cacheLock: sync.RWMutex{}, + leaderAddresses: sets.New[string](), + portForLeaderHub: cfg.PortForMultiplexer, + nodeName: cfg.NodeName, + multiplexerUserAgent: hubutil.MultiplexerProxyClientUserAgentPrefix + cfg.NodeName, + configMapSynced: 
configmapInformer.HasSynced, } + + // prepare leader-hub-{pool-name} configmap event handler + leaderHubConfigMapName := fmt.Sprintf("leader-hub-%s", cfg.NodePoolName) + configmapInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + cfg, ok := obj.(*corev1.ConfigMap) + if ok && cfg.Name == leaderHubConfigMapName { + return true + } + return false + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: m.addConfigmap, + UpdateFunc: m.updateConfigmap, + // skip DeleteFunc, because only NodePool deletion will cause to delete this configmap. + }, + }) + return m } -func (m *MultiplexerManager) IsPoolScopeMetadata(gvr *schema.GroupVersionResource) bool { - return m.poolScopeMetadatas.Has(gvr.String()) +func (m *MultiplexerManager) addConfigmap(obj interface{}) { + cm, _ := obj.(*corev1.ConfigMap) + + m.updateLeaderHubConfiguration(cm) + klog.Infof("after added configmap, source for pool scope metadata: %s, pool scope metadata: %v", m.sourceForPoolScopeMetadata, m.poolScopeMetadata) +} + +func (m *MultiplexerManager) updateConfigmap(oldObj, newObj interface{}) { + oldCM, _ := oldObj.(*corev1.ConfigMap) + newCM, _ := newObj.(*corev1.ConfigMap) + + if maps.Equal(oldCM.Data, newCM.Data) { + return + } + + m.updateLeaderHubConfiguration(newCM) + klog.Infof("after updated configmap, source for pool scope metadata: %s, pool scope metadata: %v", m.sourceForPoolScopeMetadata, m.poolScopeMetadata) +} + +func (m *MultiplexerManager) updateLeaderHubConfiguration(cm *corev1.ConfigMap) { + newPoolScopeMetadata := sets.New[string]() + if len(cm.Data[PoolScopeMetadataKey]) != 0 { + for _, part := range strings.Split(cm.Data[PoolScopeMetadataKey], ",") { + subParts := strings.Split(part, "/") + if len(subParts) == 3 { + gvr := schema.GroupVersionResource{ + Group: subParts[0], + Version: subParts[1], + Resource: subParts[2], + } + newPoolScopeMetadata.Insert(gvr.String()) + } + } + } + + newLeaderNames := sets.New[string]() + newLeaderAddresses := sets.New[string]() + if len(cm.Data[LeaderEndpointsKey]) != 0 { + for _, part := range strings.Split(cm.Data[LeaderEndpointsKey], ",") { + subParts := strings.Split(part, "/") + if len(subParts) == 2 { + newLeaderNames.Insert(subParts[0]) + newLeaderAddresses.Insert(subParts[1]) + } + } + } + + newSource := APIServerSourceForPoolScopeMetadata + // enable-leader-election is enabled and node is not elected as leader hub, + // multiplexer will list/watch pool scope metadata from leader yurthub. + // otherwise, multiplexer will list/watch pool scope metadata from cloud kube-apiserver. + if cm.Data[EnableLeaderElection] == "true" && len(newLeaderAddresses) != 0 && !newLeaderNames.Has(m.nodeName) { + newSource = PoolSourceForPoolScopeMetadata + } + + // LeaderHubEndpoints are changed, related health checker and load balancer are need to be updated. + if !m.leaderAddresses.Equal(newLeaderAddresses) { + servers := m.resolveLeaderHubServers(newLeaderAddresses) + m.healthCheckerForLeaders.UpdateBackends(servers) + m.loadBalancerForLeaders.UpdateBackends(servers) + m.leaderAddresses = newLeaderAddresses + } + + if m.sourceForPoolScopeMetadata == newSource && + m.poolScopeMetadata.Equal(newPoolScopeMetadata) { + return + } + + // if pool scope metadata are removed, related GVR cache should be destroyed. 
+ deletedPoolScopeMetadata := m.poolScopeMetadata.Difference(newPoolScopeMetadata) + + m.Lock() + defer m.Unlock() + m.sourceForPoolScopeMetadata = newSource + m.poolScopeMetadata = newPoolScopeMetadata + for _, gvrStr := range deletedPoolScopeMetadata.UnsortedList() { + if destroyFunc, ok := m.lazyLoadedGVRCacheDestroyFunc[gvrStr]; ok { + destroyFunc() + } + delete(m.lazyLoadedGVRCacheDestroyFunc, gvrStr) + delete(m.lazyLoadedGVRCache, gvrStr) + } +} + +func (m *MultiplexerManager) HasSynced() bool { + return m.configMapSynced() +} + +func (m *MultiplexerManager) SourceForPoolScopeMetadata() string { + m.RLock() + defer m.RUnlock() + return m.sourceForPoolScopeMetadata +} + +func (m *MultiplexerManager) IsRequestForPoolScopeMetadata(req *http.Request) bool { + // the requests from multiplexer manager, recongnize it as not multiplexer request. + if req.UserAgent() == m.multiplexerUserAgent { + return false + } + + info, ok := apirequest.RequestInfoFrom(req.Context()) + if !ok { + return false + } + + // list/watch requests + if info.Verb != "list" && info.Verb != "watch" { + return false + } + + gvr := schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + } + + m.RLock() + defer m.RUnlock() + return m.poolScopeMetadata.Has(gvr.String()) } func (m *MultiplexerManager) Ready(gvr *schema.GroupVersionResource) bool { @@ -123,10 +294,23 @@ func (m *MultiplexerManager) Ready(gvr *schema.GroupVersionResource) bool { // The cache is loaded in a lazy mode, this means cache will not be loaded when yurthub initializes, // and cache will only be loaded when corresponding request is received. func (m *MultiplexerManager) ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) { - m.cacheLock.Lock() - defer m.cacheLock.Unlock() + // use read lock to get resource cache, so requests can not be blocked. + m.RLock() + rc, exists := m.lazyLoadedGVRCache[gvr.String()] + destroyFunc := m.lazyLoadedGVRCacheDestroyFunc[gvr.String()] + m.RUnlock() + + if exists { + return rc, destroyFunc, nil + } + + // resource cache doesn't exist, initialize multiplexer cache for gvr + m.Lock() + defer m.Unlock() - if rc, ok := m.lazyLoadedGVRCache[gvr.String()]; ok { + // maybe multiple requests are served to initialize multiplexer cache at the same time, + // so we need to check the cache another time before initializing cache. 
+ if rc, exists := m.lazyLoadedGVRCache[gvr.String()]; exists { return rc, m.lazyLoadedGVRCacheDestroyFunc[gvr.String()], nil } @@ -191,3 +375,16 @@ func (m *MultiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind, GetAttrsFunc: AttrsFunc, } } + +func (m *MultiplexerManager) resolveLeaderHubServers(leaderAddresses sets.Set[string]) []*url.URL { + servers := make([]*url.URL, 0, leaderAddresses.Len()) + for _, internalIP := range leaderAddresses.UnsortedList() { + u, err := url.Parse(fmt.Sprintf("https://%s:%d", internalIP, m.portForLeaderHub)) + if err != nil { + klog.Errorf("couldn't parse url(%s), %v", fmt.Sprintf("https://%s:%d", internalIP, m.portForLeaderHub), err) + continue + } + servers = append(servers, u) + } + return servers +} diff --git a/pkg/yurthub/multiplexer/multiplexer_test.go b/pkg/yurthub/multiplexer/multiplexer_test.go index c922063696a..d053017e937 100644 --- a/pkg/yurthub/multiplexer/multiplexer_test.go +++ b/pkg/yurthub/multiplexer/multiplexer_test.go @@ -18,6 +18,10 @@ package multiplexer import ( "context" + "fmt" + "net/http" + "net/http/httptest" + "net/url" "os" "testing" "time" @@ -26,11 +30,22 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/endpoints/filters" + "k8s.io/apiserver/pkg/server" kstorage "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + "github.com/openyurtio/openyurt/cmd/yurthub/app/config" + fakeHealthChecker "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker/fake" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" + "github.com/openyurtio/openyurt/pkg/yurthub/proxy/remote" + "github.com/openyurtio/openyurt/pkg/yurthub/transport" ) func newService(namespace, name string) *v1.Service { @@ -69,7 +84,19 @@ func TestShareCacheManager_ResourceCache(t *testing.T) { {Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}, } - scm := NewRequestMultiplexerManager(dsm, restMapperManager, poolScopeResources) + clientset := fake.NewSimpleClientset() + factory := informers.NewSharedInformerFactory(clientset, 0) + + healthChecher := fakeHealthChecker.NewFakeChecker(map[*url.URL]bool{}) + loadBalancer := remote.NewLoadBalancer("round-robin", []*url.URL{}, nil, nil, healthChecher, nil, context.Background().Done()) + cfg := &config.YurtHubConfiguration{ + PoolScopeResources: poolScopeResources, + RESTMapperManager: restMapperManager, + SharedFactory: factory, + LoadBalancerForLeaderHub: loadBalancer, + } + + scm := NewRequestMultiplexerManager(cfg, dsm, healthChecher) cache, _, _ := scm.ResourceCache(serviceGVR) wait.PollUntilContextCancel(context.Background(), 100*time.Millisecond, true, func(context.Context) (done bool, err error) { if cache.ReadinessCheck() == nil { @@ -87,3 +114,329 @@ func TestShareCacheManager_ResourceCache(t *testing.T) { *newService(metav1.NamespaceSystem, "coredns"), }, serviceList.Items) } + +func TestSyncConfigMap(t *testing.T) { + multiplexerPort := 10269 + poolName := "foo" + nodeName := "node1" + poolScopeResources := []schema.GroupVersionResource{ + {Group: "", Version: "v1", Resource: "services"}, + {Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}, + } + transportMgr := 
transport.NewFakeTransportManager(http.StatusOK, map[string]kubernetes.Interface{}) + + testcases := map[string]struct { + addCM *v1.ConfigMap + resultSource string + resultRequest map[string]bool + resultServerHosts sets.Set[string] + updateCM *v1.ConfigMap + updatedResultSource string + updatedResultRequest map[string]bool + updatedResultServerHosts sets.Set[string] + }{ + "no leader hub endpoints": { + addCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("leader-hub-%s", poolName), + Namespace: "kube-system", + }, + Data: map[string]string{ + "pool-scoped-metadata": "/v1/services,/v1/nodes", + "enable-leader-election": "true", + }, + }, + resultSource: "api", + resultRequest: map[string]bool{ + "/api/v1/nodes": true, + "/discovery.k8s.io/v1/endpointslices": false, + }, + resultServerHosts: sets.New[string](), + }, + "only one leader hub endpoints": { + addCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("leader-hub-%s", poolName), + Namespace: "kube-system", + }, + Data: map[string]string{ + "pool-scoped-metadata": "/v1/services,/v1/nodes", + "enable-leader-election": "true", + "leaders": "test-node1/192.168.1.1", + }, + }, + resultSource: "pool", + resultRequest: map[string]bool{ + "/api/v1/nodes": true, + "/discovery.k8s.io/v1/endpointslices": false, + }, + resultServerHosts: sets.New[string]("192.168.1.1:10269"), + }, + "multiple leader hub endpoints": { + addCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("leader-hub-%s", poolName), + Namespace: "kube-system", + }, + Data: map[string]string{ + "pool-scoped-metadata": "/v1/services,/v1/nodes,openyurt.io/v1/bars", + "enable-leader-election": "true", + "leaders": "test-node1/192.168.1.1,test-node2/192.168.1.2", + }, + }, + resultSource: "pool", + resultRequest: map[string]bool{ + "/api/v1/nodes": true, + "/apis/discovery.k8s.io/v1/endpointslices": false, + "/apis/openyurt.io/v1/bars": true, + }, + resultServerHosts: sets.New[string]("192.168.1.1:10269", "192.168.1.2:10269"), + }, + "multiple leader hub endpoints include node": { + addCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("leader-hub-%s", poolName), + Namespace: "kube-system", + }, + Data: map[string]string{ + "pool-scoped-metadata": "/v1/services,openyurt.io/v1/foos", + "enable-leader-election": "true", + "leaders": "test-node1/192.168.1.1,node1/192.168.1.2", + }, + }, + resultSource: "api", + resultRequest: map[string]bool{ + "/api/v1/nodes": false, + "/apis/discovery.k8s.io/v1/endpointslices": false, + "/apis/openyurt.io/v1/foos": true, + }, + resultServerHosts: sets.New[string]("192.168.1.1:10269", "192.168.1.2:10269"), + }, + "update enable leader election from true to false": { + addCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("leader-hub-%s", poolName), + Namespace: "kube-system", + }, + Data: map[string]string{ + "pool-scoped-metadata": "/v1/services,/v1/nodes,openyurt.io/v1/bars", + "enable-leader-election": "true", + "leaders": "test-node1/192.168.1.1,test-node2/192.168.1.2", + }, + }, + resultSource: "pool", + resultRequest: map[string]bool{ + "/api/v1/nodes": true, + "/apis/discovery.k8s.io/v1/endpointslices": false, + "/apis/openyurt.io/v1/bars": true, + }, + resultServerHosts: sets.New[string]("192.168.1.1:10269", "192.168.1.2:10269"), + updateCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("leader-hub-%s", poolName), + Namespace: "kube-system", + }, + Data: map[string]string{ + "pool-scoped-metadata": 
"/v1/services,openyurt.io/v1/foos", + "enable-leader-election": "false", + "leaders": "test-node1/192.168.1.1,test-node2/192.168.1.2", + }, + }, + updatedResultSource: "api", + updatedResultRequest: map[string]bool{ + "/api/v1/nodes": false, + "/apis/discovery.k8s.io/v1/endpointslices": false, + "/apis/openyurt.io/v1/bars": false, + "/apis/openyurt.io/v1/foos": true, + }, + updatedResultServerHosts: sets.New[string]("192.168.1.1:10269", "192.168.1.2:10269"), + }, + "update leader hub endpoints from 2 to 3": { + addCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("leader-hub-%s", poolName), + Namespace: "kube-system", + }, + Data: map[string]string{ + "pool-scoped-metadata": "/v1/services,/v1/nodes,openyurt.io/v1/bars", + "enable-leader-election": "true", + "leaders": "test-node1/192.168.1.1,test-node2/192.168.1.2", + }, + }, + resultSource: "pool", + resultRequest: map[string]bool{ + "/api/v1/nodes": true, + "/apis/discovery.k8s.io/v1/endpointslices": false, + "/apis/openyurt.io/v1/bars": true, + }, + resultServerHosts: sets.New[string]("192.168.1.1:10269", "192.168.1.2:10269"), + updateCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("leader-hub-%s", poolName), + Namespace: "kube-system", + }, + Data: map[string]string{ + "pool-scoped-metadata": "/v1/services,openyurt.io/v1/foos", + "enable-leader-election": "true", + "leaders": "test-node1/192.168.1.1,test-node2/192.168.1.2, test-node3/192.168.1.3", + }, + }, + updatedResultSource: "pool", + updatedResultRequest: map[string]bool{ + "/api/v1/nodes": false, + "/apis/discovery.k8s.io/v1/endpointslices": false, + "/apis/openyurt.io/v1/bars": false, + "/apis/openyurt.io/v1/foos": true, + }, + updatedResultServerHosts: sets.New[string]("192.168.1.1:10269", "192.168.1.2:10269", "192.168.1.3:10269"), + }, + "enable leader election and pool scope metadata are not updated": { + addCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("leader-hub-%s", poolName), + Namespace: "kube-system", + }, + Data: map[string]string{ + "pool-scoped-metadata": "/v1/services,/v1/nodes,openyurt.io/v1/bars", + "enable-leader-election": "true", + "leaders": "test-node1/192.168.1.1,test-node2/192.168.1.2", + }, + }, + resultSource: "pool", + resultRequest: map[string]bool{ + "/api/v1/nodes": true, + "/apis/discovery.k8s.io/v1/endpointslices": false, + "/apis/openyurt.io/v1/bars": true, + }, + resultServerHosts: sets.New[string]("192.168.1.1:10269", "192.168.1.2:10269"), + updateCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("leader-hub-%s", poolName), + Namespace: "kube-system", + }, + Data: map[string]string{ + "pool-scoped-metadata": "/v1/services,/v1/nodes,openyurt.io/v1/bars", + "enable-leader-election": "true", + "leaders": "test-node2/192.168.1.2", + }, + }, + updatedResultSource: "pool", + updatedResultRequest: map[string]bool{ + "/api/v1/nodes": true, + "/apis/discovery.k8s.io/v1/endpointslices": false, + "/apis/openyurt.io/v1/bars": true, + "/apis/openyurt.io/v1/foos": false, + }, + updatedResultServerHosts: sets.New[string]("192.168.1.2:10269"), + }, + } + + cfg := &server.Config{ + LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix), + } + resolver := server.NewRequestInfoResolver(cfg) + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + stopCh := make(chan struct{}) + client := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, 0) + checker := fakeHealthChecker.NewFakeChecker(map[*url.URL]bool{}) 
+ lb := remote.NewLoadBalancer("round-robin", []*url.URL{}, nil, transportMgr, checker, nil, stopCh) + + cfg := &config.YurtHubConfiguration{ + PortForMultiplexer: multiplexerPort, + NodeName: nodeName, + NodePoolName: poolName, + PoolScopeResources: poolScopeResources, + SharedFactory: informerFactory, + LoadBalancerForLeaderHub: lb, + } + m := NewRequestMultiplexerManager(cfg, nil, checker) + + informerFactory.Start(stopCh) + defer close(stopCh) + + if ok := cache.WaitForCacheSync(stopCh, m.HasSynced); !ok { + t.Errorf("configmap is not synced") + return + } + + if tc.addCM != nil { + _, err := client.CoreV1().ConfigMaps("kube-system").Create(context.Background(), tc.addCM, metav1.CreateOptions{}) + if err != nil { + t.Errorf("couldn't create configmap, %v", err) + return + } + + time.Sleep(1 * time.Second) + sourceForPoolScopeMetadata := m.SourceForPoolScopeMetadata() + if tc.resultSource != sourceForPoolScopeMetadata { + t.Errorf("expect sourceForPoolScopeMetadata %s, but got %s", tc.resultSource, sourceForPoolScopeMetadata) + return + } + + for path, isRequestForPoolScopeMetadata := range tc.resultRequest { + req, _ := http.NewRequest("GET", path, nil) + + var capturedRequest *http.Request + handler := filters.WithRequestInfo(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + capturedRequest = r + }), resolver) + recorder := httptest.NewRecorder() + + handler.ServeHTTP(recorder, req) + + if m.IsRequestForPoolScopeMetadata(capturedRequest) != isRequestForPoolScopeMetadata { + t.Errorf("path(%s): expect isRequestForPoolScopeMetadata %v, but got %v", path, isRequestForPoolScopeMetadata, m.IsRequestForPoolScopeMetadata(capturedRequest)) + return + } + } + fakeChecker := checker.(*fakeHealthChecker.FakeChecker) + addedServerHosts := fakeChecker.ListServerHosts() + if !tc.resultServerHosts.Equal(addedServerHosts) { + t.Errorf("expect server hosts %+v, but got %+v", tc.resultServerHosts, addedServerHosts) + } + } + + if tc.updateCM != nil { + _, err := client.CoreV1().ConfigMaps("kube-system").Update(context.Background(), tc.updateCM, metav1.UpdateOptions{}) + if err != nil { + t.Errorf("couldn't update configmap, %v", err) + return + } + + time.Sleep(1 * time.Second) + sourceForPoolScopeMetadata := m.SourceForPoolScopeMetadata() + if tc.updatedResultSource != sourceForPoolScopeMetadata { + t.Errorf("expect sourceForPoolScopeMetadata %s, but got %s", tc.updatedResultSource, sourceForPoolScopeMetadata) + return + } + + for path, isRequestForPoolScopeMetadata := range tc.updatedResultRequest { + req, _ := http.NewRequest("GET", path, nil) + + var updatedCapturedRequest *http.Request + handler := filters.WithRequestInfo(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + updatedCapturedRequest = r + }), resolver) + recorder := httptest.NewRecorder() + + handler.ServeHTTP(recorder, req) + + if m.IsRequestForPoolScopeMetadata(updatedCapturedRequest) != isRequestForPoolScopeMetadata { + t.Errorf("path(%s): expect isRequestForPoolScopeMetadata %v, but got %v", path, isRequestForPoolScopeMetadata, m.IsRequestForPoolScopeMetadata(updatedCapturedRequest)) + return + } + } + fakeChecker := checker.(*fakeHealthChecker.FakeChecker) + updatedServerHosts := fakeChecker.ListServerHosts() + if !tc.updatedResultServerHosts.Equal(updatedServerHosts) { + t.Errorf("expect server hosts %+v, but got %+v", tc.updatedResultServerHosts, updatedServerHosts) + } + } + }) + } +} diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index 
7330f342506..aa7380d7531 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -36,7 +36,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/manager" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/testdata" - "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker/cloudapiserver" + fakeHealthChecker "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker/fake" "github.com/openyurtio/openyurt/pkg/yurthub/otaupdate/util" "github.com/openyurtio/openyurt/pkg/yurthub/storage" "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" @@ -116,7 +116,7 @@ func TestHealthyCheck(t *testing.T) { } u, _ := url.Parse("https://10.10.10.113:6443") remoteServers := []*url.URL{u} - fakeHealthchecker := cloudapiserver.NewFakeChecker(servers) + fakeHealthchecker := fakeHealthChecker.NewFakeChecker(servers) client, err := testdata.CreateCertFakeClient("../certificate/testdata") if err != nil { diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go b/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go index 7c64bfa49bd..df4e5bab042 100644 --- a/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go +++ b/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go @@ -19,9 +19,11 @@ package multiplexer import ( "bytes" + "context" "fmt" "net/http" "net/http/httptest" + "net/url" "os" "sort" "testing" @@ -36,14 +38,19 @@ import ( "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/yurthub/filter" + fakeHealthChecker "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker/fake" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" multiplexerstorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" ctesting "github.com/openyurtio/openyurt/pkg/yurthub/proxy/multiplexer/testing" + "github.com/openyurtio/openyurt/pkg/yurthub/proxy/remote" ) var ( @@ -111,6 +118,9 @@ func TestShareProxy_ServeHTTP_LIST(t *testing.T) { } restMapperManager, _ := meta.NewRESTMapperManager(tmpDir) + clientset := fake.NewSimpleClientset() + factory := informers.NewSharedInformerFactory(clientset, 0) + poolScopeResources := []schema.GroupVersionResource{ {Group: "", Version: "v1", Resource: "services"}, {Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}, @@ -150,8 +160,16 @@ func TestShareProxy_ServeHTTP_LIST(t *testing.T) { Body: &bytes.Buffer{}, } + healthChecher := fakeHealthChecker.NewFakeChecker(map[*url.URL]bool{}) + loadBalancer := remote.NewLoadBalancer("round-robin", []*url.URL{}, nil, nil, healthChecher, nil, context.Background().Done()) dsm := multiplexerstorage.NewDummyStorageManager(mockCacheMap()) - rmm := multiplexer.NewRequestMultiplexerManager(dsm, restMapperManager, poolScopeResources) + cfg := &config.YurtHubConfiguration{ + PoolScopeResources: poolScopeResources, + RESTMapperManager: restMapperManager, + SharedFactory: factory, + LoadBalancerForLeaderHub: loadBalancer, + } + rmm := multiplexer.NewRequestMultiplexerManager(cfg, dsm, healthChecher) informerSynced := func() bool { return rmm.Ready(&schema.GroupVersionResource{ @@ -293,6 +311,9 @@ func TestShareProxy_ServeHTTP_WATCH(t *testing.T) { {Group: 
"discovery.k8s.io", Version: "v1", Resource: "endpointslices"}, } + clientset := fake.NewSimpleClientset() + factory := informers.NewSharedInformerFactory(clientset, 0) + for k, tc := range map[string]struct { filterFinder filter.FilterFinder url string @@ -315,8 +336,17 @@ func TestShareProxy_ServeHTTP_WATCH(t *testing.T) { }, } { t.Run(k, func(t *testing.T) { + healthChecher := fakeHealthChecker.NewFakeChecker(map[*url.URL]bool{}) + loadBalancer := remote.NewLoadBalancer("round-robin", []*url.URL{}, nil, nil, healthChecher, nil, context.Background().Done()) + dsm := multiplexerstorage.NewDummyStorageManager(mockCacheMap()) - rmm := multiplexer.NewRequestMultiplexerManager(dsm, restMapperManager, poolScopeResources) + cfg := &config.YurtHubConfiguration{ + PoolScopeResources: poolScopeResources, + RESTMapperManager: restMapperManager, + SharedFactory: factory, + LoadBalancerForLeaderHub: loadBalancer, + } + rmm := multiplexer.NewRequestMultiplexerManager(cfg, dsm, healthChecher) informerSynced := func() bool { return rmm.Ready(&schema.GroupVersionResource{ diff --git a/pkg/yurthub/proxy/nonresourcerequest/nonresource_test.go b/pkg/yurthub/proxy/nonresourcerequest/nonresource_test.go index 9bc038e69f5..5e9c0129d5c 100644 --- a/pkg/yurthub/proxy/nonresourcerequest/nonresource_test.go +++ b/pkg/yurthub/proxy/nonresourcerequest/nonresource_test.go @@ -43,7 +43,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/manager" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/testdata" - "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker/cloudapiserver" + fakeHealthChecker "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker/fake" "github.com/openyurtio/openyurt/pkg/yurthub/storage" "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" "github.com/openyurtio/openyurt/pkg/yurthub/transport" @@ -67,7 +67,7 @@ func TestLocalCacheHandler(t *testing.T) { servers := map[*url.URL]bool{ {Host: "10.10.10.113:6443"}: false, } - fakeHealthChecker := cloudapiserver.NewFakeChecker(servers) + fakeHealthChecker := fakeHealthChecker.NewFakeChecker(servers) u, _ := url.Parse("https://10.10.10.113:6443") remoteServers := []*url.URL{u} diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index c5fb1c3e950..7eee8c42a79 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -45,17 +45,18 @@ import ( ) type yurtReverseProxy struct { - cfg *config.YurtHubConfiguration - cloudHealthChecker healthchecker.Interface - resolver apirequest.RequestInfoResolver - loadBalancer http.Handler - localProxy http.Handler - autonomyProxy http.Handler - multiplexerProxy http.Handler - multiplexerManager *basemultiplexer.MultiplexerManager - tenantMgr tenant.Interface - nodeName string - multiplexerUserAgent string + cfg *config.YurtHubConfiguration + cloudHealthChecker healthchecker.Interface + resolver apirequest.RequestInfoResolver + loadBalancer remote.LoadBalancer + loadBalancerForLeaderHub remote.LoadBalancer + localProxy http.Handler + autonomyProxy http.Handler + multiplexerProxy http.Handler + multiplexerManager *basemultiplexer.MultiplexerManager + tenantMgr tenant.Interface + nodeName string + multiplexerUserAgent string } // NewYurtReverseProxyHandler creates a http handler for proxying @@ -64,6 +65,7 @@ func NewYurtReverseProxyHandler( yurtHubCfg *config.YurtHubConfiguration, localCacheMgr cachemanager.CacheManager, cloudHealthChecker healthchecker.Interface, + requestMultiplexerManager 
*basemultiplexer.MultiplexerManager, stopCh <-chan struct{}) (http.Handler, error) { cfg := &server.Config{ LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix), @@ -97,22 +99,23 @@ func NewYurtReverseProxyHandler( } multiplexerProxy := multiplexer.NewMultiplexerProxy(yurtHubCfg.FilterFinder, - yurtHubCfg.RequestMultiplexerManager, + requestMultiplexerManager, yurtHubCfg.RESTMapperManager, stopCh) yurtProxy := &yurtReverseProxy{ - cfg: yurtHubCfg, - resolver: resolver, - loadBalancer: lb, - cloudHealthChecker: cloudHealthChecker, - localProxy: localProxy, - autonomyProxy: autonomyProxy, - multiplexerProxy: multiplexerProxy, - multiplexerManager: yurtHubCfg.RequestMultiplexerManager, - tenantMgr: yurtHubCfg.TenantManager, - nodeName: yurtHubCfg.NodeName, - multiplexerUserAgent: hubutil.MultiplexerProxyClientUserAgentPrefix + yurtHubCfg.NodeName, + cfg: yurtHubCfg, + resolver: resolver, + loadBalancer: lb, + loadBalancerForLeaderHub: yurtHubCfg.LoadBalancerForLeaderHub, + cloudHealthChecker: cloudHealthChecker, + localProxy: localProxy, + autonomyProxy: autonomyProxy, + multiplexerProxy: multiplexerProxy, + multiplexerManager: requestMultiplexerManager, + tenantMgr: yurtHubCfg.TenantManager, + nodeName: yurtHubCfg.NodeName, + multiplexerUserAgent: hubutil.MultiplexerProxyClientUserAgentPrefix + yurtHubCfg.NodeName, } // warp non resource proxy handler @@ -131,7 +134,7 @@ func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler } handler = util.WithRequestClientComponent(handler) handler = util.WithPartialObjectMetadataRequest(handler) - handler = util.WithIsRequestForPoolScopeMetadata(handler, p.multiplexerManager, p.multiplexerUserAgent) + handler = util.WithIsRequestForPoolScopeMetadata(handler, p.multiplexerManager.IsRequestForPoolScopeMetadata) if !yurtutil.IsNil(p.tenantMgr) && p.tenantMgr.GetTenantNs() != "" { handler = util.WithSaTokenSubstitute(handler, p.tenantMgr) @@ -145,8 +148,8 @@ func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler } func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - // allow requests from yurthub itself because yurthub need to get resource from cloud kube-apiserver for initializing, - // and reject all requests from outside of yurthub when yurthub is not ready. + // reject all requests from outside of yurthub when yurthub is not ready, + // and allow requests from yurthub itself because yurthub needs to get resources from cloud kube-apiserver for initializing.
if !p.IsRequestFromHubSelf(req) { if err := config.ReadinessCheck(p.cfg); err != nil { klog.Errorf("could not handle request(%s) because hub is not ready for %s", hubutil.ReqString(req), err.Error()) @@ -162,14 +165,24 @@ func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) return } - // requests should be forwarded to cloud kube-apiserver for cloud mode(cloudHealthChecker==nil) - if yurtutil.IsNil(p.cloudHealthChecker) { - p.loadBalancer.ServeHTTP(rw, req) - return - } - - // handle requests for edge mode switch { + case yurtutil.IsNil(p.localProxy): // cloud mode + // requests from the local multiplexer of yurthub when the source of pool scope metadata is the leader hub + if p.IsMultiplexerRequestFromHubSelf(req) && + p.multiplexerManager.SourceForPoolScopeMetadata() == basemultiplexer.PoolSourceForPoolScopeMetadata { + // list/watch pool scope metadata from leader yurthub + if backend := p.loadBalancerForLeaderHub.PickOne(); !yurtutil.IsNil(backend) { + backend.ServeHTTP(rw, req) + return + } + } + + if backend := p.loadBalancer.PickOne(); !yurtutil.IsNil(backend) { + backend.ServeHTTP(rw, req) + } else { + klog.Errorf("no healthy backend available for request %s", hubutil.ReqString(req)) + http.Error(rw, "no healthy backends available.", http.StatusBadGateway) + } case util.IsKubeletLeaseReq(req): p.handleKubeletLease(rw, req) case util.IsKubeletGetNodeReq(req): @@ -178,10 +191,22 @@ func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) p.eventHandler(rw, req) case util.IsSubjectAccessReviewCreateGetRequest(req): p.subjectAccessReviewHandler(rw, req) + case p.IsMultiplexerRequestFromHubSelf(req): + // requests from the multiplexer of the local yurthub should be forwarded to cloud kube-apiserver or leader yurthub, + // depending on the leader election information. + if p.multiplexerManager.SourceForPoolScopeMetadata() == basemultiplexer.PoolSourceForPoolScopeMetadata { + // list/watch pool scope metadata from leader yurthub + if backend := p.loadBalancerForLeaderHub.PickOne(); !yurtutil.IsNil(backend) { + backend.ServeHTTP(rw, req) + return + } + } + // otherwise, list/watch pool scope metadata from cloud kube-apiserver or local cache, so fall through + fallthrough default: // handling the request with cloud apiserver or local cache. - if p.cloudHealthChecker.IsHealthy() { - p.loadBalancer.ServeHTTP(rw, req) + if backend := p.loadBalancer.PickOne(); !yurtutil.IsNil(backend) { + backend.ServeHTTP(rw, req) } else { p.localProxy.ServeHTTP(rw, req) } @@ -197,16 +222,16 @@ func (p *yurtReverseProxy) handleKubeletLease(rw http.ResponseWriter, req *http.
} func (p *yurtReverseProxy) eventHandler(rw http.ResponseWriter, req *http.Request) { - if p.cloudHealthChecker.IsHealthy() { - p.loadBalancer.ServeHTTP(rw, req) + if backend := p.loadBalancer.PickOne(); !yurtutil.IsNil(backend) { + backend.ServeHTTP(rw, req) } else { p.localProxy.ServeHTTP(rw, req) } } func (p *yurtReverseProxy) subjectAccessReviewHandler(rw http.ResponseWriter, req *http.Request) { - if p.cloudHealthChecker.IsHealthy() { - p.loadBalancer.ServeHTTP(rw, req) + if backend := p.loadBalancer.PickOne(); !yurtutil.IsNil(backend) { + backend.ServeHTTP(rw, req) } else { err := errors.New("request is from cloud APIServer but it's currently not healthy") klog.Errorf("could not handle SubjectAccessReview req %s, %v", hubutil.ReqString(req), err) @@ -217,13 +242,22 @@ func (p *yurtReverseProxy) subjectAccessReviewHandler(rw http.ResponseWriter, re } func (p *yurtReverseProxy) IsRequestFromHubSelf(req *http.Request) bool { userAgent := req.UserAgent() - if userAgent == p.multiplexerUserAgent { - // requests from multiplexer manager in yurthub + // yurthub emits the following two kinds of requests: + // 1. requests with User-Agent=multiplexer-proxy-{nodeName} from multiplexer manager in yurthub + // 2. requests with User-Agent=projectinfo.GetHubName() from sharedInformer for filter and configuration manager in yurthub + if userAgent == p.multiplexerUserAgent || strings.HasPrefix(userAgent, projectinfo.GetHubName()) { return true - } else if strings.HasPrefix(userAgent, projectinfo.GetHubName()) { - // requests from sharedInformer for filter and configuration manager in yurthub + } + + return false +} + +func (p *yurtReverseProxy) IsMultiplexerRequestFromHubSelf(req *http.Request) bool { + userAgent := req.UserAgent() + + if userAgent == p.multiplexerUserAgent { return true - } else { - return false } + + return false } diff --git a/pkg/yurthub/proxy/remote/loadbalancer.go b/pkg/yurthub/proxy/remote/loadbalancer.go index cdbcc50120e..426539ae71f 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer.go +++ b/pkg/yurthub/proxy/remote/loadbalancer.go @@ -34,7 +34,6 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" - "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" "github.com/openyurtio/openyurt/pkg/yurthub/transport" hubutil "github.com/openyurtio/openyurt/pkg/yurthub/util" ) @@ -162,8 +161,8 @@ func (prio *PriorityStrategy) PickOne() Backend { // LoadBalancer is an interface for proxying http request to remote server // based on the load balance mode(round-robin or priority) type LoadBalancer interface { - ServeHTTP(rw http.ResponseWriter, req *http.Request) UpdateBackends(remoteServers []*url.URL) + PickOne() Backend CurrentStrategy() LoadBalancingStrategy } @@ -205,7 +204,7 @@ func NewLoadBalancer( func (lb *loadBalancer) UpdateBackends(remoteServers []*url.URL) { newBackends := make([]Backend, 0, len(remoteServers)) for _, server := range remoteServers { - proxy, err := util.NewRemoteProxy(server, lb.modifyResponse, lb.errorHandler, lb.transportMgr, lb.stopCh) + proxy, err := NewRemoteProxy(server, lb.modifyResponse, lb.errorHandler, lb.transportMgr, lb.stopCh) if err != nil { klog.Errorf("could not create proxy for backend %s, %v", server.String(), err) continue @@ -225,21 +224,12 @@ func (lb *loadBalancer) UpdateBackends(remoteServers []*url.URL) { lb.strategy.UpdateBackends(newBackends) } -func (lb *loadBalancer) CurrentStrategy()
LoadBalancingStrategy { - return lb.strategy +func (lb *loadBalancer) PickOne() Backend { + return lb.strategy.PickOne() } -// ServeHTTP forwards the request to a backend. -func (lb *loadBalancer) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - rp := lb.strategy.PickOne() - if rp == nil { - klog.Errorf("no healthy backend avialbale for request %s", hubutil.ReqString(req)) - http.Error(rw, "no healthy backends available.", http.StatusBadGateway) - return - } - klog.V(3).Infof("forwarding request %s to backend %s", hubutil.ReqString(req), rp.Name()) - - rp.ServeHTTP(rw, req) +func (lb *loadBalancer) CurrentStrategy() LoadBalancingStrategy { + return lb.strategy } // errorHandler handles errors and tries to serve from local cache. @@ -297,23 +287,25 @@ func (lb *loadBalancer) modifyResponse(resp *http.Response) error { req = req.WithContext(ctx) // filter response data - if responseFilter, ok := lb.filterFinder.FindResponseFilter(req); ok { - wrapBody, needUncompressed := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "filter") - size, filterRc, err := responseFilter.Filter(req, wrapBody, lb.stopCh) - if err != nil { - klog.Errorf("could not filter response for %s, %v", hubutil.ReqString(req), err) - return err - } - resp.Body = filterRc - if size > 0 { - resp.ContentLength = int64(size) - resp.Header.Set(yurtutil.HttpHeaderContentLength, fmt.Sprint(size)) - } + if !yurtutil.IsNil(lb.filterFinder) { + if responseFilter, ok := lb.filterFinder.FindResponseFilter(req); ok { + wrapBody, needUncompressed := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "filter") + size, filterRc, err := responseFilter.Filter(req, wrapBody, lb.stopCh) + if err != nil { + klog.Errorf("could not filter response for %s, %v", hubutil.ReqString(req), err) + return err + } + resp.Body = filterRc + if size > 0 { + resp.ContentLength = int64(size) + resp.Header.Set(yurtutil.HttpHeaderContentLength, fmt.Sprint(size)) + } - // after gunzip in filter, the header content encoding should be removed. - // because there's no need to gunzip response.body again. - if needUncompressed { - resp.Header.Del("Content-Encoding") + // after gunzip in filter, the header content encoding should be removed. + // because there's no need to gunzip response.body again. 
+ if needUncompressed { + resp.Header.Del("Content-Encoding") + } } } diff --git a/pkg/yurthub/proxy/remote/loadbalancer_test.go b/pkg/yurthub/proxy/remote/loadbalancer_test.go index bf8fc3339b1..0e3bd402613 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer_test.go +++ b/pkg/yurthub/proxy/remote/loadbalancer_test.go @@ -26,7 +26,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" - "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker/cloudapiserver" + fakeHealthChecker "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker/fake" "github.com/openyurtio/openyurt/pkg/yurthub/transport" ) @@ -159,7 +159,7 @@ func TestLoadBalancingStrategy(t *testing.T) { for k, tc := range testcases { t.Run(k, func(t *testing.T) { - checker := cloudapiserver.NewFakeChecker(tc.servers) + checker := fakeHealthChecker.NewFakeChecker(tc.servers) servers := make([]*url.URL, 0, len(tc.servers)) for server := range tc.servers { servers = append(servers, server) diff --git a/pkg/yurthub/proxy/util/remote.go b/pkg/yurthub/proxy/remote/remote.go similarity index 99% rename from pkg/yurthub/proxy/util/remote.go rename to pkg/yurthub/proxy/remote/remote.go index ff30b688cd0..62919f31502 100644 --- a/pkg/yurthub/proxy/util/remote.go +++ b/pkg/yurthub/proxy/remote/remote.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package remote import ( "fmt" diff --git a/pkg/yurthub/proxy/util/util.go b/pkg/yurthub/proxy/util/util.go index a956993b900..067ffaaa561 100644 --- a/pkg/yurthub/proxy/util/util.go +++ b/pkg/yurthub/proxy/util/util.go @@ -37,7 +37,6 @@ import ( "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurthub/metrics" - "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" "github.com/openyurtio/openyurt/pkg/yurthub/tenant" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) @@ -56,10 +55,10 @@ var needModifyTimeoutVerb = map[string]bool{ // WithIsRequestForPoolScopeMetadata add a mark in context for specifying whether a request is used for list/watching pool scope metadata or not, // request for pool scope metadata will be handled by multiplexer manager instead forwarding to cloud kube-apiserver. -func WithIsRequestForPoolScopeMetadata(handler http.Handler, multiplexerManager *multiplexer.MultiplexerManager, multiplexerUserAgent string) http.Handler { +func WithIsRequestForPoolScopeMetadata(handler http.Handler, isRequestForPoolScopeMetadata func(req *http.Request) bool) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx := req.Context() - if isMultiplexerRequest(req, multiplexerManager, multiplexerUserAgent) { + if isRequestForPoolScopeMetadata(req) { ctx = util.WithIsRequestForPoolScopeMetadata(ctx, true) } else { ctx = util.WithIsRequestForPoolScopeMetadata(ctx, false) @@ -69,29 +68,6 @@ func WithIsRequestForPoolScopeMetadata(handler http.Handler, multiplexerManager }) } -func isMultiplexerRequest(req *http.Request, multiplexerManager *multiplexer.MultiplexerManager, multiplexerUserAgent string) bool { - // the requests from multiplexer manager, recongnize it as not multiplexer request. 
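With the refactor above, WithIsRequestForPoolScopeMetadata depends only on a plain `func(*http.Request) bool` predicate, so the handler chain no longer needs to import the multiplexer package and tests can inject a stub. The sketch below shows roughly what such a predicate looks like (a list/watch request whose GVR is configured as pool scope), mirroring the removed isMultiplexerRequest helper; the package name, `poolScopeGVRs` set, and `isPoolScopeListWatch` function are illustrative and are not the manager's real IsRequestForPoolScopeMetadata.

```go
package proxyexample

import (
	"net/http"

	"k8s.io/apimachinery/pkg/runtime/schema"
	apirequest "k8s.io/apiserver/pkg/endpoints/request"
)

// poolScopeGVRs is illustrative data; the real set comes from the yurthub
// configuration and the leader-hub ConfigMap.
var poolScopeGVRs = map[schema.GroupVersionResource]bool{
	{Group: "", Version: "v1", Resource: "services"}:                       true,
	{Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}: true,
}

// isPoolScopeListWatch is a simplified stand-in for the manager's predicate:
// only list/watch requests for configured pool scope GVRs qualify.
func isPoolScopeListWatch(req *http.Request) bool {
	info, ok := apirequest.RequestInfoFrom(req.Context())
	if !ok || (info.Verb != "list" && info.Verb != "watch") {
		return false
	}
	return poolScopeGVRs[schema.GroupVersionResource{
		Group:    info.APIGroup,
		Version:  info.APIVersion,
		Resource: info.Resource,
	}]
}
```

Any function with this shape can be handed to WithIsRequestForPoolScopeMetadata; note that, per the removed helper, the real predicate additionally excludes requests that originate from the multiplexer itself by checking the User-Agent.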
- if req.UserAgent() == multiplexerUserAgent { - return false - } - - info, ok := apirequest.RequestInfoFrom(req.Context()) - if !ok { - return false - } - - // list/watch requests - if info.Verb != "list" && info.Verb != "watch" { - return false - } - - return multiplexerManager.IsPoolScopeMetadata(&schema.GroupVersionResource{ - Group: info.APIGroup, - Version: info.APIVersion, - Resource: info.Resource, - }) -} - // WithPartialObjectMetadataRequest is used for extracting info for partial object metadata request, // then these info is used by cache manager. func WithPartialObjectMetadataRequest(handler http.Handler) http.Handler { diff --git a/pkg/yurthub/proxy/util/util_test.go b/pkg/yurthub/proxy/util/util_test.go index 360135351b1..ed540f34e06 100644 --- a/pkg/yurthub/proxy/util/util_test.go +++ b/pkg/yurthub/proxy/util/util_test.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "net/url" "os" "reflect" "testing" @@ -33,10 +34,15 @@ import ( "k8s.io/apiserver/pkg/endpoints/filters" "k8s.io/apiserver/pkg/endpoints/request" kstorage "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "github.com/openyurtio/openyurt/cmd/yurthub/app/config" + fakeHealthChecker "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker/fake" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" + "github.com/openyurtio/openyurt/pkg/yurthub/proxy/remote" "github.com/openyurtio/openyurt/pkg/yurthub/tenant" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) @@ -78,6 +84,9 @@ func TestWithIsRequestForPoolScopeMetadata(t *testing.T) { resolver := newTestRequestInfoResolver() + clientset := fake.NewSimpleClientset() + factory := informers.NewSharedInformerFactory(clientset, 0) + for k, tc := range testcases { t.Run(k, func(t *testing.T) { req, _ := http.NewRequest(tc.verb, tc.path, nil) @@ -102,7 +111,15 @@ func TestWithIsRequestForPoolScopeMetadata(t *testing.T) { {Group: "discovery.k8s.io", Version: "v1", Resource: "endpointslices"}, } - rmm := multiplexer.NewRequestMultiplexerManager(dsm, restMapperManager, poolScopeResources) + healthChecher := fakeHealthChecker.NewFakeChecker(map[*url.URL]bool{}) + loadBalancer := remote.NewLoadBalancer("round-robin", []*url.URL{}, nil, nil, healthChecher, nil, context.Background().Done()) + cfg := &config.YurtHubConfiguration{ + PoolScopeResources: poolScopeResources, + RESTMapperManager: restMapperManager, + SharedFactory: factory, + LoadBalancerForLeaderHub: loadBalancer, + } + rmm := multiplexer.NewRequestMultiplexerManager(cfg, dsm, healthChecher) var isRequestForPoolScopeMetadata bool var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -110,7 +127,7 @@ func TestWithIsRequestForPoolScopeMetadata(t *testing.T) { isRequestForPoolScopeMetadata, _ = util.IsRequestForPoolScopeMetadataFrom(ctx) }) - handler = WithIsRequestForPoolScopeMetadata(handler, rmm, "test-agent") + handler = WithIsRequestForPoolScopeMetadata(handler, rmm.IsRequestForPoolScopeMetadata) handler = filters.WithRequestInfo(handler, resolver) handler.ServeHTTP(httptest.NewRecorder(), req) diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 07f7d634fba..07441c40ecf 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -69,6 +69,12 @@ func RunYurtHubServers(cfg *config.YurtHubConfiguration, return err } } + + if 
cfg.YurtHubMultiplexerServerServing != nil { + if _, _, err := cfg.YurtHubMultiplexerServerServing.Serve(proxyHandler, 0, stopCh); err != nil { + return err + } + } return nil } diff --git a/pkg/yurthub/util/util.go b/pkg/yurthub/util/util.go index 2d3eefd01cc..7f053dd7433 100644 --- a/pkg/yurthub/util/util.go +++ b/pkg/yurthub/util/util.go @@ -79,6 +79,7 @@ const ( YurtHubProxyPort = 10261 YurtHubPort = 10267 YurtHubProxySecurePort = 10268 + YurtHubMultiplexerPort = 10269 ) var ( diff --git a/test/e2e/cmd/init/converter.go b/test/e2e/cmd/init/converter.go index afca3c1ba10..ff4e3167d3e 100644 --- a/test/e2e/cmd/init/converter.go +++ b/test/e2e/cmd/init/converter.go @@ -22,6 +22,7 @@ import ( "os" "os/exec" "path/filepath" + "reflect" "strconv" "strings" "time" @@ -35,8 +36,11 @@ import ( kubeclientset "k8s.io/client-go/kubernetes" bootstrapapi "k8s.io/cluster-bootstrap/token/api" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + appsv1beta2 "github.com/openyurtio/openyurt/pkg/apis/apps/v1beta2" nodeservant "github.com/openyurtio/openyurt/pkg/node-servant" + "github.com/openyurtio/openyurt/pkg/projectinfo" kubeadmapi "github.com/openyurtio/openyurt/pkg/util/kubernetes/kubeadm/app/phases/bootstraptoken/clusterinfo" strutil "github.com/openyurtio/openyurt/pkg/util/strings" "github.com/openyurtio/openyurt/test/e2e/cmd/init/lock" @@ -53,6 +57,7 @@ const ( type ClusterConverter struct { RootDir string ClientSet kubeclientset.Interface + RuntimeClient client.Client CloudNodes []string EdgeNodes []string WaitServantJobTimeout time.Duration @@ -85,6 +90,48 @@ func (c *ClusterConverter) Run() error { return err } + klog.Infof("Start to initialize node pools and label nodes: %+v", DefaultPools) + for name, leaderInfo := range DefaultPools { + np := &appsv1beta2.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: appsv1beta2.NodePoolSpec{ + Type: leaderInfo.Kind, + EnableLeaderElection: leaderInfo.EnableLeaderElection, + LeaderReplicas: int32(leaderInfo.LeaderReplicas), + InterConnectivity: true, + }, + } + if err := c.RuntimeClient.Create(context.Background(), np); err != nil { + klog.Errorf("failed to create nodepool %s, %v", name, err) + return err + } + } + + for nodeName, poolName := range NodeNameToPool { + node := &corev1.Node{} + if err := c.RuntimeClient.Get(context.Background(), client.ObjectKey{Name: nodeName}, node); err != nil { + if apierrors.IsNotFound(err) { + continue + } + return err + } + + newNode := node.DeepCopy() + nodeLabels := newNode.Labels + if nodeLabels == nil { + nodeLabels = map[string]string{} + } + + nodeLabels[projectinfo.GetNodePoolLabel()] = poolName + if !reflect.DeepEqual(newNode, node) { + if err := c.RuntimeClient.Patch(context.Background(), newNode, client.MergeFrom(node)); err != nil { + return err + } + } + } + klog.Info( "Running jobs for convert. 
Job running may take a long time, and job failure will not affect the execution of the next stage", ) @@ -132,7 +179,7 @@ func (c *ClusterConverter) installYurthubByHelm() error { "--namespace", "kube-system", "--set", - fmt.Sprintf("kubernetesServerAddr=KUBERNETES_SERVER_ADDRESS,image.tag=%s", tag), + fmt.Sprintf("kubernetesServerAddr=KUBERNETES_SERVER_ADDRESS,nodePoolName=NODE_POOL_NAME,image.tag=%s", tag), ) output, err := cmd.CombinedOutput() if err != nil { @@ -156,6 +203,7 @@ func (c *ClusterConverter) installYurthubByHelm() error { if len(c.EdgeNodes) != 0 { convertCtx["configmap_name"] = yssYurtHubName if err = kubeutil.RunServantJobs(c.ClientSet, c.WaitServantJobTimeout, func(nodeName string) (*batchv1.Job, error) { + convertCtx["nodePoolName"] = NodeNameToPool[nodeName] return nodeservant.RenderNodeServantJob("convert", convertCtx, nodeName) }, c.EdgeNodes, os.Stderr); err != nil { // print logs of yurthub @@ -176,6 +224,7 @@ func (c *ClusterConverter) installYurthubByHelm() error { convertCtx["configmap_name"] = yssYurtHubCloudName klog.Infof("convert context for cloud nodes(%q): %#+v", c.CloudNodes, convertCtx) if err = kubeutil.RunServantJobs(c.ClientSet, c.WaitServantJobTimeout, func(nodeName string) (*batchv1.Job, error) { + convertCtx["nodePoolName"] = NodeNameToPool[nodeName] return nodeservant.RenderNodeServantJob("convert", convertCtx, nodeName) }, c.CloudNodes, os.Stderr); err != nil { return err diff --git a/test/e2e/cmd/init/init.go b/test/e2e/cmd/init/init.go index 2465f514669..96071895927 100644 --- a/test/e2e/cmd/init/init.go +++ b/test/e2e/cmd/init/init.go @@ -31,13 +31,21 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" kubeclientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" kubectllogs "k8s.io/kubectl/pkg/cmd/logs" + "sigs.k8s.io/controller-runtime/pkg/client" + appsv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1" + appsv1beta1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1beta1" + appsv1beta2 "github.com/openyurtio/openyurt/pkg/apis/apps/v1beta2" + iotv1alpha2 "github.com/openyurtio/openyurt/pkg/apis/iot/v1alpha2" + iotv1beta1 "github.com/openyurtio/openyurt/pkg/apis/iot/v1beta1" "github.com/openyurtio/openyurt/pkg/projectinfo" strutil "github.com/openyurtio/openyurt/pkg/util/strings" tmplutil "github.com/openyurtio/openyurt/pkg/util/templates" @@ -115,8 +123,43 @@ var ( yurtManagerImageFormat = "openyurt/yurt-manager:%s" nodeServantImageFormat = "openyurt/node-servant:%s" yurtIotDockImageFormat = "openyurt/yurt-iot-dock:%s" + + NodeNameToPool = map[string]string{ + "openyurt-e2e-test-control-plane": "yurt-pool1", + "openyurt-e2e-test-worker": "yurt-pool2", + "openyurt-e2e-test-worker2": "yurt-pool2", + "openyurt-e2e-test-worker3": "yurt-pool3", + "openyurt-e2e-test-worker4": "yurt-pool3", + } + DefaultPools = map[string]struct { + Kind appsv1beta2.NodePoolType + EnableLeaderElection bool + LeaderReplicas int + }{ + "yurt-pool1": { + Kind: appsv1beta2.Cloud, + EnableLeaderElection: false, + }, + "yurt-pool2": { + Kind: appsv1beta2.Edge, + EnableLeaderElection: true, + LeaderReplicas: 1, + }, + "yurt-pool3": { + Kind: appsv1beta2.Edge, + EnableLeaderElection: false, + }, + } ) +func init() { + utilruntime.Must(appsv1alpha1.AddToScheme(scheme.Scheme)) + 
utilruntime.Must(appsv1beta1.AddToScheme(scheme.Scheme)) + utilruntime.Must(appsv1beta2.AddToScheme(scheme.Scheme)) + utilruntime.Must(iotv1alpha2.AddToScheme(scheme.Scheme)) + utilruntime.Must(iotv1beta1.AddToScheme(scheme.Scheme)) +} + func NewInitCMD(out io.Writer) *cobra.Command { o := newKindOptions() @@ -279,6 +322,7 @@ type Initializer struct { out io.Writer operator *KindOperator kubeClient kubeclientset.Interface + runtimeClient client.Client componentsBuilder *kubeutil.Builder } @@ -323,6 +367,11 @@ func (ki *Initializer) Run() error { return err } + ki.runtimeClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) + if err != nil { + return err + } + // if default cni is not installed, install flannel instead. if ki.DisableDefaultCNI { klog.Info("Start to install flannel in order to make all nodes ready") @@ -686,6 +735,7 @@ func (ki *Initializer) deployOpenYurt() error { converter := &ClusterConverter{ RootDir: dir, ClientSet: ki.kubeClient, + RuntimeClient: ki.runtimeClient, CloudNodes: ki.CloudNodes, EdgeNodes: ki.EdgeNodes, WaitServantJobTimeout: kubeutil.DefaultWaitServantJobTimeout, diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 0ad04911a6e..baa63673540 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -74,8 +74,8 @@ var _ = ginkgo.BeforeSuite(func() { _, err = ns.CreateNameSpace(c, constants.YurtE2ENamespaceName) gomega.Expect(err).NotTo(gomega.HaveOccurred(), "fail to create namespace") - err = util.PrepareNodePoolWithNode(context.TODO(), yurtconfig.YurtE2eCfg.RuntimeClient, "openyurt-e2e-test-worker") - gomega.Expect(err).NotTo(gomega.HaveOccurred(), "fail to create a nodepool with node") + // err = util.PrepareNodePoolWithNode(context.TODO(), yurtconfig.YurtE2eCfg.RuntimeClient, "openyurt-e2e-test-worker") + // gomega.Expect(err).NotTo(gomega.HaveOccurred(), "fail to create a nodepool with node") if labelFilter([]string{"edge-autonomy"}) { // get nginx podIP on edge node worker2 diff --git a/test/e2e/util/nodepool.go b/test/e2e/util/nodepool.go index 2b77ebe75bb..85c8ae28c30 100644 --- a/test/e2e/util/nodepool.go +++ b/test/e2e/util/nodepool.go @@ -123,7 +123,7 @@ func InitTestNodePool( } const ( - NodePoolName = "nodepool-with-node" + NodePoolName = "yurt-pool2" ) // PrepareNodePoolWithNode will create a edge nodepool named "nodepool-with-node" and add the "openyurt-e2e-test-worker" node to this nodepool. diff --git a/test/e2e/yurt/hubleader.go b/test/e2e/yurt/hubleader.go index a23c24e82e6..92e32b8d24f 100644 --- a/test/e2e/yurt/hubleader.go +++ b/test/e2e/yurt/hubleader.go @@ -45,7 +45,7 @@ import ( // conflicts. This is intentional to avoid creating more nodes than necessary in Kind cluster. 
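A note on the scheme registration just above: the controller-runtime client built later with `client.New(cfg, client.Options{Scheme: scheme.Scheme})` can only encode and decode NodePool objects once `appsv1beta2.AddToScheme` has run, which is why the AddToScheme calls live in init(). The following is a self-contained sketch of that ordering, mirroring what ClusterConverter.Run does for yurt-pool2; the kubeconfig path is a placeholder, not a value from this patch.

```go
package main

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/clientcmd"
	"sigs.k8s.io/controller-runtime/pkg/client"

	appsv1beta2 "github.com/openyurtio/openyurt/pkg/apis/apps/v1beta2"
)

func main() {
	// Register the OpenYurt NodePool types first; without this, the Create call
	// below would fail because the scheme has no kind registered for NodePool.
	utilruntime.Must(appsv1beta2.AddToScheme(scheme.Scheme))

	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig") // placeholder path
	utilruntime.Must(err)

	c, err := client.New(cfg, client.Options{Scheme: scheme.Scheme})
	utilruntime.Must(err)

	// Create an edge pool with leader election enabled, as the converter does
	// for yurt-pool2 in DefaultPools.
	np := &appsv1beta2.NodePool{
		ObjectMeta: metav1.ObjectMeta{Name: "yurt-pool2"},
		Spec: appsv1beta2.NodePoolSpec{
			Type:                 appsv1beta2.Edge,
			EnableLeaderElection: true,
			LeaderReplicas:       1,
			InterConnectivity:    true,
		},
	}
	utilruntime.Must(c.Create(context.Background(), np))
}
```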
var _ = Describe("Test hubleader elections", Serial, func() { ctx := context.Background() - nodePoolName := "hubleadere2e" + nodePoolName := "yurt-pool3" var k8sClient client.Client var pools util.TestNodePool @@ -153,11 +153,11 @@ var _ = Describe("Test hubleader elections", Serial, func() { Nodes: sets.New("openyurt-e2e-test-worker3", "openyurt-e2e-test-worker4"), } - Eventually( - func() error { - return util.InitTestNodePool(ctx, k8sClient, pools) - }, - time.Second*30, time.Millisecond*500).Should(BeNil()) + // Eventually( + // func() error { + // return util.InitTestNodePool(ctx, k8sClient, pools) + // }, + // time.Second*30, time.Millisecond*500).Should(BeNil()) }) AfterEach(func() { diff --git a/test/e2e/yurt/nodepool.go b/test/e2e/yurt/nodepool.go index 6a1d8f558b0..b7f586bf82e 100644 --- a/test/e2e/yurt/nodepool.go +++ b/test/e2e/yurt/nodepool.go @@ -98,24 +98,24 @@ var _ = Describe("nodepool test", func() { It("Test NodePool create not empty", func() { By("Run nodepool create with worker 2") // worker 1 is already mapped to a pool - npName := fmt.Sprintf("test-%d", time.Now().Unix()) + // npName := fmt.Sprintf("test-%d", time.Now().Unix()) pool := util.TestNodePool{ NodePool: v1beta2.NodePool{ ObjectMeta: metav1.ObjectMeta{ - Name: npName, + Name: "yurt-pool2", }, Spec: v1beta2.NodePoolSpec{ Type: v1beta2.Edge, }, }, - Nodes: sets.New("openyurt-e2e-test-worker2"), // we will use this worker in the nodepool + Nodes: sets.New("openyurt-e2e-test-worker", "openyurt-e2e-test-worker2"), // we will use this worker in the nodepool } - Eventually( - func() error { - return util.InitTestNodePool(ctx, k8sClient, pool) - }, - time.Second*5, time.Millisecond*500).Should(BeNil()) + // Eventually( + // func() error { + // return util.InitTestNodePool(ctx, k8sClient, pool) + // }, + // time.Second*5, time.Millisecond*500).Should(BeNil()) Eventually( func() error {