diff --git a/metal/cloud.go b/metal/cloud.go index 15284f3f..d48e5045 100644 --- a/metal/cloud.go +++ b/metal/cloud.go @@ -27,11 +27,12 @@ const ( // cloud implements cloudprovider.Interface type cloud struct { - client *packngo.Client - config Config - instances *instances - loadBalancer *loadBalancers - controlPlaneEndpointManager *controlPlaneEndpointManager + client *packngo.Client + config Config + instances *instances + loadBalancer *loadBalancers + controlPlaneEndpointManager *controlPlaneEndpointManager + controlPlaneLoadBalancerManager *controlPlaneLoadBalancerManager // holds our bgp service handler bgp *bgp } @@ -80,6 +81,10 @@ func (c *cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, if err != nil { klog.Fatalf("could not initialize ControlPlaneEndpointManager: %v", err) } + lbm, err := newControlPlaneLoadBalancerManager(clientset, stop, c.config.ProjectID, c.config.LoadBalancerID, c.config.APIServerPort, c.config.EIPHealthCheckUseHostIP) + if err != nil { + klog.Fatalf("could not initialize ControlPlaneEndpointManager: %v", err) + } bgp, err := newBGP(c.client, clientset, c.config) if err != nil { klog.Fatalf("could not initialize BGP: %v", err) @@ -93,6 +98,7 @@ func (c *cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, c.bgp = bgp c.instances = newInstances(c.client, c.config.ProjectID) c.controlPlaneEndpointManager = epm + c.controlPlaneLoadBalancerManager = lbm klog.Info("Initialize of cloud provider complete") } diff --git a/metal/config.go b/metal/config.go index 5826a058..3642a548 100644 --- a/metal/config.go +++ b/metal/config.go @@ -31,6 +31,7 @@ const ( envVarAPIServerPort = "METAL_API_SERVER_PORT" envVarBGPNodeSelector = "METAL_BGP_NODE_SELECTOR" envVarEIPHealthCheckUseHostIP = "METAL_EIP_HEALTH_CHECK_USE_HOST_IP" + envVarLoadBalancerID = "METAL_LOAD_BALANCER_ID" ) // Config configuration for a provider, includes authentication token, project ID ID, and optional override URL to talk to a different Equinix Metal API endpoint @@ -55,6 +56,7 @@ type Config struct { APIServerPort int32 `json:"apiServerPort,omitempty"` BGPNodeSelector string `json:"bgpNodeSelector,omitempty"` EIPHealthCheckUseHostIP bool `json:"eipHealthCheckUseHostIP,omitempty"` + LoadBalancerID string `json:"loadBalancerID,omitempty"` } // String converts the Config structure to a string, while masking hidden fields. @@ -79,6 +81,7 @@ func (c Config) Strings() []string { ret = append(ret, fmt.Sprintf("Elastic IP Tag: '%s'", c.EIPTag)) ret = append(ret, fmt.Sprintf("API Server Port: '%d'", c.APIServerPort)) ret = append(ret, fmt.Sprintf("BGP Node Selector: '%s'", c.BGPNodeSelector)) + ret = append(ret, fmt.Sprintf("Load Balancer ID: '%s'", c.LoadBalancerID)) return ret } @@ -165,6 +168,8 @@ func getMetalConfig(providerConfig io.Reader) (Config, error) { config.EIPTag = override(os.Getenv(envVarEIPTag), rawConfig.EIPTag) + config.LoadBalancerID = override(os.Getenv(envVarLoadBalancerID), rawConfig.LoadBalancerID) + apiServer := os.Getenv(envVarAPIServerPort) switch { case apiServer != "": diff --git a/metal/controlplane_load_balancer_manager.go b/metal/controlplane_load_balancer_manager.go new file mode 100644 index 00000000..a20430c4 --- /dev/null +++ b/metal/controlplane_load_balancer_manager.go @@ -0,0 +1,217 @@ +package metal + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "net/http" + "sync" + "time" + + "github.com/packethost/packngo" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + v1applyconfig "k8s.io/client-go/applyconfigurations/core/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +type controlPlaneLoadBalancerManager struct { + apiServerPort int32 // node on which the external load balancer should listen + nodeAPIServerPort int32 // port on which the api server is listening on the control plane nodes + projectID string + loadBalancerID string + httpClient *http.Client + k8sclient kubernetes.Interface + assignmentMutex sync.Mutex + serviceMutex sync.Mutex + endpointsMutex sync.Mutex + controlPlaneSelectors []labels.Selector + useHostIP bool +} + +func newControlPlaneLoadBalancerManager(k8sclient kubernetes.Interface, stop <-chan struct{}, projectID string, loadBalancerID string, apiServerPort int32, useHostIP bool) (*controlPlaneLoadBalancerManager, error) { + klog.V(2).Info("newControlPlaneLoadBalancerManager()") + + if loadBalancerID == "" { + klog.Info("Load balancer ID is not configured, skipping control plane load balancer management") + return nil, nil + } + + m := &controlPlaneLoadBalancerManager{ + httpClient: &http.Client{ + Timeout: time.Second * 5, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + }, + apiServerPort: apiServerPort, + projectID: projectID, + loadBalancerID: loadBalancerID, + k8sclient: k8sclient, + useHostIP: useHostIP, + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-stop + cancel() + }() + + for _, label := range controlPlaneLabels { + req, err := labels.NewRequirement(label, selection.Exists, nil) + if err != nil { + return m, err + } + + m.controlPlaneSelectors = append(m.controlPlaneSelectors, labels.NewSelector().Add(*req)) + } + + sharedInformer := informers.NewSharedInformerFactory(k8sclient, checkLoopTimerSeconds*time.Second) + + if _, err := sharedInformer.Core().V1().Endpoints().Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + e, _ := obj.(*v1.Endpoints) + if e.Namespace != metav1.NamespaceDefault && e.Name != "kubernetes" { + return false + } + + return true + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + k8sEndpoints, _ := obj.(*v1.Endpoints) + klog.Infof("handling add, endpoints: %s/%s", k8sEndpoints.Namespace, k8sEndpoints.Name) + + if err := m.syncEndpoints(ctx, k8sEndpoints); err != nil { + klog.Errorf("failed to sync endpoints from default/kubernetes to %s/%s: %v", externalServiceNamespace, externalServiceName, err) + return + } + }, + UpdateFunc: func(_, obj interface{}) { + k8sEndpoints, _ := obj.(*v1.Endpoints) + klog.Infof("handling update, endpoints: %s/%s", k8sEndpoints.Namespace, k8sEndpoints.Name) + + if err := m.syncEndpoints(ctx, k8sEndpoints); err != nil { + klog.Errorf("failed to sync endpoints from default/kubernetes to %s/%s: %v", externalServiceNamespace, externalServiceName, err) + return + } + }, + }, + }, + ); err != nil { + return m, err + } + + if _, err := sharedInformer.Core().V1().Services().Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + s, _ := obj.(*v1.Service) + // Filter only service default/kubernetes + if s.Namespace == metav1.NamespaceDefault && s.Name == "kubernetes" { + return true + } + //else + return false + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + k8sService, _ := obj.(*v1.Service) + klog.Infof("handling add, service: %s/%s", k8sService.Namespace, k8sService.Name) + + if err := m.syncService(ctx, k8sService); err != nil { + klog.Errorf("failed to sync service from default/kubernetes to %s/%s: %v", externalServiceNamespace, externalServiceName, err) + return + } + }, + UpdateFunc: func(_, obj interface{}) { + k8sService, _ := obj.(*v1.Service) + klog.Infof("handling update, service: %s/%s", k8sService.Namespace, k8sService.Name) + + if err := m.syncService(ctx, k8sService); err != nil { + klog.Errorf("failed to sync service from default/kubernetes to %s/%s: %v", externalServiceNamespace, externalServiceName, err) + return + } + }, + }, + }, + ); err != nil { + return m, err + } + + sharedInformer.Start(stop) + sharedInformer.WaitForCacheSync(stop) + + return m, nil +} + +func (m *controlPlaneLoadBalancerManager) healthURLFromControlPlaneLoadBalancer(controlPlaneLoadBalancer *packngo.IPAddressReservation) string { + return fmt.Sprintf("https://%s:%d/healthz", controlPlaneLoadBalancer.Address, m.apiServerPort) +} + +func (m *controlPlaneLoadBalancerManager) syncEndpoints(ctx context.Context, k8sEndpoints *v1.Endpoints) error { + m.endpointsMutex.Lock() + defer m.endpointsMutex.Unlock() + + applyConfig := v1applyconfig.Endpoints(externalServiceName, externalServiceNamespace) + for _, subset := range k8sEndpoints.Subsets { + applyConfig = applyConfig.WithSubsets(EndpointSubsetApplyConfig(subset)) + } + + if _, err := m.k8sclient.CoreV1().Endpoints(externalServiceNamespace).Apply( + ctx, + applyConfig, + metav1.ApplyOptions{FieldManager: emIdentifier}, + ); err != nil { + return fmt.Errorf("failed to apply endpoint %s/%s: %w", externalServiceNamespace, externalServiceName, err) + } + + return nil +} + +func (m *controlPlaneLoadBalancerManager) syncService(ctx context.Context, k8sService *v1.Service) error { + m.serviceMutex.Lock() + defer m.serviceMutex.Unlock() + + // get the target port + existingPorts := k8sService.Spec.Ports + if len(existingPorts) < 1 { + return errors.New("default/kubernetes service does not have any ports defined") + } + + // track which port the kube-apiserver actually is listening on + m.nodeAPIServerPort = existingPorts[0].TargetPort.IntVal + // did we set a specific port, or did we request that it just be left as is? + if m.apiServerPort == 0 { + m.apiServerPort = m.nodeAPIServerPort + } + + annotations := map[string]string{} + annotations["equinix.com/loadbalancerID"] = m.loadBalancerID + + specApplyConfig := v1applyconfig.ServiceSpec().WithType(v1.ServiceTypeLoadBalancer) + + for _, port := range existingPorts { + specApplyConfig = specApplyConfig.WithPorts(ServicePortApplyConfig(port)) + } + + applyConfig := v1applyconfig.Service(externalServiceName, externalServiceNamespace). + WithAnnotations(annotations). + WithSpec(specApplyConfig) + + if _, err := m.k8sclient.CoreV1().Services(externalServiceNamespace).Apply( + ctx, + applyConfig, + metav1.ApplyOptions{FieldManager: emIdentifier}, + ); err != nil { + return fmt.Errorf("failed to apply service %s/%s: %w", externalServiceNamespace, externalServiceName, err) + } + + return nil +} diff --git a/metal/loadbalancers.go b/metal/loadbalancers.go index e8fa0974..bd726039 100644 --- a/metal/loadbalancers.go +++ b/metal/loadbalancers.go @@ -174,8 +174,8 @@ func (l *loadBalancers) EnsureLoadBalancer(ctx context.Context, clusterName stri // TODO: Split out most of this to "reconcileLoadBalancer" // TODO: Split out status checking to a separate function that reconcileLoadBalancer calls - // handling is completely different if it is the control plane vs a regular service of type=LoadBalancer - if service.Name == externalServiceName && service.Namespace == externalServiceNamespace { + // For EIP-based (BGP) load balancers, handling is completely different if it is the control plane vs a regular service of type=LoadBalancer + if l.usesBGP && service.Name == externalServiceName && service.Namespace == externalServiceNamespace { ipCidr, err = l.retrieveIPByTag(ctx, service, l.eipTag) if err != nil { return nil, fmt.Errorf("failed to add service %s: %w", service.Name, err)