From 78005b1614e467ea5fd52b9de02beff53ef8ee5e Mon Sep 17 00:00:00 2001 From: Tim Jones Date: Fri, 18 Oct 2024 21:44:53 +0200 Subject: [PATCH] feat: add support for metallb shared addresses Add support for shared IP addresses with MetalLB in the CRD configuration. Signed-off-by: Tim Jones --- README.md | 2 - metal/loadbalancers.go | 55 +++++++-- metal/loadbalancers/metallb/configmap.go | 5 + metal/loadbalancers/metallb/cr.go | 139 +++++++++++++++++++---- metal/loadbalancers/metallb/cr_config.go | 15 +++ metal/loadbalancers/metallb/metallb.go | 14 ++- 6 files changed, 194 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index 0078fbd7..8bb76a74 100644 --- a/README.md +++ b/README.md @@ -518,8 +518,6 @@ If `MetalLB` management is enabled, then CCM does the following. * If there is no other service, delete all CCM managed `bgpeers` and the default `bgpadvertisement` * delete the Elastic IP reservation from Equinix Metal -**NOTE:** (IP Address sharing)[https://metallb.universe.tf/usage/#ip-address-sharing] is not yet supported in Cloud Provider Equinix Metal. - CCM itself does **not** install/deploy the load-balancer and it may exists before enable it. This can be deployed by the administrator separately, using the manifest provided in the releases page, or in any other manner. Not having metallb installed but enabled in the CCM configuration will end up allowing you to continue deploying kubernetes services, but the external ip assignment will remain pending, making it useless. In order to instruct metallb which IPs to announce and from where, CCM takes direct responsibility for managing the diff --git a/metal/loadbalancers.go b/metal/loadbalancers.go index 4673adf6..6d6b303a 100644 --- a/metal/loadbalancers.go +++ b/metal/loadbalancers.go @@ -12,6 +12,8 @@ import ( "strconv" "strings" + "golang.org/x/exp/slices" + "github.com/equinix/cloud-provider-equinix-metal/metal/loadbalancers" "github.com/equinix/cloud-provider-equinix-metal/metal/loadbalancers/empty" "github.com/equinix/cloud-provider-equinix-metal/metal/loadbalancers/kubevip" @@ -255,16 +257,32 @@ func (l *loadBalancers) EnsureLoadBalancerDeleted(ctx context.Context, clusterNa klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: no IP reservation found for %s, nothing to delete", svcName) return nil } - // delete the reservation - klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: for %s EIP ID %s", svcName, ipReservation.ID) - if _, err := l.client.ProjectIPs.Remove(ipReservation.ID); err != nil { - return fmt.Errorf("failed to remove IP address reservation %s from project: %w", ipReservation.String(), err) - } // remove it from any implementation-specific parts svcIPCidr = fmt.Sprintf("%s/%d", ipReservation.Address, ipReservation.CIDR) klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: for %s entry %s", svcName, svcIPCidr) if err := l.implementor.RemoveService(ctx, service.Namespace, service.Name, svcIPCidr); err != nil { - return fmt.Errorf("error removing IP from configmap for %s: %w", svcName, err) + if errors.Is(err, metallb.ErrIPStillInUse) { + // IP is still in use by another service, just remove this service tag + klog.V(2).Info("EnsureLoadBalancerDeleted(): remove: not removing IP, still in use") + // TODO: Update go and update to use a simple DeleteFunc: + // tags := slices.DeleteFunc(ipReservation.Tags, func(s string) bool { + // return s == svcTag + // } + idx := slices.Index(ipReservation.Tags, svcTag) + tags := slices.Delete(ipReservation.Tags, idx, idx+1) + update := packngo.IPAddressUpdateRequest{Tags: &tags} + if _, _, err = l.client.ProjectIPs.Update(ipReservation.ID, &update, &packngo.GetOptions{}); err != nil { + return fmt.Errorf("failed to update IP removing old service tag: %w", err) + } + return nil + } else { + return fmt.Errorf("error removing IP from configmap for %s: %w", svcName, err) + } + } + // delete the reservation + klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: for %s EIP ID %s", svcName, ipReservation.ID) + if _, err := l.client.ProjectIPs.Remove(ipReservation.ID); err != nil { + return fmt.Errorf("failed to remove IP address reservation %s from project: %w", ipReservation.String(), err) } klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: removed service %s from implementation", svcName) return nil @@ -493,7 +511,30 @@ func (l *loadBalancers) addService(ctx context.Context, svc *v1.Service, ips []p }) } - return svcIPCidr, l.implementor.AddService(ctx, svc.Namespace, svc.Name, svcIPCidr, n) + if err = l.implementor.AddService(ctx, svc.Namespace, svc.Name, svcIPCidr, n); err != nil { + return svcIPCidr, err + } + + if ipReservation == nil { + // Need to ensure the service tag is on the IP for shared IP Services + klog.V(2).Infof("service tag %s not found on IP %s, adding", svcTag, svcIP) + ips, _, err := l.client.ProjectIPs.List(l.project, &packngo.ListOptions{}) + if err != nil { + return svcIPCidr, fmt.Errorf("failed to list project IPs: %w", err) + } + for _, ip := range ips { + if ip.Address == svcIP && ip.CIDR == cidr { + tags := append(ip.Tags, svcTag) + update := packngo.IPAddressUpdateRequest{Tags: &tags} + if _, _, err = l.client.ProjectIPs.Update(ip.ID, &update, &packngo.GetOptions{}); err != nil { + return svcIPCidr, fmt.Errorf("failed to update IP with new service tag: %w", err) + } + break + } + } + } + + return svcIPCidr, nil } func (l *loadBalancers) retrieveIPByTag(ctx context.Context, svc *v1.Service, ips []packngo.IPAddressReservation, tag string) (string, error) { diff --git a/metal/loadbalancers/metallb/configmap.go b/metal/loadbalancers/metallb/configmap.go index 35620136..6f7caa45 100644 --- a/metal/loadbalancers/metallb/configmap.go +++ b/metal/loadbalancers/metallb/configmap.go @@ -188,5 +188,10 @@ func (m *CMConfigurer) RemoveAddressPoolByAddress(ctx context.Context, addr stri return nil } +// RemoveFromAddressPool remove service from a pool by name. If the matching pool is not found, do not change anything +func (m *CMConfigurer) RemoveFromAddressPool(ctx context.Context, svcNamespace, svcName string) error { + return nil +} + // RemoveAddressPool remove a pool by name. If the matching pool does not exist, do not change anything func (m *CMConfigurer) RemoveAddressPool(ctx context.Context, pool string) error { return nil } diff --git a/metal/loadbalancers/metallb/cr.go b/metal/loadbalancers/metallb/cr.go index 07d2bc1c..ac1ecb94 100644 --- a/metal/loadbalancers/metallb/cr.go +++ b/metal/loadbalancers/metallb/cr.go @@ -6,18 +6,22 @@ import ( "strings" metallbv1beta1 "go.universe.tf/metallb/api/v1beta1" + "golang.org/x/exp/maps" "golang.org/x/exp/slices" + corev1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ) const ( - defaultBgpAdvertisement = "equinix-metal-bgp-adv" - cpemLabelKey = "cloud-provider" - cpemLabelValue = "equinix-metal" - svcLabelKeyPrefix = "service-" - svcLabelValuePrefix = "namespace-" + defaultBgpAdvertisement = "equinix-metal-bgp-adv" + cpemLabelKey = "cloud-provider" + cpemLabelValue = "equinix-metal" + svcLabelKeyPrefix = "service-" + svcLabelValuePrefix = "namespace-" + svcAnnotationSharedPrefix = "shared-" + metallbAnnotationSharedIP = "metallb.universe.tf/allow-shared-ip" // Not exported as a const from metallb package :( ) type CRDConfigurer struct { @@ -149,36 +153,60 @@ func (m *CRDConfigurer) AddAddressPool(ctx context.Context, add *AddressPool, sv addIPAddr := convertToIPAddr(*add, m.namespace, svcNamespace, svcName) + svc := corev1.Service{} + if err = m.client.Get(ctx, client.ObjectKey{Namespace: svcNamespace, Name: svcName}, &svc); err != nil { + return false, fmt.Errorf("unable to retrieve service: %w", err) + } + // go through the pools and see if we have one that matches // - if same service name return false - // - // TODO (ocobleseqx) - // - Metallb allows ip address sharing for services, so we need to find a way to share a pool - // EnsureLoadBalancerDeleted filters ips by service tags, so when ip is specified and already exists - // it must be updted to include the new serviceNamespace/service for _, o := range olds.Items { - var updateLabels, updateAddresses bool + var updateLabels, updateAddresses, updateAnnotations bool // if same name check services labels if o.GetName() == addIPAddr.GetName() { - for k := range o.GetLabels() { - if strings.HasPrefix(k, svcLabelKeyPrefix) { - osvc := strings.TrimPrefix(k, svcLabelKeyPrefix) - if osvc == svcName { - // already exists + // if service label and key matches + if o.Labels[serviceLabelKey(svcName)] == serviceLabelValue(svcNamespace) { + // if is shared and service exsits in shared annotation + if k, ok := svc.Annotations[metallbAnnotationSharedIP]; ok { + if containsSharedService(o.Annotations[sharedAnnotationKey(k)], svcNamespace, svcName) { + // already exists, and in shared annotation return false, nil + } else { + updateAnnotations = true } + } else { + // already exists, and not shared + return false, nil } + // if we got here, none matched exactly, update labels + updateLabels = true } - // if we got here, none matched exactly, update labels - updateLabels = true } - for _, addr := range addIPAddr.Spec.Addresses { - if slices.Contains(o.Spec.Addresses, addr) { - updateAddresses = true - break + + // If we already need to update the annotations, then this is the owning service and it's just adding a shared-ip annotation + if !updateAnnotations { + // Otherwise we need to check that the IP is new or can be shared + for _, addr := range addIPAddr.Spec.Addresses { + if slices.Contains(o.Spec.Addresses, addr) { + // Check the Service is configured to share the IP + sharedIpKey, ok := svc.Annotations[metallbAnnotationSharedIP] + if !ok { + return false, fmt.Errorf("unable to configure IPAddressPool: requested ip %s already in use and no %s annotation found", addr, metallbAnnotationSharedIP) + } + + // Check the shared IP key matches the pool annotation + if _, ok := o.Annotations[sharedAnnotationKey(sharedIpKey)]; !ok { + return false, fmt.Errorf("unable to configure IPAddressPool: requested ip %s already in use and %s annotation does not match", addr, metallbAnnotationSharedIP) + } + + updateAnnotations = true + updateAddresses = true + break + } } } - if updateLabels || updateAddresses { + + if updateLabels || updateAddresses || updateAnnotations { // update pool patch := client.MergeFrom(o.DeepCopy()) if updateLabels { @@ -189,7 +217,19 @@ func (m *CRDConfigurer) AddAddressPool(ctx context.Context, add *AddressPool, sv addresses := append(o.Spec.Addresses, addIPAddr.Spec.Addresses...) slices.Sort(addresses) o.Spec.Addresses = slices.Compact(addresses) - o.Spec.Addresses = addresses + } + if updateAnnotations { + sharedIpKey := sharedAnnotationKey(svc.Annotations[metallbAnnotationSharedIP]) + if sharedSvcs, ok := o.Annotations[sharedIpKey]; !ok { + // Safer way to set annotations in case the annotation map itself is nil + o.SetAnnotations(map[string]string{sharedIpKey: sharedServiceName(svcNamespace, svcName)}) + } else { + sharedSvcs := strings.Split(sharedSvcs, ",") + sharedSvcs = append(sharedSvcs, sharedServiceName(svcNamespace, svcName)) + slices.Sort(sharedSvcs) + sharedSvcs = slices.Compact(sharedSvcs) + o.Annotations[sharedIpKey] = strings.Join(sharedSvcs, ",") + } } err := m.client.Patch(ctx, &o, patch) if err != nil { @@ -236,6 +276,57 @@ func (m *CRDConfigurer) AddAddressPool(ctx context.Context, add *AddressPool, sv return true, nil } +// RemoveFromAddressPool removes a service from a pool by name. If the matching pool is not found, do not change anything +func (m *CRDConfigurer) RemoveFromAddressPool(ctx context.Context, svcNamespace, svcName string) error { + if svcNamespace == "" || svcName == "" { + return nil + } + + olds, err := m.listIPAddressPools(ctx) + if err != nil { + return err + } + + // go through the pools and see if we have a match + pool := poolName(svcNamespace, svcName) + for _, o := range olds.Items { + if slices.ContainsFunc(maps.Keys(o.GetAnnotations()), func(s string) bool { + return strings.HasPrefix(s, svcAnnotationSharedPrefix) && containsSharedService(o.Annotations[s], svcNamespace, svcName) + }) { + // If there are more services sharing this pool, we only need to remove this service from the annotation + for k, v := range o.GetAnnotations() { + if strings.HasPrefix(k, svcAnnotationSharedPrefix) && containsSharedService(v, svcNamespace, svcName) { + // TODO: Update go and update to use a simple DeleteFunc: + // svcList := slices.DeleteFunc(strings.Split(v, ","), func(s string) bool { + // return s == sharedServiceName(svcNamespace, svcName) + // } + svcList := strings.Split(v, ",") + idx := slices.Index(svcList, sharedServiceName(svcNamespace, svcName)) + svcList = slices.Delete(svcList, idx, idx+1) + if len(svcList) == 0 { + // No other shared services with this key + return m.RemoveAddressPool(ctx, o.GetName()) + } else { + patch := client.MergeFrom(o.DeepCopy()) + delete(o.Labels, serviceLabelKey(svcName)) + o.Annotations[k] = strings.Join(svcList, ",") + if m.client.Patch(ctx, &o, patch); err != nil { + return fmt.Errorf("unable to update IPAddressPool %s: %w", o.GetName(), err) + } + // Other Services still use this IP + return ErrIPStillInUse + } + + } + } + } else if o.GetName() == pool { + // Not shared, so just delete the pool + return m.RemoveAddressPool(ctx, pool) + } + } + return nil +} + // RemoveAddressPool removes a pool by name. If the matching pool does not exist, do not change anything func (m *CRDConfigurer) RemoveAddressPool(ctx context.Context, pool string) error { if pool == "" { diff --git a/metal/loadbalancers/metallb/cr_config.go b/metal/loadbalancers/metallb/cr_config.go index 107101bf..9942ccda 100644 --- a/metal/loadbalancers/metallb/cr_config.go +++ b/metal/loadbalancers/metallb/cr_config.go @@ -4,9 +4,11 @@ import ( "fmt" "regexp" "sort" + "strings" "time" metallbv1beta1 "go.universe.tf/metallb/api/v1beta1" + "golang.org/x/exp/slices" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -24,6 +26,19 @@ func serviceLabelValue(svcNamespace string) string { return svcLabelValuePrefix + svcNamespace } +func sharedAnnotationKey(sharedKey string) string { + return svcAnnotationSharedPrefix + sharedKey +} + +func sharedServiceName(svcNamespace, svcName string) string { + return fmt.Sprintf("%s.%s", svcNamespace, svcName) +} + +func containsSharedService(poolAnnotationValue, svcNamespace, svcName string) bool { + svcList := strings.Split(poolAnnotationValue, ",") + return slices.Contains(svcList, sharedServiceName(svcNamespace, svcName)) +} + func convertToIPAddr(addr AddressPool, namespace, svcNamespace, svcName string) metallbv1beta1.IPAddressPool { ip := metallbv1beta1.IPAddressPool{ Spec: metallbv1beta1.IPAddressPoolSpec{ diff --git a/metal/loadbalancers/metallb/metallb.go b/metal/loadbalancers/metallb/metallb.go index c6a4e704..6d19ede3 100644 --- a/metal/loadbalancers/metallb/metallb.go +++ b/metal/loadbalancers/metallb/metallb.go @@ -2,6 +2,7 @@ package metallb import ( "context" + "errors" "fmt" "net/url" "strconv" @@ -9,6 +10,7 @@ import ( "github.com/equinix/cloud-provider-equinix-metal/metal/loadbalancers" metallbv1beta1 "go.universe.tf/metallb/api/v1beta1" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" @@ -43,6 +45,9 @@ type Configurer interface { // Returns if anything changed AddAddressPool(ctx context.Context, add *AddressPool, svcNamespace, svcName string) (bool, error) + // RemoveFromAddressPool remove service from a pool by name. If the matching pool if not found, do not change anything + RemoveFromAddressPool(ctx context.Context, svcNamespace, svcName string) error + // RemoveAddressPool remove a pool by name. If the matching pool does not exist, do not change anything RemoveAddressPool(ctx context.Context, pool string) error @@ -61,6 +66,8 @@ type LB struct { var ( _ loadbalancers.LB = (*LB)(nil) crdConfiguration = false + + ErrIPStillInUse = errors.New("ip address still in use") ) // func NewLB(k8sclient kubernetes.Interface, k8sApiextensionsClientset *k8sapiextensionsclient.Clientset, config string) *LB { @@ -101,6 +108,7 @@ func NewLB(k8sclient kubernetes.Interface, config string, featureFlags url.Value if crdConfiguration { scheme := runtime.NewScheme() _ = metallbv1beta1.AddToScheme(scheme) + _ = corev1.AddToScheme(scheme) cl, err := client.New(clientconfig.GetConfigOrDie(), client.Options{Scheme: scheme}) if err != nil { panic(err) @@ -261,9 +269,9 @@ func updateIP(ctx context.Context, config Configurer, addr, svcNamespace, svcNam return fmt.Errorf("error removing IP: %w", err) } } else { - if err := config.RemoveAddressPool(ctx, name); err != nil { - klog.V(2).Infof("error removing IPAddressPool: %v", err) - return fmt.Errorf("error removing IPAddressPool: %w", err) + if err := config.RemoveFromAddressPool(ctx, svcNamespace, svcName); err != nil { + klog.V(2).Infof("error removing from IPAddressPool: %v", err) + return fmt.Errorf("error removing from IPAddressPool: %w", err) } } }