From 43c67f5d91757836c8fa732be35c6171d89759c1 Mon Sep 17 00:00:00 2001
From: Alejandro Ruiz <4057165+aruiz14@users.noreply.github.com>
Date: Wed, 2 Oct 2024 10:45:29 +0200
Subject: [PATCH] [v2.9] Avoid leaking transports cache (#47335)

---
 .../management/aks/aks_cluster_handler.go     | 35 ++++++++----
 ...ks_cluster_handler_mockc_interface_test.go |  6 +++
 .../management/clusteroperator/utils.go       | 37 +++++++++++--
 .../management/eks/eks_cluster_handler.go     | 35 ++++++++----
 ...ks_cluster_handler_mockc_interface_test.go |  6 +++
 .../management/gke/gke_cluster_handler.go     | 36 +++++++++----
 ...ke_cluster_handler_mockc_interface_test.go |  6 +++
 pkg/dialer/factory.go                         | 54 +++++++++++++++----
 pkg/kontainer-engine/drivers/util/utils.go    |  6 +++
 pkg/types/config/dialer/dialer.go             |  8 +++
 10 files changed, 183 insertions(+), 46 deletions(-)

diff --git a/pkg/controllers/management/aks/aks_cluster_handler.go b/pkg/controllers/management/aks/aks_cluster_handler.go
index 7872fae8e3c..bc85a793f7c 100644
--- a/pkg/controllers/management/aks/aks_cluster_handler.go
+++ b/pkg/controllers/management/aks/aks_cluster_handler.go
@@ -19,6 +19,7 @@ import (
 	"github.com/rancher/rancher/pkg/controllers/management/rbac"
 	"github.com/rancher/rancher/pkg/controllers/management/secretmigrator"
 	"github.com/rancher/rancher/pkg/dialer"
+	"github.com/rancher/rancher/pkg/kontainer-engine/drivers/util"
 	"github.com/rancher/rancher/pkg/namespace"
 	"github.com/rancher/rancher/pkg/systemaccount"
 	"github.com/rancher/rancher/pkg/types/config"
@@ -32,6 +33,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/transport"
 )
 
 const (
@@ -330,18 +332,22 @@ func (e *aksOperatorController) updateAKSClusterConfig(cluster *apimgmtv3.Cluste
 
 // generateAndSetServiceAccount uses the API endpoint and CA cert to generate a service account token. The token is then copied to the cluster status.
 func (e *aksOperatorController) generateAndSetServiceAccount(cluster *apimgmtv3.Cluster) (*apimgmtv3.Cluster, error) {
+	clusterDialer, err := e.ClientDialer.ClusterDialHolder(cluster.Name, true)
+	if err != nil {
+		return cluster, err
+	}
+
 	restConfig, err := e.getRestConfig(cluster)
 	if err != nil {
 		return cluster, fmt.Errorf("error getting kube config: %v", err)
 	}
 
-	clusterDialer, err := e.ClientDialer.ClusterDialer(cluster.Name, true)
+	clientset, err := clusteroperator.NewClientSetForConfig(restConfig, clusteroperator.WithDialHolder(clusterDialer))
 	if err != nil {
-		return cluster, err
+		return nil, fmt.Errorf("error creating clientset for cluster %s: %w", cluster.Name, err)
 	}
-	restConfig.Dial = clusterDialer
 
-	saToken, err := clusteroperator.GenerateSAToken(restConfig, cluster.Name)
+	saToken, err := util.GenerateServiceAccountToken(clientset, cluster.Name)
 	if err != nil {
 		return cluster, fmt.Errorf("error generating service account token: %v", err)
 	}
@@ -400,6 +406,13 @@ func (e *aksOperatorController) recordAppliedSpec(cluster *apimgmtv3.Cluster) (*
 	return e.ClusterClient.Update(cluster)
 }
 
+var publicDialer = &transport.DialHolder{
+	Dial: (&net.Dialer{
+		Timeout:   30 * time.Second,
+		KeepAlive: 30 * time.Second,
+	}).DialContext,
+}
+
 // generateSATokenWithPublicAPI tries to get a service account token from the cluster using the public API endpoint.
 // This function is called if the cluster has only privateEndpoint enabled and is not publicly available.
 // If Rancher is able to communicate with the cluster through its API endpoint even though it is private, then this function will retrieve
@@ -417,12 +430,13 @@ func (e *aksOperatorController) generateSATokenWithPublicAPI(cluster *apimgmtv3.
 		return "", nil, err
 	}
 
+	clientset, err := clusteroperator.NewClientSetForConfig(restConfig, clusteroperator.WithDialHolder(publicDialer))
+	if err != nil {
+		return "", nil, fmt.Errorf("error creating clientset for cluster %s: %w", cluster.Name, err)
+	}
+
 	requiresTunnel := new(bool)
-	restConfig.Dial = (&net.Dialer{
-		Timeout:   30 * time.Second,
-		KeepAlive: 30 * time.Second,
-	}).DialContext
-	serviceToken, err := clusteroperator.GenerateSAToken(restConfig, cluster.Name)
+	serviceToken, err := util.GenerateServiceAccountToken(clientset, cluster.Name)
 	if err != nil {
 		*requiresTunnel = true
 		var dnsError *net.DNSError
@@ -450,6 +464,9 @@ func (e *aksOperatorController) getRestConfig(cluster *apimgmtv3.Cluster) (*rest
 	if err != nil {
 		return nil, err
 	}
+	if restConfig.UserAgent == "" {
+		restConfig.UserAgent = util.UserAgentForCluster(cluster)
+	}
 
 	// Get the CACert from the cluster because it will have any additional CAs added to Rancher.
 	certFromCluster, err := base64.StdEncoding.DecodeString(cluster.Status.CACert)
diff --git a/pkg/controllers/management/aks/aks_cluster_handler_mockc_interface_test.go b/pkg/controllers/management/aks/aks_cluster_handler_mockc_interface_test.go
index 0233d334298..da2b853a4d7 100644
--- a/pkg/controllers/management/aks/aks_cluster_handler_mockc_interface_test.go
+++ b/pkg/controllers/management/aks/aks_cluster_handler_mockc_interface_test.go
@@ -15,6 +15,7 @@ import (
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/openapi"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/transport"
 )
 
 const (
@@ -551,6 +552,11 @@ func (m MockFactory) ClusterDialer(clusterName string, retryOnError bool) (diale
 	return dialer, nil
 }
 
+func (m MockFactory) ClusterDialHolder(clusterName string, retryOnError bool) (*transport.DialHolder, error) {
+	clusterDialer, err := m.ClusterDialer(clusterName, retryOnError)
+	return &transport.DialHolder{Dial: clusterDialer}, err
+}
+
 func (m MockFactory) DockerDialer(clusterName, machineName string) (dialer.Dialer, error) {
 	panic("implement me")
 }
diff --git a/pkg/controllers/management/clusteroperator/utils.go b/pkg/controllers/management/clusteroperator/utils.go
index 7742ccacfd4..4b926dd4b32 100644
--- a/pkg/controllers/management/clusteroperator/utils.go
+++ b/pkg/controllers/management/clusteroperator/utils.go
@@ -3,6 +3,7 @@ package clusteroperator
 import (
 	"encoding/base64"
 	"fmt"
+	"net/http"
 	"strings"
 	"time"
 
@@ -13,7 +14,6 @@ import (
 	corev1 "github.com/rancher/rancher/pkg/generated/norman/core/v1"
 	mgmtv3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
 	projectv3 "github.com/rancher/rancher/pkg/generated/norman/project.cattle.io/v3"
-	"github.com/rancher/rancher/pkg/kontainer-engine/drivers/util"
 	"github.com/rancher/rancher/pkg/namespace"
 	"github.com/rancher/rancher/pkg/systemaccount"
 	typesDialer "github.com/rancher/rancher/pkg/types/config/dialer"
@@ -26,6 +26,7 @@ import (
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/transport"
 	"k8s.io/client-go/util/retry"
 )
 
@@ -166,13 +167,39 @@ func (e *OperatorController) CheckCrdReady(cluster *mgmtv3.Cluster, clusterType
 	return cluster, nil
 }
 
-func GenerateSAToken(restConfig *rest.Config, clusterName string) (string, error) {
-	clientSet, err := kubernetes.NewForConfig(restConfig)
+type TransportConfigOption func(*transport.Config)
+
+func WithDialHolder(holder *transport.DialHolder) TransportConfigOption {
+	return func(cfg *transport.Config) {
+		cfg.DialHolder = holder
+	}
+}
+
+func NewClientSetForConfig(config *rest.Config, opts ...TransportConfigOption) (*kubernetes.Clientset, error) {
+	transportConfig, err := config.TransportConfig()
 	if err != nil {
-		return "", fmt.Errorf("error creating clientset for cluster %s: %v", clusterName, err)
+		return nil, err
+	}
+	for _, opt := range opts {
+		opt(transportConfig)
+	}
+
+	rt, err := transport.New(transportConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	var httpClient *http.Client
+	if rt != http.DefaultTransport || config.Timeout > 0 {
+		httpClient = &http.Client{
+			Transport: rt,
+			Timeout:   config.Timeout,
+		}
+	} else {
+		httpClient = http.DefaultClient
 	}
 
-	return util.GenerateServiceAccountToken(clientSet, clusterName)
+	return kubernetes.NewForConfigAndClient(config, httpClient)
 }
 
 func addAdditionalCA(secretsCache wranglerv1.SecretCache, caCert string) (string, error) {
diff --git a/pkg/controllers/management/eks/eks_cluster_handler.go b/pkg/controllers/management/eks/eks_cluster_handler.go
index 1c4c9317152..884e49b30ef 100644
--- a/pkg/controllers/management/eks/eks_cluster_handler.go
+++ b/pkg/controllers/management/eks/eks_cluster_handler.go
@@ -25,10 +25,10 @@ import (
 	"github.com/rancher/rancher/pkg/controllers/management/secretmigrator"
 	"github.com/rancher/rancher/pkg/dialer"
 	mgmtv3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
+	"github.com/rancher/rancher/pkg/kontainer-engine/drivers/util"
 	"github.com/rancher/rancher/pkg/namespace"
 	"github.com/rancher/rancher/pkg/systemaccount"
 	"github.com/rancher/rancher/pkg/types/config"
-	typesDialer "github.com/rancher/rancher/pkg/types/config/dialer"
 	"github.com/rancher/rancher/pkg/wrangler"
 	corecontrollers "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
 	"github.com/sirupsen/logrus"
@@ -39,6 +39,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/transport"
 	"sigs.k8s.io/aws-iam-authenticator/pkg/token"
 )
 
@@ -439,17 +440,21 @@ func (e *eksOperatorController) updateEKSClusterConfig(cluster *mgmtv3.Cluster,
 
 // generateAndSetServiceAccount uses the API endpoint and CA cert to generate a service account token. The token is then copied to the cluster status.
 func (e *eksOperatorController) generateAndSetServiceAccount(cluster *mgmtv3.Cluster) (*mgmtv3.Cluster, error) {
-	clusterDialer, err := e.ClientDialer.ClusterDialer(cluster.Name, true)
+	clusterDialer, err := e.ClientDialer.ClusterDialHolder(cluster.Name, true)
 	if err != nil {
 		return cluster, err
 	}
 
-	restConfig, err := e.getRestConfig(cluster, clusterDialer)
+	restConfig, err := e.getRestConfig(cluster)
 	if err != nil {
 		return cluster, err
 	}
+	clientset, err := clusteroperator.NewClientSetForConfig(restConfig, clusteroperator.WithDialHolder(clusterDialer))
+	if err != nil {
+		return nil, fmt.Errorf("error creating clientset for cluster %s: %w", cluster.Name, err)
+	}
 
-	saToken, err := clusteroperator.GenerateSAToken(restConfig, cluster.Name)
+	saToken, err := util.GenerateServiceAccountToken(clientset, cluster.Name)
 	if err != nil {
 		return cluster, err
 	}
@@ -508,6 +513,13 @@ func (e *eksOperatorController) recordAppliedSpec(cluster *mgmtv3.Cluster) (*mgm
 	return e.ClusterClient.Update(cluster)
 }
 
+var publicDialer = &transport.DialHolder{
+	Dial: (&net.Dialer{
+		Timeout:   30 * time.Second,
+		KeepAlive: 30 * time.Second,
+	}).DialContext,
+}
+
 // generateSATokenWithPublicAPI tries to get a service account token from the cluster using the public API endpoint.
 // This function is called if the cluster has only privateEndpoint enabled and not publicly available.
 // If Rancher is able to communicate with the cluster through its API endpoint even though it is private, then this function will retrieve
@@ -520,16 +532,17 @@ func (e *eksOperatorController) recordAppliedSpec(cluster *mgmtv3.Cluster) (*mgm
 // If an error different from the two below occur, then the *bool return value will be nil, indicating that Rancher was not able to determine if
 // tunneling is required to communicate with the cluster.
 func (e *eksOperatorController) generateSATokenWithPublicAPI(cluster *mgmtv3.Cluster) (string, *bool, error) {
-	restConfig, err := e.getRestConfig(cluster, (&net.Dialer{
-		Timeout:   30 * time.Second,
-		KeepAlive: 30 * time.Second,
-	}).DialContext)
+	restConfig, err := e.getRestConfig(cluster)
 	if err != nil {
 		return "", nil, err
 	}
+	clientset, err := clusteroperator.NewClientSetForConfig(restConfig, clusteroperator.WithDialHolder(publicDialer))
+	if err != nil {
+		return "", nil, fmt.Errorf("error creating clientset for cluster %s: %w", cluster.Name, err)
+	}
 
 	requiresTunnel := new(bool)
-	serviceToken, err := clusteroperator.GenerateSAToken(restConfig, cluster.Name)
+	serviceToken, err := util.GenerateServiceAccountToken(clientset, cluster.Name)
 	if err != nil {
 		*requiresTunnel = true
 		var dnsError *net.DNSError
@@ -606,7 +619,7 @@ func (e *eksOperatorController) getAccessToken(cluster *mgmtv3.Cluster) (string,
 	return awsToken.Token, nil
 }
 
-func (e *eksOperatorController) getRestConfig(cluster *mgmtv3.Cluster, dialer typesDialer.Dialer) (*rest.Config, error) {
+func (e *eksOperatorController) getRestConfig(cluster *mgmtv3.Cluster) (*rest.Config, error) {
 	accessToken, err := e.getAccessToken(cluster)
 	if err != nil {
 		return nil, err
 	}
@@ -622,8 +635,8 @@ func (e *eksOperatorController) getRestConfig(cluster *mgmtv3.Cluster, dialer ty
 		TLSClientConfig: rest.TLSClientConfig{
 			CAData: decodedCA,
 		},
+		UserAgent:   util.UserAgentForCluster(cluster),
 		BearerToken: accessToken,
-		Dial:        dialer,
 	}, nil
 }
 
diff --git a/pkg/controllers/management/eks/eks_cluster_handler_mockc_interface_test.go b/pkg/controllers/management/eks/eks_cluster_handler_mockc_interface_test.go
index 9bd3f81e08d..fd9d82e4826 100644
--- a/pkg/controllers/management/eks/eks_cluster_handler_mockc_interface_test.go
+++ b/pkg/controllers/management/eks/eks_cluster_handler_mockc_interface_test.go
@@ -15,6 +15,7 @@ import (
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/openapi"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/transport"
 )
 
 const (
@@ -551,6 +552,11 @@ func (m MockFactory) ClusterDialer(clusterName string, retryOnError bool) (diale
 	return dialer, nil
 }
 
+func (m MockFactory) ClusterDialHolder(clusterName string, retryOnError bool) (*transport.DialHolder, error) {
+	clusterDialer, err := m.ClusterDialer(clusterName, retryOnError)
+	return &transport.DialHolder{Dial: clusterDialer}, err
+}
+
 func (m MockFactory) DockerDialer(clusterName, machineName string) (dialer.Dialer, error) {
 	panic("implement me")
 }
diff --git a/pkg/controllers/management/gke/gke_cluster_handler.go b/pkg/controllers/management/gke/gke_cluster_handler.go
index cd19c2baea5..63ab54e3642 100644
--- a/pkg/controllers/management/gke/gke_cluster_handler.go
+++ b/pkg/controllers/management/gke/gke_cluster_handler.go
@@ -21,10 +21,10 @@ import (
 	"github.com/rancher/rancher/pkg/controllers/management/secretmigrator"
 	"github.com/rancher/rancher/pkg/dialer"
 	mgmtv3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
+	"github.com/rancher/rancher/pkg/kontainer-engine/drivers/util"
 	"github.com/rancher/rancher/pkg/namespace"
 	"github.com/rancher/rancher/pkg/systemaccount"
 	"github.com/rancher/rancher/pkg/types/config"
-	typesDialer "github.com/rancher/rancher/pkg/types/config/dialer"
 	"github.com/rancher/rancher/pkg/wrangler"
 	corecontrollers "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
 	"github.com/sirupsen/logrus"
@@ -36,6 +36,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/transport"
 )
 
 const (
@@ -355,17 +356,21 @@ func (e *gkeOperatorController) updateGKEClusterConfig(cluster *mgmtv3.Cluster,
 
 // generateAndSetServiceAccount uses the API endpoint and CA cert to generate a service account token. The token is then copied to the cluster status.
 func (e *gkeOperatorController) generateAndSetServiceAccount(cluster *mgmtv3.Cluster) (*mgmtv3.Cluster, error) {
-	clusterDialer, err := e.ClientDialer.ClusterDialer(cluster.Name, true)
+	clusterDialer, err := e.ClientDialer.ClusterDialHolder(cluster.Name, true)
 	if err != nil {
 		return cluster, err
 	}
 
-	restConfig, err := e.getRestConfig(cluster, clusterDialer)
+	restConfig, err := e.getRestConfig(cluster)
 	if err != nil {
 		return cluster, err
 	}
+	clientset, err := clusteroperator.NewClientSetForConfig(restConfig, clusteroperator.WithDialHolder(clusterDialer))
+	if err != nil {
+		return nil, fmt.Errorf("error creating clientset for cluster %s: %w", cluster.Name, err)
+	}
 
-	saToken, err := clusteroperator.GenerateSAToken(restConfig, cluster.Name)
+	saToken, err := util.GenerateServiceAccountToken(clientset, cluster.Name)
 	if err != nil {
 		return cluster, fmt.Errorf("error generating service account token: %w", err)
 	}
@@ -424,6 +429,13 @@ func (e *gkeOperatorController) recordAppliedSpec(cluster *mgmtv3.Cluster) (*mgm
 	return e.ClusterClient.Update(cluster)
 }
 
+var publicDialer = &transport.DialHolder{
+	Dial: (&net.Dialer{
+		Timeout:   30 * time.Second,
+		KeepAlive: 30 * time.Second,
+	}).DialContext,
+}
+
 // generateSATokenWithPublicAPI tries to get a service account token from the cluster using the public API endpoint.
 // This function is called if the cluster has only privateEndpoint enabled and not publicly available.
 // If Rancher is able to communicate with the cluster through its API endpoint even though it is private, then this function will retrieve
@@ -436,15 +448,17 @@ func (e *gkeOperatorController) recordAppliedSpec(cluster *mgmtv3.Cluster) (*mgm
 // If an error different from the two below occur, then the *bool return value will be nil, indicating that Rancher was not able to determine if
 // tunneling is required to communicate with the cluster.
 func (e *gkeOperatorController) generateSATokenWithPublicAPI(cluster *mgmtv3.Cluster) (string, *bool, error) {
-	restConfig, err := e.getRestConfig(cluster, (&net.Dialer{
-		Timeout:   30 * time.Second,
-		KeepAlive: 30 * time.Second,
-	}).DialContext)
+	restConfig, err := e.getRestConfig(cluster)
 	if err != nil {
 		return "", nil, err
 	}
+	clientset, err := clusteroperator.NewClientSetForConfig(restConfig, clusteroperator.WithDialHolder(publicDialer))
+	if err != nil {
+		return "", nil, fmt.Errorf("error creating clientset for cluster %s: %w", cluster.Name, err)
+	}
+
 	requiresTunnel := new(bool)
-	serviceToken, err := clusteroperator.GenerateSAToken(restConfig, cluster.Name)
+	serviceToken, err := util.GenerateServiceAccountToken(clientset, cluster.Name)
 	if err != nil {
 		*requiresTunnel = true
 		if strings.Contains(err.Error(), "dial tcp") {
@@ -465,7 +479,7 @@ func (e *gkeOperatorController) generateSATokenWithPublicAPI(cluster *mgmtv3.Clu
 	return serviceToken, requiresTunnel, err
 }
 
-func (e *gkeOperatorController) getRestConfig(cluster *mgmtv3.Cluster, dialer typesDialer.Dialer) (*rest.Config, error) {
+func (e *gkeOperatorController) getRestConfig(cluster *mgmtv3.Cluster) (*rest.Config, error) {
 	ctx := context.Background()
 	ts, err := controller.GetTokenSource(ctx, e.secretClient, cluster.Spec.GKEConfig)
 	if err != nil {
@@ -482,12 +496,12 @@ func (e *gkeOperatorController) getRestConfig(cluster *mgmtv3.Cluster, dialer ty
 		TLSClientConfig: rest.TLSClientConfig{
 			CAData: decodedCA,
 		},
+		UserAgent: util.UserAgentForCluster(cluster),
 		WrapTransport: func(rt http.RoundTripper) http.RoundTripper {
 			return &oauth2.Transport{
 				Source: ts,
 				Base:   rt,
 			}
 		},
-		Dial: dialer,
 	}, nil
 }
diff --git a/pkg/controllers/management/gke/gke_cluster_handler_mockc_interface_test.go b/pkg/controllers/management/gke/gke_cluster_handler_mockc_interface_test.go
index b8af9a7dce3..4e3fae8ec41 100644
--- a/pkg/controllers/management/gke/gke_cluster_handler_mockc_interface_test.go
+++ b/pkg/controllers/management/gke/gke_cluster_handler_mockc_interface_test.go
@@ -15,6 +15,7 @@ import (
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/openapi"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/transport"
 )
 
 const (
@@ -550,6 +551,11 @@ func (m MockFactory) ClusterDialer(clusterName string, retryOnError bool) (diale
 	return dialer, nil
 }
 
+func (m MockFactory) ClusterDialHolder(clusterName string, retryOnError bool) (*transport.DialHolder, error) {
+	clusterDialer, err := m.ClusterDialer(clusterName, retryOnError)
+	return &transport.DialHolder{Dial: clusterDialer}, err
+}
+
 func (m MockFactory) DockerDialer(clusterName, machineName string) (dialer.Dialer, error) {
 	panic("implement me")
 }
diff --git a/pkg/dialer/factory.go b/pkg/dialer/factory.go
index 95fae1f067e..fe3e15245cb 100644
--- a/pkg/dialer/factory.go
+++ b/pkg/dialer/factory.go
@@ -8,6 +8,7 @@ import (
 	"net/url"
 	"os"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/rancher/norman/types/slice"
@@ -23,6 +24,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/transport"
 )
 
 const (
@@ -36,6 +38,7 @@ func NewFactory(apiContext *config.ScaledContext, wrangler *wrangler.Context) (*
 		clusterLister: apiContext.Management.Clusters("").Controller().Lister(),
 		nodeLister:    apiContext.Management.Nodes("").Controller().Lister(),
 		TunnelServer:  wrangler.TunnelServer,
+		dialHolders:   map[string]*transport.DialHolder{},
 	}, nil
 }
 
@@ -43,6 +46,9 @@ type Factory struct {
 	nodeLister    v3.NodeLister
 	clusterLister v3.ClusterLister
 	TunnelServer  *remotedialer.Server
+
+	dialHolders     map[string]*transport.DialHolder
+	dialHoldersLock sync.RWMutex
 }
 
 func (f *Factory) ClusterDialer(clusterName string, retryOnError bool) (dialer.Dialer, error) {
@@ -56,6 +62,37 @@ func (f *Factory) ClusterDialer(clusterName string, retryOnError bool) (dialer.D
 	}, nil
 }
 
+func (f *Factory) ClusterDialHolder(clusterName string, retryOnError bool) (*transport.DialHolder, error) {
+	// Get cached dialHolder, if available
+	f.dialHoldersLock.RLock()
+	cached, ok := f.dialHolders[clusterName]
+	f.dialHoldersLock.RUnlock()
+	if ok {
+		return cached, nil
+	}
+
+	// Lock for writing
+	f.dialHoldersLock.Lock()
+	defer f.dialHoldersLock.Unlock()
+
+	// Check for possible writes while waiting
+	if cached, ok := f.dialHolders[clusterName]; ok {
+		return cached, nil
+	}
+
+	// Create new dialHolder
+	clusterDialer, err := f.ClusterDialer(clusterName, retryOnError)
+	if err != nil {
+		return nil, err
+	}
+	dialHolder := &transport.DialHolder{Dial: clusterDialer}
+
+	// Save in the cache
+	f.dialHolders[clusterName] = dialHolder
+
+	return dialHolder, nil
+}
+
 func IsCloudDriver(cluster *v3.Cluster) bool {
 	return !cluster.Spec.Internal &&
 		cluster.Status.Driver != "" &&
@@ -173,6 +210,11 @@ func (f *Factory) translateClusterAddress(cluster *v3.Cluster, clusterHostPort,
 	return address
 }
 
+var nativeDialer dialer.Dialer = (&net.Dialer{
+	Timeout:   30 * time.Second,
+	KeepAlive: 30 * time.Second,
+}).DialContext
+
 func (f *Factory) clusterDialer(clusterName, address string, retryOnError bool) (dialer.Dialer, error) {
 	cluster, err := f.clusterLister.Get("", clusterName)
 	if err != nil {
@@ -181,14 +223,14 @@ func (f *Factory) clusterDialer(clusterName, address string, retryOnError bool)
 	if cluster.Spec.Internal {
 		// For local (embedded, or import) we just assume we can connect directly
-		return native()
+		return nativeDialer, nil
 	}
 
 	hostPort := hostPort(cluster)
 	logrus.Tracef("dialerFactory: apiEndpoint hostPort for cluster [%s] is [%s]", clusterName, hostPort)
 	if (address == hostPort || isProxyAddress(address)) && IsPublicCloudDriver(cluster) {
 		// For cloud drivers we just connect directly to the k8s API, not through the tunnel.  All other go through tunnel
-		return native()
+		return nativeDialer, nil
 	}
 
 	if f.TunnelServer.HasSession(cluster.Name) {
@@ -271,14 +313,6 @@ func hostPort(cluster *v3.Cluster) string {
 	return u.Host + ":443"
 }
 
-func native() (dialer.Dialer, error) {
-	netDialer := net.Dialer{
-		Timeout:   30 * time.Second,
-		KeepAlive: 30 * time.Second,
-	}
-	return netDialer.DialContext, nil
-}
-
 func (f *Factory) DockerDialer(clusterName, machineName string) (dialer.Dialer, error) {
 	machine, err := f.nodeLister.Get(clusterName, machineName)
 	if err != nil {
diff --git a/pkg/kontainer-engine/drivers/util/utils.go b/pkg/kontainer-engine/drivers/util/utils.go
index 4db96bca6c9..2d2b92c4a5d 100644
--- a/pkg/kontainer-engine/drivers/util/utils.go
+++ b/pkg/kontainer-engine/drivers/util/utils.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 
+	managementv3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
 	"github.com/rancher/rancher/pkg/serviceaccounttoken"
 	"github.com/rancher/rancher/pkg/utils"
 	rketypes "github.com/rancher/rke/types"
@@ -13,6 +14,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 )
 
 const (
@@ -25,6 +27,10 @@ const (
 	newClusterRoleBindingName = "system-netes-default-clusterRoleBinding"
 )
 
+func UserAgentForCluster(cluster *managementv3.Cluster) string {
+	return fmt.Sprintf("%s cluster %s", rest.DefaultKubernetesUserAgent(), cluster.Name)
+}
+
 // GenerateServiceAccountToken generate a serviceAccountToken for clusterAdmin given a rest clientset
 func GenerateServiceAccountToken(clientset kubernetes.Interface, clusterName string) (string, error) {
 	_, err := clientset.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{
diff --git a/pkg/types/config/dialer/dialer.go b/pkg/types/config/dialer/dialer.go
index 9fbb179ba62..93c35d225e7 100644
--- a/pkg/types/config/dialer/dialer.go
+++ b/pkg/types/config/dialer/dialer.go
@@ -3,6 +3,8 @@ package dialer
 import (
 	"context"
 	"net"
+
+	"k8s.io/client-go/transport"
 )
 
 type Dialer func(ctx context.Context, network, address string) (net.Conn, error)
@@ -12,7 +14,13 @@ type Factory interface {
 	// Note that the dialer may or may not use a remotedialer tunnel
 	// If retryOnError is true, the dialer will retry for ~30s in case it cannot connect,
 	// otherwise return immediately
+	// NOTE: ClusterDialer must not be used for Kubernetes clients; use ClusterDialHolder instead.
 	ClusterDialer(clusterName string, retryOnError bool) (Dialer, error)
+	// ClusterDialHolder returns a ClusterDialer wrapped inside a Kubernetes transport struct,
+	// used to allow caching of transports.
+	// Using a custom Dial in rest.Config will cause this cache to grow indefinitely.
+	// See: https://github.com/kubernetes/kubernetes/issues/125818
+	ClusterDialHolder(clusterName string, retryOnError bool) (*transport.DialHolder, error)
 	DockerDialer(clusterName, machineName string) (Dialer, error)
 	NodeDialer(clusterName, machineName string) (Dialer, error)
 }
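
Taken together, the new pieces are meant to be combined as in the minimal sketch below. This is an illustration, not part of the patch: the package name, function, and its parameters are hypothetical, while ClusterDialHolder, NewClientSetForConfig, WithDialHolder, and GenerateServiceAccountToken are the helpers introduced above.

package example

import (
	"fmt"

	"github.com/rancher/rancher/pkg/controllers/management/clusteroperator"
	"github.com/rancher/rancher/pkg/dialer"
	"github.com/rancher/rancher/pkg/kontainer-engine/drivers/util"
	"k8s.io/client-go/rest"
)

// tokenForCluster is a hypothetical caller; the Factory, rest.Config and
// cluster name are assumed to come from elsewhere.
func tokenForCluster(f *dialer.Factory, restConfig *rest.Config, clusterName string) (string, error) {
	// One DialHolder per cluster, cached by the Factory, so repeated calls
	// hand the same pointer to client-go instead of a fresh dial function.
	holder, err := f.ClusterDialHolder(clusterName, true)
	if err != nil {
		return "", err
	}

	// Build the clientset via the helper, which places the holder in the
	// transport config rather than setting rest.Config.Dial.
	clientset, err := clusteroperator.NewClientSetForConfig(restConfig, clusteroperator.WithDialHolder(holder))
	if err != nil {
		return "", fmt.Errorf("error creating clientset for cluster %s: %w", clusterName, err)
	}

	// Generate the downstream service account token over that transport.
	return util.GenerateServiceAccountToken(clientset, clusterName)
}

As the dialer.go comment notes, a custom Dial func in rest.Config makes client-go's transport cache grow indefinitely (kubernetes/kubernetes#125818); reusing the same *transport.DialHolder pointer per cluster lets that cache be shared, which is what the controllers above switch to.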