From 9bc203b00d41965ebc626d8beb8bf6d80469a7ff Mon Sep 17 00:00:00 2001 From: Tim Jones Date: Fri, 18 Oct 2024 21:44:53 +0200 Subject: [PATCH] feat: add support for metallb shared addresses Add support for shared IP addresses with MetalLB in the CRD configuration. Signed-off-by: Tim Jones --- README.md | 2 - metal/loadbalancers.go | 62 +++++++++-- metal/loadbalancers/metallb/configmap.go | 5 + metal/loadbalancers/metallb/cr.go | 133 +++++++++++++++++++---- metal/loadbalancers/metallb/cr_config.go | 15 +++ metal/loadbalancers/metallb/metallb.go | 13 ++- 6 files changed, 192 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index d1529579..3e0fe4ed 100644 --- a/README.md +++ b/README.md @@ -531,8 +531,6 @@ If `MetalLB` management is enabled, then CCM does the following. - If there is no other service, delete all CCM managed `bgpeers` and the default `bgpadvertisement` - delete the Elastic IP reservation from Equinix Metal -**NOTE:** [IP Address sharing](https://metallb.universe.tf/usage/#ip-address-sharing) is not yet supported in Cloud Provider Equinix Metal. - CCM itself does **not** install/deploy the load-balancer and it may exists before enable it. This can be deployed by the administrator separately, using the manifest provided in the releases page, or in any other manner. Not having metallb installed but enabled in the CCM configuration will end up allowing you to continue deploying kubernetes services, but the external ip assignment will remain pending, making it useless. In order to instruct metallb which IPs to announce and from where, CCM takes direct responsibility for managing the diff --git a/metal/loadbalancers.go b/metal/loadbalancers.go index 0ffe7ceb..4a58c91a 100644 --- a/metal/loadbalancers.go +++ b/metal/loadbalancers.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" + "golang.org/x/exp/slices" "sigs.k8s.io/cloud-provider-equinix-metal/metal/loadbalancers" "sigs.k8s.io/cloud-provider-equinix-metal/metal/loadbalancers/emlb" "sigs.k8s.io/cloud-provider-equinix-metal/metal/loadbalancers/empty" @@ -286,17 +287,32 @@ func (l *loadBalancers) EnsureLoadBalancerDeleted(ctx context.Context, clusterNa klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: no IP reservation found for %s, nothing to delete", svcName) return nil } + + // remove it from any implementation-specific parts + svcIPCidr = fmt.Sprintf("%s/%d", ipReservation.GetAddress(), ipReservation.GetCidr()) + klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: for %s entry %s", svcName, svcIPCidr) + + if err := l.implementor.RemoveService(ctx, service.Namespace, service.Name, svcIPCidr, service); err != nil { + if errors.Is(err, metallb.ErrIPStillInUse) { + // IP is still in use by another service, just remove this service tag + klog.V(2).Info("EnsureLoadBalancerDeleted(): remove: not removing IP, still in use") + tags := slices.DeleteFunc(ipReservation.GetTags(), func(s string) bool { + return s == svcTag + }) + if _, _, err = l.client.IPAddressesApi.UpdateIPAddress(context.Background(), ipReservation.GetId()).IPAssignmentUpdateInput(metal.IPAssignmentUpdateInput{Tags: tags}).Execute(); err != nil { + return fmt.Errorf("failed to update IP removing old service tag: %w", err) + } + return nil + } + return fmt.Errorf("error removing IP %s: %w", ipReservation.GetAddress(), err) + } + // delete the reservation klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: for %s EIP ID %s", svcName, ipReservation.GetId()) if _, err := l.client.IPAddressesApi.DeleteIPAddress(context.Background(), ipReservation.GetId()).Execute(); err != nil { return fmt.Errorf("failed to remove IP address reservation %s from project: %w", ipReservation.GetAddress(), err) } - // remove it from any implementation-specific parts - svcIPCidr = fmt.Sprintf("%s/%d", ipReservation.GetAddress(), ipReservation.GetCidr()) - klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: for %s entry %s", svcName, svcIPCidr) - } - - if err := l.implementor.RemoveService(ctx, service.Namespace, service.Name, svcIPCidr, service); err != nil { + } else if err := l.implementor.RemoveService(ctx, service.Namespace, service.Name, svcIPCidr, service); err != nil { return fmt.Errorf("error removing IP from configmap for %s: %w", svcName, err) } klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: removed service %s from implementation", svcName) @@ -408,6 +424,8 @@ func (l *loadBalancers) addService(ctx context.Context, svc *v1.Service, nodes [ n []loadbalancers.Node ips *metal.IPReservationList ) + // our default CIDR for each address is 32 + cidr := int32(32) if l.usesBGP { // get IP address reservations and check if they any exists for this svc @@ -501,14 +519,36 @@ func (l *loadBalancers) addService(ctx context.Context, svc *v1.Service, nodes [ } klog.V(2).Infof("successfully assigned %s update service %s", svcIP, svcName) } - // our default CIDR for each address is 32 - cidr := int32(32) if ipReservation != nil { cidr = ipReservation.GetCidr() } - svcIPCidr = fmt.Sprintf("%s/%d", svcIP, cidr) - // now need to pass it the nodes + } + + svcIPCidr = fmt.Sprintf("%s/%d", svcIP, cidr) + if err = l.implementor.AddService(ctx, svc.Namespace, svc.Name, svcIPCidr, n, svc, nodes, loadBalancerName); err != nil { + return svcIPCidr, err + } + + if l.usesBGP { + // Need to ensure the service tag is on the IP for shared IP Services + klog.V(2).Infof("service tag %s not found on IP %s, adding", svcTag, svcIP) + ips, _, err := l.client.IPAddressesApi.FindIPReservations(context.Background(), l.project).Execute() + if err != nil { + return svcIPCidr, fmt.Errorf("failed to list project IPs: %w", err) + } + for _, ip := range ips.GetIpAddresses() { + if *ip.IPReservation.Address == svcIP { + if !slices.Contains(ip.IPReservation.Tags, svcTag) { + tags := append(ip.IPReservation.Tags, svcTag) + if _, _, err = l.client.IPAddressesApi.UpdateIPAddress(context.Background(), *ip.IPReservation.Id).IPAssignmentUpdateInput(metal.IPAssignmentUpdateInput{Tags: tags}).Execute(); err != nil { + return svcIPCidr, fmt.Errorf("failed to update IP with new service tag: %w", err) + } + } + break + } + } + // now need to pass it the nodes for _, node := range nodes { // get the node provider ID id := node.Spec.ProviderID @@ -543,7 +583,7 @@ func (l *loadBalancers) addService(ctx context.Context, svc *v1.Service, nodes [ } } - return svcIPCidr, l.implementor.AddService(ctx, svc.Namespace, svc.Name, svcIPCidr, n, svc, nodes, loadBalancerName) + return svcIPCidr, nil } func (l *loadBalancers) retrieveIPByTag(ctx context.Context, svc *v1.Service, tag string) (string, error) { diff --git a/metal/loadbalancers/metallb/configmap.go b/metal/loadbalancers/metallb/configmap.go index 35620136..6f7caa45 100644 --- a/metal/loadbalancers/metallb/configmap.go +++ b/metal/loadbalancers/metallb/configmap.go @@ -188,5 +188,10 @@ func (m *CMConfigurer) RemoveAddressPoolByAddress(ctx context.Context, addr stri return nil } +// RemoveFromAddressPool remove service from a pool by name. If the matching pool is not found, do not change anything +func (m *CMConfigurer) RemoveFromAddressPool(ctx context.Context, svcNamespace, svcName string) error { + return nil +} + // RemoveAddressPool remove a pool by name. If the matching pool does not exist, do not change anything func (m *CMConfigurer) RemoveAddressPool(ctx context.Context, pool string) error { return nil } diff --git a/metal/loadbalancers/metallb/cr.go b/metal/loadbalancers/metallb/cr.go index 07d2bc1c..323e7521 100644 --- a/metal/loadbalancers/metallb/cr.go +++ b/metal/loadbalancers/metallb/cr.go @@ -6,18 +6,22 @@ import ( "strings" metallbv1beta1 "go.universe.tf/metallb/api/v1beta1" + "golang.org/x/exp/maps" "golang.org/x/exp/slices" + corev1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ) const ( - defaultBgpAdvertisement = "equinix-metal-bgp-adv" - cpemLabelKey = "cloud-provider" - cpemLabelValue = "equinix-metal" - svcLabelKeyPrefix = "service-" - svcLabelValuePrefix = "namespace-" + defaultBgpAdvertisement = "equinix-metal-bgp-adv" + cpemLabelKey = "cloud-provider" + cpemLabelValue = "equinix-metal" + svcLabelKeyPrefix = "service-" + svcLabelValuePrefix = "namespace-" + svcAnnotationSharedPrefix = "shared-" + metallbAnnotationSharedIP = "metallb.universe.tf/allow-shared-ip" // Not exported as a const from metallb package :( ) type CRDConfigurer struct { @@ -149,36 +153,62 @@ func (m *CRDConfigurer) AddAddressPool(ctx context.Context, add *AddressPool, sv addIPAddr := convertToIPAddr(*add, m.namespace, svcNamespace, svcName) + svc := corev1.Service{} + if err = m.client.Get(ctx, client.ObjectKey{Namespace: svcNamespace, Name: svcName}, &svc); err != nil { + return false, fmt.Errorf("unable to retrieve service: %w", err) + } + // go through the pools and see if we have one that matches // - if same service name return false - // - // TODO (ocobleseqx) - // - Metallb allows ip address sharing for services, so we need to find a way to share a pool - // EnsureLoadBalancerDeleted filters ips by service tags, so when ip is specified and already exists - // it must be updted to include the new serviceNamespace/service for _, o := range olds.Items { - var updateLabels, updateAddresses bool + var updateLabels, updateAddresses, updateAnnotations bool // if same name check services labels if o.GetName() == addIPAddr.GetName() { - for k := range o.GetLabels() { - if strings.HasPrefix(k, svcLabelKeyPrefix) { - osvc := strings.TrimPrefix(k, svcLabelKeyPrefix) - if osvc == svcName { - // already exists + // if service label and key matches + klog.V(2).Info("found matching address pool") + if o.Labels[serviceLabelKey(svcName)] == serviceLabelValue(svcNamespace) { + // if is shared and service exsits in shared annotation + if k, ok := svc.Annotations[metallbAnnotationSharedIP]; ok { + if containsSharedService(o.Annotations[sharedAnnotationKey(k)], svcNamespace, svcName) { + // already exists, and in shared annotation return false, nil + } else { + updateAnnotations = true } + } else { + // already exists, and not shared + return false, nil } } // if we got here, none matched exactly, update labels updateLabels = true } - for _, addr := range addIPAddr.Spec.Addresses { - if slices.Contains(o.Spec.Addresses, addr) { - updateAddresses = true - break + + // If we already need to update the annotations, then this is the owning service and it's just adding a shared-ip annotation + if !updateAnnotations { + // Otherwise we need to check that the IP is new or can be shared + for _, addr := range addIPAddr.Spec.Addresses { + if slices.Contains(o.Spec.Addresses, addr) { + klog.V(2).Info("found matching ip in other address pool, checking if it can be shared") + // Check the Service is configured to share the IP + sharedIpKey, ok := svc.Annotations[metallbAnnotationSharedIP] + if !ok { + return false, fmt.Errorf("unable to configure IPAddressPool: requested ip %s already in use and no %s annotation found", addr, metallbAnnotationSharedIP) + } + + // Check the shared IP key matches the pool annotation + if _, ok := o.Annotations[sharedAnnotationKey(sharedIpKey)]; !ok { + return false, fmt.Errorf("unable to configure IPAddressPool: requested ip %s already in use and %s annotation does not match", addr, metallbAnnotationSharedIP) + } + + updateAnnotations = true + updateAddresses = true + break + } } } - if updateLabels || updateAddresses { + + if updateLabels || updateAddresses || updateAnnotations { // update pool patch := client.MergeFrom(o.DeepCopy()) if updateLabels { @@ -189,7 +219,19 @@ func (m *CRDConfigurer) AddAddressPool(ctx context.Context, add *AddressPool, sv addresses := append(o.Spec.Addresses, addIPAddr.Spec.Addresses...) slices.Sort(addresses) o.Spec.Addresses = slices.Compact(addresses) - o.Spec.Addresses = addresses + } + if updateAnnotations { + sharedIpKey := sharedAnnotationKey(svc.Annotations[metallbAnnotationSharedIP]) + if sharedSvcs, ok := o.Annotations[sharedIpKey]; !ok { + // Safer way to set annotations in case the annotation map itself is nil + o.SetAnnotations(map[string]string{sharedIpKey: sharedServiceName(svcNamespace, svcName)}) + } else { + sharedSvcs := strings.Split(sharedSvcs, ",") + sharedSvcs = append(sharedSvcs, sharedServiceName(svcNamespace, svcName)) + slices.Sort(sharedSvcs) + sharedSvcs = slices.Compact(sharedSvcs) + o.Annotations[sharedIpKey] = strings.Join(sharedSvcs, ",") + } } err := m.client.Patch(ctx, &o, patch) if err != nil { @@ -236,6 +278,53 @@ func (m *CRDConfigurer) AddAddressPool(ctx context.Context, add *AddressPool, sv return true, nil } +// RemoveFromAddressPool removes a service from a pool by name. If the matching pool is not found, do not change anything +func (m *CRDConfigurer) RemoveFromAddressPool(ctx context.Context, svcNamespace, svcName string) error { + if svcNamespace == "" || svcName == "" { + return nil + } + + olds, err := m.listIPAddressPools(ctx) + if err != nil { + return err + } + + // go through the pools and see if we have a match + pool := poolName(svcNamespace, svcName) + for _, o := range olds.Items { + if slices.ContainsFunc(maps.Keys(o.GetAnnotations()), func(s string) bool { + return strings.HasPrefix(s, svcAnnotationSharedPrefix) && containsSharedService(o.Annotations[s], svcNamespace, svcName) + }) { + // If there are more services sharing this pool, we only need to remove this service from the annotation + for k, v := range o.GetAnnotations() { + if strings.HasPrefix(k, svcAnnotationSharedPrefix) && containsSharedService(v, svcNamespace, svcName) { + svcList := slices.DeleteFunc(strings.Split(v, ","), func(s string) bool { + return s == sharedServiceName(svcNamespace, svcName) + }) + if len(svcList) == 0 { + // No other shared services with this key + return m.RemoveAddressPool(ctx, o.GetName()) + } else { + patch := client.MergeFrom(o.DeepCopy()) + delete(o.Labels, serviceLabelKey(svcName)) + o.Annotations[k] = strings.Join(svcList, ",") + if err = m.client.Patch(ctx, &o, patch); err != nil { + return fmt.Errorf("unable to update IPAddressPool %s: %w", o.GetName(), err) + } + // Other Services still use this IP + return ErrIPStillInUse + } + + } + } + } else if o.GetName() == pool { + // Not shared, so just delete the pool + return m.RemoveAddressPool(ctx, pool) + } + } + return nil +} + // RemoveAddressPool removes a pool by name. If the matching pool does not exist, do not change anything func (m *CRDConfigurer) RemoveAddressPool(ctx context.Context, pool string) error { if pool == "" { diff --git a/metal/loadbalancers/metallb/cr_config.go b/metal/loadbalancers/metallb/cr_config.go index 107101bf..df258167 100644 --- a/metal/loadbalancers/metallb/cr_config.go +++ b/metal/loadbalancers/metallb/cr_config.go @@ -3,7 +3,9 @@ package metallb import ( "fmt" "regexp" + "slices" "sort" + "strings" "time" metallbv1beta1 "go.universe.tf/metallb/api/v1beta1" @@ -24,6 +26,19 @@ func serviceLabelValue(svcNamespace string) string { return svcLabelValuePrefix + svcNamespace } +func sharedAnnotationKey(sharedKey string) string { + return svcAnnotationSharedPrefix + sharedKey +} + +func sharedServiceName(svcNamespace, svcName string) string { + return fmt.Sprintf("%s.%s", svcNamespace, svcName) +} + +func containsSharedService(poolAnnotationValue, svcNamespace, svcName string) bool { + svcList := strings.Split(poolAnnotationValue, ",") + return slices.Contains(svcList, sharedServiceName(svcNamespace, svcName)) +} + func convertToIPAddr(addr AddressPool, namespace, svcNamespace, svcName string) metallbv1beta1.IPAddressPool { ip := metallbv1beta1.IPAddressPool{ Spec: metallbv1beta1.IPAddressPoolSpec{ diff --git a/metal/loadbalancers/metallb/metallb.go b/metal/loadbalancers/metallb/metallb.go index b093b736..6b4e521a 100644 --- a/metal/loadbalancers/metallb/metallb.go +++ b/metal/loadbalancers/metallb/metallb.go @@ -2,6 +2,7 @@ package metallb import ( "context" + "errors" "fmt" "net/url" "strconv" @@ -44,6 +45,9 @@ type Configurer interface { // Returns if anything changed AddAddressPool(ctx context.Context, add *AddressPool, svcNamespace, svcName string) (bool, error) + // RemoveFromAddressPool remove service from a pool by name. If the matching pool if not found, do not change anything + RemoveFromAddressPool(ctx context.Context, svcNamespace, svcName string) error + // RemoveAddressPool remove a pool by name. If the matching pool does not exist, do not change anything RemoveAddressPool(ctx context.Context, pool string) error @@ -62,6 +66,8 @@ type LB struct { var ( _ loadbalancers.LB = (*LB)(nil) crdConfiguration = false + + ErrIPStillInUse = errors.New("ip address still in use") ) // func NewLB(k8sclient kubernetes.Interface, k8sApiextensionsClientset *k8sapiextensionsclient.Clientset, config string) *LB { @@ -102,6 +108,7 @@ func NewLB(k8sclient kubernetes.Interface, config string, featureFlags url.Value if crdConfiguration { scheme := runtime.NewScheme() _ = metallbv1beta1.AddToScheme(scheme) + _ = v1.AddToScheme(scheme) cl, err := client.New(clientconfig.GetConfigOrDie(), client.Options{Scheme: scheme}) if err != nil { panic(err) @@ -267,9 +274,9 @@ func updateIP(ctx context.Context, config Configurer, addr, svcNamespace, svcNam return fmt.Errorf("error removing IP: %w", err) } } else { - if err := config.RemoveAddressPool(ctx, name); err != nil { - klog.V(2).Infof("error removing IPAddressPool: %v", err) - return fmt.Errorf("error removing IPAddressPool: %w", err) + if err := config.RemoveFromAddressPool(ctx, svcNamespace, svcName); err != nil { + klog.V(2).Infof("error removing from IPAddressPool: %v", err) + return fmt.Errorf("error removing from IPAddressPool: %w", err) } } }