Skip to content

Commit

Permalink
feat: add control plane load balancing with LBaaS
Browse files Browse the repository at this point in the history
- Allow the control plane LoadBalancer service to be managed
  by the provider if `usesBGP` is false for the selected load
  balancer implementation
- Add a reconciler for LBaaS-based control plane LoadBalancer
- Add a `LoadBalancerID` config option to specify the ID of the
  load balancer to use for the control plane
  • Loading branch information
ctreatma committed Oct 23, 2023
1 parent 05be98e commit 35bed9c
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 7 deletions.
16 changes: 11 additions & 5 deletions metal/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ const (

// cloud implements cloudprovider.Interface
type cloud struct {
client *packngo.Client
config Config
instances *instances
loadBalancer *loadBalancers
controlPlaneEndpointManager *controlPlaneEndpointManager
client *packngo.Client
config Config
instances *instances
loadBalancer *loadBalancers
controlPlaneEndpointManager *controlPlaneEndpointManager
controlPlaneLoadBalancerManager *controlPlaneLoadBalancerManager
// holds our bgp service handler
bgp *bgp
}
Expand Down Expand Up @@ -80,6 +81,10 @@ func (c *cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder,
if err != nil {
klog.Fatalf("could not initialize ControlPlaneEndpointManager: %v", err)
}
// Set up the LBaaS-based control plane load balancer manager. lbm is nil
// (with a nil error) when no load balancer ID is configured.
lbm, err := newControlPlaneLoadBalancerManager(clientset, stop, c.config.ProjectID, c.config.LoadBalancerID, c.config.APIServerPort, c.config.EIPHealthCheckUseHostIP)
if err != nil {
	// Name the component that actually failed; the previous message was
	// copy-pasted from the ControlPlaneEndpointManager check above.
	klog.Fatalf("could not initialize ControlPlaneLoadBalancerManager: %v", err)
}
bgp, err := newBGP(c.client, clientset, c.config)
if err != nil {
klog.Fatalf("could not initialize BGP: %v", err)
Expand All @@ -93,6 +98,7 @@ func (c *cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder,
c.bgp = bgp
c.instances = newInstances(c.client, c.config.ProjectID)
c.controlPlaneEndpointManager = epm
c.controlPlaneLoadBalancerManager = lbm

klog.Info("Initialize of cloud provider complete")
}
Expand Down
5 changes: 5 additions & 0 deletions metal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
envVarAPIServerPort = "METAL_API_SERVER_PORT"
envVarBGPNodeSelector = "METAL_BGP_NODE_SELECTOR"
envVarEIPHealthCheckUseHostIP = "METAL_EIP_HEALTH_CHECK_USE_HOST_IP"
envVarLoadBalancerID = "METAL_LOAD_BALANCER_ID"
)

// Config is the configuration for a provider; it includes the authentication token, project ID, and an optional override URL to talk to a different Equinix Metal API endpoint
Expand All @@ -55,6 +56,7 @@ type Config struct {
APIServerPort int32 `json:"apiServerPort,omitempty"`
BGPNodeSelector string `json:"bgpNodeSelector,omitempty"`
EIPHealthCheckUseHostIP bool `json:"eipHealthCheckUseHostIP,omitempty"`
LoadBalancerID string `json:"loadBalancerID,omitempty"`
}

// Strings converts the Config structure to a list of strings, while masking hidden fields.
Expand All @@ -79,6 +81,7 @@ func (c Config) Strings() []string {
ret = append(ret, fmt.Sprintf("Elastic IP Tag: '%s'", c.EIPTag))
ret = append(ret, fmt.Sprintf("API Server Port: '%d'", c.APIServerPort))
ret = append(ret, fmt.Sprintf("BGP Node Selector: '%s'", c.BGPNodeSelector))
ret = append(ret, fmt.Sprintf("Load Balancer ID: '%s'", c.LoadBalancerID))

return ret
}
Expand Down Expand Up @@ -165,6 +168,8 @@ func getMetalConfig(providerConfig io.Reader) (Config, error) {

config.EIPTag = override(os.Getenv(envVarEIPTag), rawConfig.EIPTag)

config.LoadBalancerID = override(os.Getenv(envVarLoadBalancerID), rawConfig.LoadBalancerID)

apiServer := os.Getenv(envVarAPIServerPort)
switch {
case apiServer != "":
Expand Down
217 changes: 217 additions & 0 deletions metal/controlplane_load_balancer_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package metal

import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
"sync"
"time"

"github.com/packethost/packngo"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
v1applyconfig "k8s.io/client-go/applyconfigurations/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

type controlPlaneLoadBalancerManager struct {
apiServerPort int32 // node on which the external load balancer should listen
nodeAPIServerPort int32 // port on which the api server is listening on the control plane nodes
projectID string
loadBalancerID string
httpClient *http.Client
k8sclient kubernetes.Interface
assignmentMutex sync.Mutex

Check failure on line 31 in metal/controlplane_load_balancer_manager.go

View workflow job for this annotation

GitHub Actions / Lint

field `assignmentMutex` is unused (unused)
serviceMutex sync.Mutex
endpointsMutex sync.Mutex
controlPlaneSelectors []labels.Selector
useHostIP bool
}

// newControlPlaneLoadBalancerManager builds the manager that mirrors the
// default/kubernetes Service and Endpoints onto the external control plane
// Service and Endpoints.
//
// It returns (nil, nil) when loadBalancerID is empty, meaning control plane
// load balancer management is disabled. Otherwise it registers add/update
// handlers on the Endpoints and Services informers, starts the informer
// factory and blocks until its caches have synced (or stop is closed).
//
// stop terminates the informers and cancels the context that the handlers
// pass to the Kubernetes API calls. On error the returned manager is nil.
func newControlPlaneLoadBalancerManager(k8sclient kubernetes.Interface, stop <-chan struct{}, projectID string, loadBalancerID string, apiServerPort int32, useHostIP bool) (*controlPlaneLoadBalancerManager, error) {
	klog.V(2).Info("newControlPlaneLoadBalancerManager()")

	if loadBalancerID == "" {
		klog.Info("Load balancer ID is not configured, skipping control plane load balancer management")
		return nil, nil
	}

	m := &controlPlaneLoadBalancerManager{
		httpClient: &http.Client{
			Timeout: time.Second * 5,
			Transport: &http.Transport{
				// NOTE(review): certificate verification is disabled for this
				// client; confirm that is acceptable for every request it makes.
				TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
			},
		},
		apiServerPort:  apiServerPort,
		projectID:      projectID,
		loadBalancerID: loadBalancerID,
		k8sclient:      k8sclient,
		useHostIP:      useHostIP,
	}

	// Build the selectors before spawning anything, so that a failure here
	// leaves nothing behind to clean up.
	for _, label := range controlPlaneLabels {
		req, err := labels.NewRequirement(label, selection.Exists, nil)
		if err != nil {
			return nil, err
		}

		m.controlPlaneSelectors = append(m.controlPlaneSelectors, labels.NewSelector().Add(*req))
	}

	// ctx is cancelled when stop closes. The goroutine also exits if ctx is
	// cancelled first, which happens on the error paths below.
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-stop:
			cancel()
		case <-ctx.Done():
		}
	}()

	// Only the default/kubernetes objects are of interest to either informer.
	isDefaultKubernetes := func(namespace, name string) bool {
		return namespace == metav1.NamespaceDefault && name == "kubernetes"
	}

	// handleEndpoints is shared by the add and update callbacks; event is only
	// used in the log line.
	handleEndpoints := func(event string, obj interface{}) {
		k8sEndpoints, ok := obj.(*v1.Endpoints)
		if !ok || k8sEndpoints == nil {
			return
		}
		klog.Infof("handling %s, endpoints: %s/%s", event, k8sEndpoints.Namespace, k8sEndpoints.Name)

		if err := m.syncEndpoints(ctx, k8sEndpoints); err != nil {
			klog.Errorf("failed to sync endpoints from default/kubernetes to %s/%s: %v", externalServiceNamespace, externalServiceName, err)
		}
	}

	// handleService is shared by the add and update callbacks; event is only
	// used in the log line.
	handleService := func(event string, obj interface{}) {
		k8sService, ok := obj.(*v1.Service)
		if !ok || k8sService == nil {
			return
		}
		klog.Infof("handling %s, service: %s/%s", event, k8sService.Namespace, k8sService.Name)

		if err := m.syncService(ctx, k8sService); err != nil {
			klog.Errorf("failed to sync service from default/kubernetes to %s/%s: %v", externalServiceNamespace, externalServiceName, err)
		}
	}

	sharedInformer := informers.NewSharedInformerFactory(k8sclient, checkLoopTimerSeconds*time.Second)

	if _, err := sharedInformer.Core().V1().Endpoints().Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				// The filter is also consulted for deletions, where obj may be
				// a tombstone rather than *v1.Endpoints, so the assertion must
				// be checked. Both namespace and name have to match: anything
				// else in "default" must not be copied to the external
				// endpoints.
				e, ok := obj.(*v1.Endpoints)
				return ok && e != nil && isDefaultKubernetes(e.Namespace, e.Name)
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    func(obj interface{}) { handleEndpoints("add", obj) },
				UpdateFunc: func(_, obj interface{}) { handleEndpoints("update", obj) },
			},
		},
	); err != nil {
		cancel()
		return nil, err
	}

	if _, err := sharedInformer.Core().V1().Services().Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				// Filter only service default/kubernetes
				s, ok := obj.(*v1.Service)
				return ok && s != nil && isDefaultKubernetes(s.Namespace, s.Name)
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    func(obj interface{}) { handleService("add", obj) },
				UpdateFunc: func(_, obj interface{}) { handleService("update", obj) },
			},
		},
	); err != nil {
		cancel()
		return nil, err
	}

	sharedInformer.Start(stop)
	sharedInformer.WaitForCacheSync(stop)

	return m, nil
}

func (m *controlPlaneLoadBalancerManager) healthURLFromControlPlaneLoadBalancer(controlPlaneLoadBalancer *packngo.IPAddressReservation) string {

Check failure on line 154 in metal/controlplane_load_balancer_manager.go

View workflow job for this annotation

GitHub Actions / Lint

func `(*controlPlaneLoadBalancerManager).healthURLFromControlPlaneLoadBalancer` is unused (unused)
return fmt.Sprintf("https://%s:%d/healthz", controlPlaneLoadBalancer.Address, m.apiServerPort)
}

// syncEndpoints applies the subsets of k8sEndpoints to the Endpoints object
// externalServiceNamespace/externalServiceName, using emIdentifier as the
// field manager. Calls are serialized by endpointsMutex.
//
// It returns a wrapped error when the Apply call fails and nil otherwise.
func (m *controlPlaneLoadBalancerManager) syncEndpoints(ctx context.Context, k8sEndpoints *v1.Endpoints) error {
	m.endpointsMutex.Lock()
	defer m.endpointsMutex.Unlock()

	// Start from an empty apply configuration and append each source subset.
	desired := v1applyconfig.Endpoints(externalServiceName, externalServiceNamespace)
	for i := range k8sEndpoints.Subsets {
		desired = desired.WithSubsets(EndpointSubsetApplyConfig(k8sEndpoints.Subsets[i]))
	}

	endpointsClient := m.k8sclient.CoreV1().Endpoints(externalServiceNamespace)
	_, err := endpointsClient.Apply(ctx, desired, metav1.ApplyOptions{FieldManager: emIdentifier})
	if err == nil {
		return nil
	}

	return fmt.Errorf("failed to apply endpoint %s/%s: %w", externalServiceNamespace, externalServiceName, err)
}

// syncService applies a Service of type LoadBalancer named
// externalServiceNamespace/externalServiceName that carries the same ports as
// k8sService and is annotated with the configured load balancer ID. Calls are
// serialized by serviceMutex.
//
// As a side effect it records the first port's target port in
// nodeAPIServerPort and, when apiServerPort is still 0, copies that value
// into apiServerPort.
//
// It returns an error when k8sService has no ports or when the Apply call
// fails.
func (m *controlPlaneLoadBalancerManager) syncService(ctx context.Context, k8sService *v1.Service) error {
	m.serviceMutex.Lock()
	defer m.serviceMutex.Unlock()

	ports := k8sService.Spec.Ports
	if len(ports) == 0 {
		return errors.New("default/kubernetes service does not have any ports defined")
	}

	// Remember where the kube-apiserver really listens; an unset (0) external
	// port means "leave it the same as the node port".
	m.nodeAPIServerPort = ports[0].TargetPort.IntVal
	if m.apiServerPort == 0 {
		m.apiServerPort = m.nodeAPIServerPort
	}

	spec := v1applyconfig.ServiceSpec().WithType(v1.ServiceTypeLoadBalancer)
	for i := range ports {
		spec = spec.WithPorts(ServicePortApplyConfig(ports[i]))
	}

	desired := v1applyconfig.Service(externalServiceName, externalServiceNamespace).
		WithAnnotations(map[string]string{"equinix.com/loadbalancerID": m.loadBalancerID}).
		WithSpec(spec)

	servicesClient := m.k8sclient.CoreV1().Services(externalServiceNamespace)
	_, err := servicesClient.Apply(ctx, desired, metav1.ApplyOptions{FieldManager: emIdentifier})
	if err == nil {
		return nil
	}

	return fmt.Errorf("failed to apply service %s/%s: %w", externalServiceNamespace, externalServiceName, err)
}
4 changes: 2 additions & 2 deletions metal/loadbalancers.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ func (l *loadBalancers) EnsureLoadBalancer(ctx context.Context, clusterName stri
// TODO: Split out most of this to "reconcileLoadBalancer"
// TODO: Split out status checking to a separate function that reconcileLoadBalancer calls

// handling is completely different if it is the control plane vs a regular service of type=LoadBalancer
if service.Name == externalServiceName && service.Namespace == externalServiceNamespace {
// For EIP-based (BGP) load balancers, handling is completely different if it is the control plane vs a regular service of type=LoadBalancer
if l.usesBGP && service.Name == externalServiceName && service.Namespace == externalServiceNamespace {
ipCidr, err = l.retrieveIPByTag(ctx, service, l.eipTag)
if err != nil {
return nil, fmt.Errorf("failed to add service %s: %w", service.Name, err)
Expand Down

0 comments on commit 35bed9c

Please sign in to comment.