Skip to content

Commit

Permalink
[v2.9] Avoid leaking transports cache (rancher#47335)
Browse files Browse the repository at this point in the history
  • Loading branch information
aruiz14 authored Oct 2, 2024
1 parent da8f83b commit 43c67f5
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 46 deletions.
35 changes: 26 additions & 9 deletions pkg/controllers/management/aks/aks_cluster_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/rancher/rancher/pkg/controllers/management/rbac"
"github.com/rancher/rancher/pkg/controllers/management/secretmigrator"
"github.com/rancher/rancher/pkg/dialer"
"github.com/rancher/rancher/pkg/kontainer-engine/drivers/util"
"github.com/rancher/rancher/pkg/namespace"
"github.com/rancher/rancher/pkg/systemaccount"
"github.com/rancher/rancher/pkg/types/config"
Expand All @@ -32,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
)

const (
Expand Down Expand Up @@ -330,18 +332,22 @@ func (e *aksOperatorController) updateAKSClusterConfig(cluster *apimgmtv3.Cluste

// generateAndSetServiceAccount uses the API endpoint and CA cert to generate a service account token. The token is then copied to the cluster status.
func (e *aksOperatorController) generateAndSetServiceAccount(cluster *apimgmtv3.Cluster) (*apimgmtv3.Cluster, error) {
clusterDialer, err := e.ClientDialer.ClusterDialHolder(cluster.Name, true)
if err != nil {
return cluster, err
}

restConfig, err := e.getRestConfig(cluster)
if err != nil {
return cluster, fmt.Errorf("error getting kube config: %v", err)
}

clusterDialer, err := e.ClientDialer.ClusterDialer(cluster.Name, true)
clientset, err := clusteroperator.NewClientSetForConfig(restConfig, clusteroperator.WithDialHolder(clusterDialer))
if err != nil {
return cluster, err
return nil, fmt.Errorf("error creating clientset for cluster %s: %w", cluster.Name, err)
}

restConfig.Dial = clusterDialer
saToken, err := clusteroperator.GenerateSAToken(restConfig, cluster.Name)
saToken, err := util.GenerateServiceAccountToken(clientset, cluster.Name)
if err != nil {
return cluster, fmt.Errorf("error generating service account token: %v", err)
}
Expand Down Expand Up @@ -400,6 +406,13 @@ func (e *aksOperatorController) recordAppliedSpec(cluster *apimgmtv3.Cluster) (*
return e.ClusterClient.Update(cluster)
}

// publicDialer is the package-level DialHolder handed to
// clusteroperator.NewClientSetForConfig (via WithDialHolder) by
// generateSATokenWithPublicAPI. It dials with a plain net.Dialer using a
// 30-second connect timeout and a 30-second keep-alive.
// NOTE(review): presumably a single shared holder is used so that repeated
// calls reuse the same cached client-go transport rather than adding a new
// cache entry each time — confirm against the client-go transport cache.
var publicDialer = &transport.DialHolder{
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
}

// generateSATokenWithPublicAPI tries to get a service account token from the cluster using the public API endpoint.
// This function is called if the cluster has only privateEndpoint enabled and is not publicly available.
// If Rancher is able to communicate with the cluster through its API endpoint even though it is private, then this function will retrieve
Expand All @@ -417,12 +430,13 @@ func (e *aksOperatorController) generateSATokenWithPublicAPI(cluster *apimgmtv3.
return "", nil, err
}

clientset, err := clusteroperator.NewClientSetForConfig(restConfig, clusteroperator.WithDialHolder(publicDialer))
if err != nil {
return "", nil, fmt.Errorf("error creating clientset for cluster %s: %w", cluster.Name, err)
}

requiresTunnel := new(bool)
restConfig.Dial = (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext
serviceToken, err := clusteroperator.GenerateSAToken(restConfig, cluster.Name)
serviceToken, err := util.GenerateServiceAccountToken(clientset, cluster.Name)
if err != nil {
*requiresTunnel = true
var dnsError *net.DNSError
Expand Down Expand Up @@ -450,6 +464,9 @@ func (e *aksOperatorController) getRestConfig(cluster *apimgmtv3.Cluster) (*rest
if err != nil {
return nil, err
}
if restConfig.UserAgent == "" {
restConfig.UserAgent = util.UserAgentForCluster(cluster)
}

// Get the CACert from the cluster because it will have any additional CAs added to Rancher.
certFromCluster, err := base64.StdEncoding.DecodeString(cluster.Status.CACert)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/openapi"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
)

const (
Expand Down Expand Up @@ -551,6 +552,11 @@ func (m MockFactory) ClusterDialer(clusterName string, retryOnError bool) (diale
return dialer, nil
}

// ClusterDialHolder wraps the dialer returned by ClusterDialer in a
// transport.DialHolder. When ClusterDialer fails, the error is returned with a
// nil holder so callers never receive a holder wrapping an unusable dialer.
func (m MockFactory) ClusterDialHolder(clusterName string, retryOnError bool) (*transport.DialHolder, error) {
	clusterDialer, err := m.ClusterDialer(clusterName, retryOnError)
	if err != nil {
		return nil, err
	}
	return &transport.DialHolder{Dial: clusterDialer}, nil
}

func (m MockFactory) DockerDialer(clusterName, machineName string) (dialer.Dialer, error) {
panic("implement me")
}
Expand Down
37 changes: 32 additions & 5 deletions pkg/controllers/management/clusteroperator/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clusteroperator
import (
"encoding/base64"
"fmt"
"net/http"
"strings"
"time"

Expand All @@ -13,7 +14,6 @@ import (
corev1 "github.com/rancher/rancher/pkg/generated/norman/core/v1"
mgmtv3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
projectv3 "github.com/rancher/rancher/pkg/generated/norman/project.cattle.io/v3"
"github.com/rancher/rancher/pkg/kontainer-engine/drivers/util"
"github.com/rancher/rancher/pkg/namespace"
"github.com/rancher/rancher/pkg/systemaccount"
typesDialer "github.com/rancher/rancher/pkg/types/config/dialer"
Expand All @@ -26,6 +26,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/retry"
)

Expand Down Expand Up @@ -166,13 +167,39 @@ func (e *OperatorController) CheckCrdReady(cluster *mgmtv3.Cluster, clusterType
return cluster, nil
}

func GenerateSAToken(restConfig *rest.Config, clusterName string) (string, error) {
clientSet, err := kubernetes.NewForConfig(restConfig)
// TransportConfigOption is a function that modifies a client-go
// transport.Config in place. NewClientSetForConfig applies each supplied
// option to the transport configuration before building the HTTP transport.
type TransportConfigOption func(*transport.Config)

// WithDialHolder returns a TransportConfigOption that stores holder in the
// DialHolder field of the transport.Config it is applied to.
func WithDialHolder(holder *transport.DialHolder) TransportConfigOption {
	setHolder := func(tc *transport.Config) {
		tc.DialHolder = holder
	}
	return setHolder
}

func NewClientSetForConfig(config *rest.Config, opts ...TransportConfigOption) (*kubernetes.Clientset, error) {
transportConfig, err := config.TransportConfig()
if err != nil {
return "", fmt.Errorf("error creating clientset for cluster %s: %v", clusterName, err)
return nil, err
}
for _, opt := range opts {
opt(transportConfig)
}

rt, err := transport.New(transportConfig)
if err != nil {
return nil, err
}

var httpClient *http.Client
if rt != http.DefaultTransport || config.Timeout > 0 {
httpClient = &http.Client{
Transport: rt,
Timeout: config.Timeout,
}
} else {
httpClient = http.DefaultClient
}

return util.GenerateServiceAccountToken(clientSet, clusterName)
return kubernetes.NewForConfigAndClient(config, httpClient)
}

func addAdditionalCA(secretsCache wranglerv1.SecretCache, caCert string) (string, error) {
Expand Down
35 changes: 24 additions & 11 deletions pkg/controllers/management/eks/eks_cluster_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import (
"github.com/rancher/rancher/pkg/controllers/management/secretmigrator"
"github.com/rancher/rancher/pkg/dialer"
mgmtv3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
"github.com/rancher/rancher/pkg/kontainer-engine/drivers/util"
"github.com/rancher/rancher/pkg/namespace"
"github.com/rancher/rancher/pkg/systemaccount"
"github.com/rancher/rancher/pkg/types/config"
typesDialer "github.com/rancher/rancher/pkg/types/config/dialer"
"github.com/rancher/rancher/pkg/wrangler"
corecontrollers "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
Expand All @@ -39,6 +39,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
"sigs.k8s.io/aws-iam-authenticator/pkg/token"
)

Expand Down Expand Up @@ -439,17 +440,21 @@ func (e *eksOperatorController) updateEKSClusterConfig(cluster *mgmtv3.Cluster,

// generateAndSetServiceAccount uses the API endpoint and CA cert to generate a service account token. The token is then copied to the cluster status.
func (e *eksOperatorController) generateAndSetServiceAccount(cluster *mgmtv3.Cluster) (*mgmtv3.Cluster, error) {
clusterDialer, err := e.ClientDialer.ClusterDialer(cluster.Name, true)
clusterDialer, err := e.ClientDialer.ClusterDialHolder(cluster.Name, true)
if err != nil {
return cluster, err
}

restConfig, err := e.getRestConfig(cluster, clusterDialer)
restConfig, err := e.getRestConfig(cluster)
if err != nil {
return cluster, err
}
clientset, err := clusteroperator.NewClientSetForConfig(restConfig, clusteroperator.WithDialHolder(clusterDialer))
if err != nil {
return nil, fmt.Errorf("error creating clientset for cluster %s: %w", cluster.Name, err)
}

saToken, err := clusteroperator.GenerateSAToken(restConfig, cluster.Name)
saToken, err := util.GenerateServiceAccountToken(clientset, cluster.Name)
if err != nil {
return cluster, err
}
Expand Down Expand Up @@ -508,6 +513,13 @@ func (e *eksOperatorController) recordAppliedSpec(cluster *mgmtv3.Cluster) (*mgm
return e.ClusterClient.Update(cluster)
}

// publicDialer is the package-level DialHolder handed to
// clusteroperator.NewClientSetForConfig (via WithDialHolder) by
// generateSATokenWithPublicAPI. It dials with a plain net.Dialer using a
// 30-second connect timeout and a 30-second keep-alive.
// NOTE(review): presumably a single shared holder is used so that repeated
// calls reuse the same cached client-go transport rather than adding a new
// cache entry each time — confirm against the client-go transport cache.
var publicDialer = &transport.DialHolder{
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
}

// generateSATokenWithPublicAPI tries to get a service account token from the cluster using the public API endpoint.
// This function is called if the cluster has only privateEndpoint enabled and is not publicly available.
// If Rancher is able to communicate with the cluster through its API endpoint even though it is private, then this function will retrieve
Expand All @@ -520,16 +532,17 @@ func (e *eksOperatorController) recordAppliedSpec(cluster *mgmtv3.Cluster) (*mgm
// If an error different from the two below occurs, then the *bool return value will be nil, indicating that Rancher was not able to determine if
// tunneling is required to communicate with the cluster.
func (e *eksOperatorController) generateSATokenWithPublicAPI(cluster *mgmtv3.Cluster) (string, *bool, error) {
restConfig, err := e.getRestConfig(cluster, (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext)
restConfig, err := e.getRestConfig(cluster)
if err != nil {
return "", nil, err
}
clientset, err := clusteroperator.NewClientSetForConfig(restConfig, clusteroperator.WithDialHolder(publicDialer))
if err != nil {
return "", nil, fmt.Errorf("error creating clientset for cluster %s: %w", cluster.Name, err)
}

requiresTunnel := new(bool)
serviceToken, err := clusteroperator.GenerateSAToken(restConfig, cluster.Name)
serviceToken, err := util.GenerateServiceAccountToken(clientset, cluster.Name)
if err != nil {
*requiresTunnel = true
var dnsError *net.DNSError
Expand Down Expand Up @@ -606,7 +619,7 @@ func (e *eksOperatorController) getAccessToken(cluster *mgmtv3.Cluster) (string,
return awsToken.Token, nil
}

func (e *eksOperatorController) getRestConfig(cluster *mgmtv3.Cluster, dialer typesDialer.Dialer) (*rest.Config, error) {
func (e *eksOperatorController) getRestConfig(cluster *mgmtv3.Cluster) (*rest.Config, error) {
accessToken, err := e.getAccessToken(cluster)
if err != nil {
return nil, err
Expand All @@ -622,8 +635,8 @@ func (e *eksOperatorController) getRestConfig(cluster *mgmtv3.Cluster, dialer ty
TLSClientConfig: rest.TLSClientConfig{
CAData: decodedCA,
},
UserAgent: util.UserAgentForCluster(cluster),
BearerToken: accessToken,
Dial: dialer,
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/openapi"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
)

const (
Expand Down Expand Up @@ -551,6 +552,11 @@ func (m MockFactory) ClusterDialer(clusterName string, retryOnError bool) (diale
return dialer, nil
}

// ClusterDialHolder wraps the dialer returned by ClusterDialer in a
// transport.DialHolder. When ClusterDialer fails, the error is returned with a
// nil holder so callers never receive a holder wrapping an unusable dialer.
func (m MockFactory) ClusterDialHolder(clusterName string, retryOnError bool) (*transport.DialHolder, error) {
	clusterDialer, err := m.ClusterDialer(clusterName, retryOnError)
	if err != nil {
		return nil, err
	}
	return &transport.DialHolder{Dial: clusterDialer}, nil
}

func (m MockFactory) DockerDialer(clusterName, machineName string) (dialer.Dialer, error) {
panic("implement me")
}
Expand Down
36 changes: 25 additions & 11 deletions pkg/controllers/management/gke/gke_cluster_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"github.com/rancher/rancher/pkg/controllers/management/secretmigrator"
"github.com/rancher/rancher/pkg/dialer"
mgmtv3 "github.com/rancher/rancher/pkg/generated/norman/management.cattle.io/v3"
"github.com/rancher/rancher/pkg/kontainer-engine/drivers/util"
"github.com/rancher/rancher/pkg/namespace"
"github.com/rancher/rancher/pkg/systemaccount"
"github.com/rancher/rancher/pkg/types/config"
typesDialer "github.com/rancher/rancher/pkg/types/config/dialer"
"github.com/rancher/rancher/pkg/wrangler"
corecontrollers "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
Expand All @@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
)

const (
Expand Down Expand Up @@ -355,17 +356,21 @@ func (e *gkeOperatorController) updateGKEClusterConfig(cluster *mgmtv3.Cluster,

// generateAndSetServiceAccount uses the API endpoint and CA cert to generate a service account token. The token is then copied to the cluster status.
func (e *gkeOperatorController) generateAndSetServiceAccount(cluster *mgmtv3.Cluster) (*mgmtv3.Cluster, error) {
clusterDialer, err := e.ClientDialer.ClusterDialer(cluster.Name, true)
clusterDialer, err := e.ClientDialer.ClusterDialHolder(cluster.Name, true)
if err != nil {
return cluster, err
}

restConfig, err := e.getRestConfig(cluster, clusterDialer)
restConfig, err := e.getRestConfig(cluster)
if err != nil {
return cluster, err
}
clientset, err := clusteroperator.NewClientSetForConfig(restConfig, clusteroperator.WithDialHolder(clusterDialer))
if err != nil {
return nil, fmt.Errorf("error creating clientset for cluster %s: %w", cluster.Name, err)
}

saToken, err := clusteroperator.GenerateSAToken(restConfig, cluster.Name)
saToken, err := util.GenerateServiceAccountToken(clientset, cluster.Name)
if err != nil {
return cluster, fmt.Errorf("error generating service account token: %w", err)
}
Expand Down Expand Up @@ -424,6 +429,13 @@ func (e *gkeOperatorController) recordAppliedSpec(cluster *mgmtv3.Cluster) (*mgm
return e.ClusterClient.Update(cluster)
}

// publicDialer is the package-level DialHolder handed to
// clusteroperator.NewClientSetForConfig (via WithDialHolder) by
// generateSATokenWithPublicAPI. It dials with a plain net.Dialer using a
// 30-second connect timeout and a 30-second keep-alive.
// NOTE(review): presumably a single shared holder is used so that repeated
// calls reuse the same cached client-go transport rather than adding a new
// cache entry each time — confirm against the client-go transport cache.
var publicDialer = &transport.DialHolder{
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
}

// generateSATokenWithPublicAPI tries to get a service account token from the cluster using the public API endpoint.
// This function is called if the cluster has only privateEndpoint enabled and is not publicly available.
// If Rancher is able to communicate with the cluster through its API endpoint even though it is private, then this function will retrieve
Expand All @@ -436,15 +448,17 @@ func (e *gkeOperatorController) recordAppliedSpec(cluster *mgmtv3.Cluster) (*mgm
// If an error different from the two below occurs, then the *bool return value will be nil, indicating that Rancher was not able to determine if
// tunneling is required to communicate with the cluster.
func (e *gkeOperatorController) generateSATokenWithPublicAPI(cluster *mgmtv3.Cluster) (string, *bool, error) {
restConfig, err := e.getRestConfig(cluster, (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext)
restConfig, err := e.getRestConfig(cluster)
if err != nil {
return "", nil, err
}
clientset, err := clusteroperator.NewClientSetForConfig(restConfig, clusteroperator.WithDialHolder(publicDialer))
if err != nil {
return "", nil, fmt.Errorf("error creating clientset for cluster %s: %w", cluster.Name, err)
}

requiresTunnel := new(bool)
serviceToken, err := clusteroperator.GenerateSAToken(restConfig, cluster.Name)
serviceToken, err := util.GenerateServiceAccountToken(clientset, cluster.Name)
if err != nil {
*requiresTunnel = true
if strings.Contains(err.Error(), "dial tcp") {
Expand All @@ -465,7 +479,7 @@ func (e *gkeOperatorController) generateSATokenWithPublicAPI(cluster *mgmtv3.Clu
return serviceToken, requiresTunnel, err
}

func (e *gkeOperatorController) getRestConfig(cluster *mgmtv3.Cluster, dialer typesDialer.Dialer) (*rest.Config, error) {
func (e *gkeOperatorController) getRestConfig(cluster *mgmtv3.Cluster) (*rest.Config, error) {
ctx := context.Background()
ts, err := controller.GetTokenSource(ctx, e.secretClient, cluster.Spec.GKEConfig)
if err != nil {
Expand All @@ -482,12 +496,12 @@ func (e *gkeOperatorController) getRestConfig(cluster *mgmtv3.Cluster, dialer ty
TLSClientConfig: rest.TLSClientConfig{
CAData: decodedCA,
},
UserAgent: util.UserAgentForCluster(cluster),
WrapTransport: func(rt http.RoundTripper) http.RoundTripper {
return &oauth2.Transport{
Source: ts,
Base: rt,
}
},
Dial: dialer,
}, nil
}
Loading

0 comments on commit 43c67f5

Please sign in to comment.