Skip to content

Commit

Permalink
feat: support forward requests to leader hub for sharing pool scope m…
Browse files Browse the repository at this point in the history
…etadata in nodepool.

Signed-off-by: rambohe-ch <[email protected]>
  • Loading branch information
rambohe-ch committed Feb 17, 2025
1 parent e5d69ba commit 8f25132
Show file tree
Hide file tree
Showing 40 changed files with 1,030 additions and 261 deletions.
1 change: 1 addition & 0 deletions charts/yurthub/templates/yurthub-cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ metadata:
name: yurt-hub-cfg
namespace: {{ .Release.Namespace }}
labels:
openyurt.io/configmap-name: yurt-hub-cfg
{{- include "yurthub.labels" . | nindent 4 }}
data:
cache_agents: {{ .Values.cacheAgents | quote }}
Expand Down
1 change: 1 addition & 0 deletions charts/yurthub/templates/yurthub-cloud-yurtstaticset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ spec:
- --bind-address={{ .Values.yurthubBindingAddr }}
- --server-addr={{ .Values.kubernetesServerAddr }}
- --node-name=$(NODE_NAME)
- --nodepool-name={{ .Values.nodePoolName }}
- --bootstrap-file={{ .Values.bootstrapFile }}
- --working-mode=cloud
- --namespace={{ .Release.Namespace }}
Expand Down
1 change: 1 addition & 0 deletions charts/yurthub/templates/yurthub-yurtstaticset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ spec:
- --bind-address={{ .Values.yurthubBindingAddr }}
- --server-addr={{ .Values.kubernetesServerAddr }}
- --node-name=$(NODE_NAME)
- --nodepool-name={{ .Values.nodePoolName }}
- --bootstrap-file={{ .Values.bootstrapFile }}
- --working-mode=edge
- --namespace={{ .Release.Namespace }}
Expand Down
1 change: 1 addition & 0 deletions charts/yurthub/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ yurthubBindingAddr: 127.0.0.1
kubernetesServerAddr: https://127.0.0.1:6443
bootstrapFile: "/var/lib/yurthub/bootstrap-hub.conf"
organizations: ""
nodePoolName: ""

image:
registry: openyurt
Expand Down
56 changes: 48 additions & 8 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker/leaderhub"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/proxy/remote"
"github.com/openyurtio/openyurt/pkg/yurthub/tenant"
"github.com/openyurtio/openyurt/pkg/yurthub/transport"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
Expand Down Expand Up @@ -90,11 +93,13 @@ type YurtHubConfiguration struct {
YurtHubProxyServerServing *apiserver.DeprecatedInsecureServingInfo
YurtHubDummyProxyServerServing *apiserver.DeprecatedInsecureServingInfo
YurtHubSecureProxyServerServing *apiserver.SecureServingInfo
YurtHubMultiplexerServerServing *apiserver.SecureServingInfo
DiskCachePath string
RequestMultiplexerManager *multiplexer.MultiplexerManager
ConfigManager *configuration.Manager
TenantManager tenant.Interface
TransportAndDirectClientManager transport.Interface
LoadBalancerForLeaderHub remote.LoadBalancer
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand All @@ -116,7 +121,9 @@ func Complete(options *options.YurtHubOptions, stopCh <-chan struct{}) (*YurtHub

// list/watch endpoints from host cluster in order to resolve tenant cluster address.
newEndpointsInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return coreinformers.NewFilteredEndpointsInformer(client, "kube-public", resyncPeriod, nil, nil)
informer := coreinformers.NewFilteredEndpointsInformer(client, "kube-public", resyncPeriod, nil, nil)
informer.SetTransform(pkgutil.TransformStripManagedFields())
return informer
}
sharedFactory.InformerFor(&corev1.Endpoints{}, newEndpointsInformer)
cfg.SharedFactory = sharedFactory
Expand Down Expand Up @@ -145,7 +152,7 @@ func Complete(options *options.YurtHubOptions, stopCh <-chan struct{}) (*YurtHub
if err != nil {
return nil, err
}
registerInformers(sharedFactory, options.YurtHubNamespace, options.NodeName, (cfg.WorkingMode == util.WorkingModeCloud), tenantNamespce)
registerInformers(sharedFactory, options.YurtHubNamespace, options.NodePoolName, options.NodeName, (cfg.WorkingMode == util.WorkingModeCloud), tenantNamespce)

certMgr, err := certificatemgr.NewYurtHubCertManager(options, us)
if err != nil {
Expand Down Expand Up @@ -189,9 +196,19 @@ func Complete(options *options.YurtHubOptions, stopCh <-chan struct{}) (*YurtHub
return nil, err
}

// no leader hub servers for transport manager at startup time.
// and reverse proxy will only use BearerTransport to forward requests to leader hub.
transportManagerForLeaderHub, err := transport.NewTransportAndClientManager([]*url.URL{}, options.HeartbeatTimeoutSeconds, certMgr, stopCh)
if err != nil {
return nil, fmt.Errorf("could not new transport manager, %w", err)
}
healthCheckerForLeaderHub := leaderhub.NewLeaderHubHealthChecker(20*time.Second, nil, stopCh)
loadBalancerForLeaderHub := remote.NewLoadBalancer("round-robin", []*url.URL{}, nil, transportManagerForLeaderHub, healthCheckerForLeaderHub, nil, stopCh)

cfg.ConfigManager = configManager
cfg.FilterFinder = filterFinder
cfg.RequestMultiplexerManager = newRequestMultiplexerManager(options, restMapperManager)
cfg.LoadBalancerForLeaderHub = loadBalancerForLeaderHub
cfg.RequestMultiplexerManager = newRequestMultiplexerManager(options, restMapperManager, sharedFactory, healthCheckerForLeaderHub, loadBalancerForLeaderHub)

if options.EnableDummyIf {
klog.V(2).Infof("create dummy network interface %s(%s)", options.HubAgentDummyIfName, options.HubAgentDummyIfIP)
Expand Down Expand Up @@ -284,15 +301,17 @@ func createClientAndSharedInformerFactories(serverAddr, nodePoolName string) (ku
func registerInformers(
informerFactory informers.SharedInformerFactory,
namespace string,
poolName string,
nodeName string,
enablePodInformer bool,
tenantNs string) {

// configmap informer is used for list/watching yurt-hub-cfg configmap which includes configurations about cache agents and filters.
// and is used by approver in filter and cache manager on cloud and edge working mode.
// configmap informer is used for list/watching yurt-hub-cfg configmap and leader-hub-{poolName} configmap.
// yurt-hub-cfg configmap includes configurations about cache agents and filters which are needed by approver in filter and cache manager on cloud and edge working mode.
// leader-hub-{nodePoolName} configmap includes leader election configurations which are used by multiplexer manager.
newConfigmapInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
options.LabelSelector = fmt.Sprintf("openyurt.io/configmap-name in (%s, %s)", util.YurthubConfigMapName, "leader-hub-"+poolName)
}
informer := coreinformers.NewFilteredConfigMapInformer(client, namespace, resyncPeriod, nil, tweakListOptions)
informer.SetTransform(pkgutil.TransformStripManagedFields())
Expand Down Expand Up @@ -385,17 +404,32 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y
cfg.YurtHubSecureProxyServerServing.ClientCA = caBundleProvider
cfg.YurtHubSecureProxyServerServing.DisableHTTP2 = true

if err := (&apiserveroptions.SecureServingOptions{
BindAddress: net.ParseIP(options.NodeIP),
BindPort: options.PortForMultiplexer,
BindNetwork: "tcp",
ServerCert: apiserveroptions.GeneratableKeyCert{
CertKey: apiserveroptions.CertKey{
CertFile: serverCertPath,
KeyFile: serverCertPath,
},
},
}).ApplyTo(&cfg.YurtHubMultiplexerServerServing); err != nil {
return err
}
cfg.YurtHubMultiplexerServerServing.ClientCA = caBundleProvider
cfg.YurtHubMultiplexerServerServing.DisableHTTP2 = true
return nil
}

func newRequestMultiplexerManager(options *options.YurtHubOptions, restMapperManager *meta.RESTMapperManager) *multiplexer.MultiplexerManager {
func newRequestMultiplexerManager(options *options.YurtHubOptions, restMapperManager *meta.RESTMapperManager, sharedFactory informers.SharedInformerFactory, checker healthchecker.Interface, lb remote.LoadBalancer) *multiplexer.MultiplexerManager {
config := &rest.Config{
Host: fmt.Sprintf("http://%s:%d", options.YurtHubProxyHost, options.YurtHubProxyPort),
UserAgent: util.MultiplexerProxyClientUserAgentPrefix + options.NodeName,
}
storageProvider := storage.NewStorageProvider(config)

return multiplexer.NewRequestMultiplexerManager(storageProvider, restMapperManager, options.PoolScopeResources)
return multiplexer.NewRequestMultiplexerManager(options, storageProvider, restMapperManager, sharedFactory, checker, lb)
}

func ReadinessCheck(cfg *YurtHubConfiguration) error {
Expand All @@ -417,5 +451,11 @@ func ReadinessCheck(cfg *YurtHubConfiguration) error {
}
}

if cfg.RequestMultiplexerManager != nil {
if synced := cfg.RequestMultiplexerManager.HasSynced(); !synced {
return fmt.Errorf("resources needed by multiplexer are not synced")
}
}

return nil
}
1 change: 1 addition & 0 deletions cmd/yurthub/app/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestComplete(t *testing.T) {
options.NodeName = "foo"
options.EnableDummyIf = false
options.HubAgentDummyIfIP = "169.254.2.1"
options.NodeIP = "127.0.0.1"
cfg, err := Complete(options, nil)
if err != nil {
t.Errorf("expect no err, but got %v", err)
Expand Down
19 changes: 19 additions & 0 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime/schema"
apinet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
Expand Down Expand Up @@ -83,6 +84,8 @@ type YurtHubOptions struct {
ClientForTest kubernetes.Interface
EnablePoolServiceTopology bool
PoolScopeResources PoolScopeMetadatas
PortForMultiplexer int
NodeIP string
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
Expand All @@ -93,6 +96,7 @@ func NewYurtHubOptions() *YurtHubOptions {
YurtHubProxyPort: util.YurtHubProxyPort,
YurtHubPort: util.YurtHubPort,
YurtHubProxySecurePort: util.YurtHubProxySecurePort,
PortForMultiplexer: util.YurtHubMultiplexerPort,
YurtHubNamespace: util.YurtHubNamespace,
GCFrequency: 120,
YurtHubCertOrganizations: make([]string, 0),
Expand Down Expand Up @@ -167,6 +171,19 @@ func (options *YurtHubOptions) Validate() error {
if len(options.CACertHashes) == 0 && !options.UnsafeSkipCAVerification {
return fmt.Errorf("set --discovery-token-unsafe-skip-ca-verification flag as true or pass CACertHashes to continue")
}

if len(options.NodePoolName) == 0 {
return fmt.Errorf("node-pool-name is empty")
}

if len(options.NodeIP) == 0 {
ipAddr, err := apinet.ResolveBindAddress(nil)
if err != nil {
return fmt.Errorf("couldn't get the node ip, %v", err)
}
options.NodeIP = ipAddr.String()
klog.Infof("node ip is configured as %s", options.NodeIP)
}
}

return nil
Expand All @@ -179,11 +196,13 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.YurtHubProxyHost, "bind-proxy-address", o.YurtHubProxyHost, "the IP address of YurtHub Proxy Server")
fs.IntVar(&o.YurtHubProxyPort, "proxy-port", o.YurtHubProxyPort, "the port on which to proxy HTTP requests to kube-apiserver")
fs.IntVar(&o.YurtHubProxySecurePort, "proxy-secure-port", o.YurtHubProxySecurePort, "the port on which to proxy HTTPS requests to kube-apiserver")
fs.IntVar(&o.PortForMultiplexer, "multiplexer-port", o.PortForMultiplexer, "the port on which to proxy HTTPS requests to multiplexer in yurthub")
fs.StringVar(&o.YurtHubNamespace, "namespace", o.YurtHubNamespace, "the namespace of YurtHub Server")
fs.StringVar(&o.ServerAddr, "server-addr", o.ServerAddr, "the address of Kubernetes kube-apiserver, the format is: \"server1,server2,...\"; when yurthub is in local mode, server-addr represents the service address of apiservers, the format is: \"ip:port\".")
fs.StringSliceVar(&o.YurtHubCertOrganizations, "hub-cert-organizations", o.YurtHubCertOrganizations, "Organizations that will be added into hub's apiserver client certificate, the format is: certOrg1,certOrg2,...")
fs.IntVar(&o.GCFrequency, "gc-frequency", o.GCFrequency, "the frequency to gc cache in storage(unit: minute).")
fs.StringVar(&o.NodeName, "node-name", o.NodeName, "the name of node that runs hub agent")
fs.StringVar(&o.NodeIP, "node-ip", o.NodeIP, "the same IP address of the node which used by kubelet. if unset, node's default IPv4 address will be used.")
fs.StringVar(&o.LBMode, "lb-mode", o.LBMode, "the mode of load balancer to connect remote servers(round-robin, priority)")
fs.IntVar(&o.HeartbeatFailedRetry, "heartbeat-failed-retry", o.HeartbeatFailedRetry, "number of heartbeat request retry after having failed.")
fs.IntVar(&o.HeartbeatHealthyThreshold, "heartbeat-healthy-threshold", o.HeartbeatHealthyThreshold, "minimum consecutive successes for the heartbeat to be considered healthy after having failed.")
Expand Down
8 changes: 6 additions & 2 deletions cmd/yurthub/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestNewYurtHubOptions(t *testing.T) {
YurtHubProxyPort: util.YurtHubProxyPort,
YurtHubPort: util.YurtHubPort,
YurtHubProxySecurePort: util.YurtHubProxySecurePort,
PortForMultiplexer: util.YurtHubMultiplexerPort,
YurtHubNamespace: util.YurtHubNamespace,
GCFrequency: 120,
YurtHubCertOrganizations: make([]string, 0),
Expand Down Expand Up @@ -179,10 +180,11 @@ func TestValidate(t *testing.T) {
LBMode: "rr",
WorkingMode: "cloud",
UnsafeSkipCAVerification: true,
NodePoolName: "foo",
},
isErr: false,
},
"normal options with ipv4": {
"normal options with ipv6": {
options: &YurtHubOptions{
NodeName: "foo",
ServerAddr: "1.2.3.4:56",
Expand All @@ -191,10 +193,11 @@ func TestValidate(t *testing.T) {
WorkingMode: "cloud",
UnsafeSkipCAVerification: true,
HubAgentDummyIfIP: "fd00::2:1",
NodePoolName: "foo",
},
isErr: false,
},
"normal options with ipv6": {
"normal options with ipv4": {
options: &YurtHubOptions{
NodeName: "foo",
ServerAddr: "1.2.3.4:56",
Expand All @@ -203,6 +206,7 @@ func TestValidate(t *testing.T) {
WorkingMode: "cloud",
UnsafeSkipCAVerification: true,
HubAgentDummyIfIP: "169.254.2.1",
NodePoolName: "foo",
},
isErr: false,
},
Expand Down
6 changes: 5 additions & 1 deletion pkg/node-servant/components/yurthub.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ const (
type yurtHubOperator struct {
apiServerAddr string
joinToken string
nodePoolName string
yurthubHealthCheckTimeout time.Duration
}

// NewYurthubOperator new yurtHubOperator struct
func NewYurthubOperator(apiServerAddr string, joinToken string, yurthubHealthCheckTimeout time.Duration) *yurtHubOperator {
func NewYurthubOperator(apiServerAddr string, joinToken, nodePoolName string, yurthubHealthCheckTimeout time.Duration) *yurtHubOperator {
return &yurtHubOperator{
apiServerAddr: apiServerAddr,
joinToken: joinToken,
nodePoolName: nodePoolName,
yurthubHealthCheckTimeout: yurthubHealthCheckTimeout,
}
}
Expand Down Expand Up @@ -91,6 +93,8 @@ func (op *yurtHubOperator) Install() error {
}
klog.Infof("yurt-hub.yaml apiServerAddr: %+v", op.apiServerAddr)
yssYurtHub := strings.ReplaceAll(string(content), "KUBERNETES_SERVER_ADDRESS", op.apiServerAddr)
klog.Infof("yurt-hub.yaml nodePoolName: %s", op.nodePoolName)
yssYurtHub = strings.ReplaceAll(string(yssYurtHub), "NODE_POOL_NAME", op.nodePoolName)
if err = os.WriteFile(getYurthubYaml(podManifestPath), []byte(yssYurtHub), fileMode); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/node-servant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ spec:
- /bin/sh
- -c
args:
- "/usr/local/bin/entry.sh convert {{if .yurthub_healthcheck_timeout}}--yurthub-healthcheck-timeout={{.yurthub_healthcheck_timeout}} {{end}}--join-token={{.joinToken}}"
- "/usr/local/bin/entry.sh convert {{if .yurthub_healthcheck_timeout}}--yurthub-healthcheck-timeout={{.yurthub_healthcheck_timeout}} {{end}}--join-token={{.joinToken}} --nodepool-name={{.nodePoolName}}"
securityContext:
privileged: true
volumeMounts:
Expand Down
4 changes: 3 additions & 1 deletion pkg/node-servant/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Config struct {
joinToken string
kubeadmConfPaths []string
openyurtDir string
nodePoolName string
}

// nodeConverter do the convert job
Expand All @@ -45,6 +46,7 @@ func NewConverterWithOptions(o *Options) *nodeConverter {
joinToken: o.joinToken,
kubeadmConfPaths: strings.Split(o.kubeadmConfPaths, ","),
openyurtDir: o.openyurtDir,
nodePoolName: o.nodePoolName,
},
}
}
Expand All @@ -70,7 +72,7 @@ func (n *nodeConverter) installYurtHub() error {
if apiServerAddress == "" {
return fmt.Errorf("get apiServerAddress empty")
}
op := components.NewYurthubOperator(apiServerAddress, n.joinToken, n.yurthubHealthCheckTimeout)
op := components.NewYurthubOperator(apiServerAddress, n.joinToken, n.nodePoolName, n.yurthubHealthCheckTimeout)
return op.Install()
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/node-servant/convert/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Options struct {
kubeadmConfPaths string
openyurtDir string
Version bool
nodePoolName string
}

// NewConvertOptions creates a new Options
Expand All @@ -56,6 +57,10 @@ func (o *Options) Validate() error {
return fmt.Errorf("join token(bootstrap token) is empty")
}

if len(o.nodePoolName) == 0 {
return fmt.Errorf("nodepool name is empty")
}

return nil
}

Expand All @@ -64,5 +69,6 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.yurthubHealthCheckTimeout, "yurthub-healthcheck-timeout", o.yurthubHealthCheckTimeout, "The timeout for yurthub health check.")
fs.StringVarP(&o.kubeadmConfPaths, "kubeadm-conf-path", "k", o.kubeadmConfPaths, "The path to kubelet service conf that is used by kubelet component to join the cluster on the work node. Support multiple values, will search in order until get the file.(e.g -k kbcfg1,kbcfg2)")
fs.StringVar(&o.joinToken, "join-token", o.joinToken, "The token used by yurthub for joining the cluster.")
fs.StringVar(&o.nodePoolName, "nodepool-name", o.nodePoolName, "The nodepool name which the node will be added")
fs.BoolVar(&o.Version, "version", o.Version, "print the version information.")
}
2 changes: 1 addition & 1 deletion pkg/node-servant/revert/revert.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (n *nodeReverter) revertKubelet() error {
}

func (n *nodeReverter) unInstallYurtHub() error {
op := components.NewYurthubOperator("", "", time.Duration(1)) // params is not important here
op := components.NewYurthubOperator("", "", "", time.Duration(1)) // params is not important here
return op.UnInstall()
}

Expand Down
1 change: 1 addition & 0 deletions pkg/yurthub/certificate/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func NewYurtHubCertManager(options *options.YurtHubOptions, remoteServers []*url
net.ParseIP(options.HubAgentDummyIfIP),
net.ParseIP(options.YurtHubHost),
net.ParseIP(options.YurtHubProxyHost),
net.ParseIP(options.NodeIP),
})
serverCertManager, err := hubServerCert.NewHubServerCertificateManager(options.ClientForTest, clientCertManager, options.NodeName, filepath.Join(workDir, "pki"), certIPs)
if err != nil {
Expand Down
Loading

0 comments on commit 8f25132

Please sign in to comment.