diff --git a/cmd/lbManager.go b/cmd/lbManager.go index 153e64f..5cef756 100644 --- a/cmd/lbManager.go +++ b/cmd/lbManager.go @@ -124,6 +124,7 @@ type services will also have their ingress IPs set to this service IP. } svcHandler := ServiceEventHandler{ + lock: make(chan struct{}, 1), Host: hostClient.CoreV1(), Guest: guestClient.CoreV1(), Ctx: ctx, @@ -157,6 +158,7 @@ type services will also have their ingress IPs set to this service IP. if releaseConfig.LoadBalancerIngress.Enabled { ingHandler := IngressEventHandler{ + lock: make(chan struct{}, 1), HostSvc: hostClient.CoreV1(), GuestSvc: guestClient.CoreV1(), HostIng: hostClient.NetworkingV1(), @@ -259,6 +261,7 @@ func objString(i interface{}) string { } type ServiceEventHandler struct { + lock chan struct{} Host corev1client.ServicesGetter Guest corev1client.ServicesGetter Ctx context.Context @@ -266,6 +269,12 @@ type ServiceEventHandler struct { NodePorts map[int32]corev1.ServicePort } +func (s *ServiceEventHandler) WithLock(f func()) { + s.lock <- struct{}{} + defer func() { <-s.lock }() + f() +} + func (s *ServiceEventHandler) SetPorts() { s.Target.Labels = releaseConfig.LoadBalancerLabels s.Target.Annotations = releaseConfig.LoadBalancerServiceAnnotations @@ -358,36 +367,38 @@ func (s *ServiceEventHandler) SetLBIngress(svc *corev1.Service) error { } func (s *ServiceEventHandler) OnAdd(obj interface{}) { - svc, ok := obj.(*corev1.Service) - if !ok { - klog.Warning("Got unexpected guest resource %s", objString(obj)) - return - } + s.WithLock(func() { + svc, ok := obj.(*corev1.Service) + if !ok { + klog.Warning("Got unexpected guest resource %s", objString(obj)) + return + } - if !(svc.Spec.Type == corev1.ServiceTypeNodePort || svc.Spec.Type == corev1.ServiceTypeLoadBalancer) { - klog.V(1).Info("Ignoring %s guest service", svc.Spec.Type) - return - } - klog.Info("Got new guest service", objString(obj)) + if !(svc.Spec.Type == corev1.ServiceTypeNodePort || svc.Spec.Type == corev1.ServiceTypeLoadBalancer) { + klog.V(1).Info("Ignoring %s guest service", svc.Spec.Type) + return + } + klog.Info("Got new guest service", objString(obj)) - s.AddPortsFor(svc) + s.AddPortsFor(svc) - err := s.CreateOrUpdateHostLB() - if err != nil { - klog.Error(err) - return - } + err := s.CreateOrUpdateHostLB() + if err != nil { + klog.Error(err) + return + } - if svc.Spec.Type != corev1.ServiceTypeLoadBalancer { - klog.V(1).Info("Not setting LB ingress IP for %s guest service", svc.Spec.Type) - return - } + if svc.Spec.Type != corev1.ServiceTypeLoadBalancer { + klog.V(1).Info("Not setting LB ingress IP for %s guest service", svc.Spec.Type) + return + } - err = s.SetLBIngress(svc) - if err != nil { - klog.Error(err) - return - } + err = s.SetLBIngress(svc) + if err != nil { + klog.Error(err) + return + } + }) } // PortName produces a predictable port name from a guest service. @@ -423,61 +434,65 @@ func ConvertPort(namespace, name string, port *corev1.ServicePort) corev1.Servic } func (s *ServiceEventHandler) OnUpdate(oldObj, newObj interface{}) { - oldSvc, ok := oldObj.(*corev1.Service) - if !ok { - klog.Warning("Got unexpected guest resource %s", objString(oldObj)) - return - } - newSvc, ok := newObj.(*corev1.Service) - if !ok { - klog.Warning("Got unexpected guest resource %s", objString(newObj)) - return - } + s.WithLock(func() { + oldSvc, ok := oldObj.(*corev1.Service) + if !ok { + klog.Warning("Got unexpected guest resource %s", objString(oldObj)) + return + } + newSvc, ok := newObj.(*corev1.Service) + if !ok { + klog.Warning("Got unexpected guest resource %s", objString(newObj)) + return + } - klog.Info("Got updated guest service ", objString(newObj)) + klog.Info("Got updated guest service ", objString(newObj)) - if oldSvc.Spec.Type == corev1.ServiceTypeNodePort || oldSvc.Spec.Type == corev1.ServiceTypeLoadBalancer { - s.RemovePortsFor(oldSvc) - } + if oldSvc.Spec.Type == corev1.ServiceTypeNodePort || oldSvc.Spec.Type == corev1.ServiceTypeLoadBalancer { + s.RemovePortsFor(oldSvc) + } - if !(newSvc.Spec.Type == corev1.ServiceTypeNodePort || newSvc.Spec.Type == corev1.ServiceTypeLoadBalancer) { - return - } + if !(newSvc.Spec.Type == corev1.ServiceTypeNodePort || newSvc.Spec.Type == corev1.ServiceTypeLoadBalancer) { + return + } - s.AddPortsFor(newSvc) + s.AddPortsFor(newSvc) - err := s.CreateOrUpdateHostLB() - if err != nil { - klog.Error(err) - return - } + err := s.CreateOrUpdateHostLB() + if err != nil { + klog.Error(err) + return + } - if newSvc.Spec.Type != corev1.ServiceTypeLoadBalancer { - klog.Info("Ignoring/removing ports for %s service", newSvc.Spec.Type) - return - } + if newSvc.Spec.Type != corev1.ServiceTypeLoadBalancer { + klog.Info("Ignoring/removing ports for %s service", newSvc.Spec.Type) + return + } - err = s.SetLBIngress(newSvc) - if err != nil { - klog.Error(err) - } + err = s.SetLBIngress(newSvc) + if err != nil { + klog.Error(err) + } + }) } func (s *ServiceEventHandler) OnDelete(obj interface{}) { - svc, ok := obj.(*corev1.Service) - if !ok { - klog.Warning("Got unexpected guest resource %s", objString(obj)) - return - } - klog.Info("Got deleted guest service %s", objString(obj)) + s.WithLock(func() { + svc, ok := obj.(*corev1.Service) + if !ok { + klog.Warning("Got unexpected guest resource %s", objString(obj)) + return + } + klog.Info("Got deleted guest service %s", objString(obj)) - s.RemovePortsFor(svc) + s.RemovePortsFor(svc) - err := s.CreateOrUpdateHostLB() - if err != nil { - klog.Error(err) - return - } + err := s.CreateOrUpdateHostLB() + if err != nil { + klog.Error(err) + return + } + }) } // Same fields as netv1.HTTPIngressPath, but fixes them not hashing to the same even when the fields are the same due to pointers @@ -505,6 +520,7 @@ func ToHashable(path *netv1.HTTPIngressPath) HashableHTTPIngressPath { } type IngressEventHandler struct { + lock chan struct{} HostSvc corev1client.ServicesGetter GuestSvc corev1client.ServicesGetter HostIng netv1client.IngressesGetter @@ -514,6 +530,12 @@ type IngressEventHandler struct { Paths map[string]map[string]map[HashableHTTPIngressPath]netv1.HTTPIngressPath } +func (s *IngressEventHandler) WithLock(f func()) { + s.lock <- struct{}{} + defer func() { <-s.lock }() + f() +} + func GetClassName(ing *netv1.Ingress) (string, bool) { if ing.Spec.IngressClassName != nil && *ing.Spec.IngressClassName != "" { return *ing.Spec.IngressClassName, true @@ -686,119 +708,126 @@ func (i *IngressEventHandler) CreateOrUpdateHostIngress(guestClass string, mappe } *target = *ing return nil -} - -func (i *IngressEventHandler) OnAdd(obj interface{}) { - ing, ok := obj.(*netv1.Ingress) - if !ok { - klog.Warning("Got unexpected guest resource %s", objString(obj)) - return - } - klog.Info("Got new guest ingress", objString(obj)) - className, ok := GetClassName(ing) - if !ok { - klog.Info("Ignoring class-less guest ingress") - return - } - mappedClass, ok := releaseConfig.LoadBalancerIngress.ClassMappings[className] - if !ok { - klog.Infof("Ignoring guest ingress with unmapped class %s", className) - return - } - i.AddPathsFor(className, ing) - - target := i.Targets[className] - err := i.CreateOrUpdateHostIngress(className, &mappedClass, &target) - if err != nil { - klog.Error(err) - return - } - i.Targets[className] = target } -func (i *IngressEventHandler) OnUpdate(oldObj, newObj interface{}) { - oldIng, ok := oldObj.(*netv1.Ingress) - if !ok { - klog.Warning("Got unexpected guest resource %s", objString(oldObj)) - return - } - newIng, ok := newObj.(*netv1.Ingress) - if !ok { - klog.Warning("Got unexpected guest resource %s", objString(newObj)) - return - } - klog.Info("Got updated guest ingress ", objString(newObj)) - - oldClassName, hadClass := GetClassName(oldIng) - var oldMappedClass cfg.LoadBalancerIngressClassMapping - var hadKnownClass bool - if !hadClass { - klog.Info("Not removing paths for previously class-less guest ingress") - } else if oldMappedClass, hadKnownClass = releaseConfig.LoadBalancerIngress.ClassMappings[oldClassName]; !ok { - klog.Infof("Not removing paths for guest ingress with previously unknown class %s", oldClassName) - } else { - i.RemovePathsFor(oldClassName, oldIng) - } +func (i *IngressEventHandler) OnAdd(obj interface{}) { + i.WithLock(func() { + ing, ok := obj.(*netv1.Ingress) + if !ok { + klog.Warning("Got unexpected guest resource %s", objString(obj)) + return + } + klog.Info("Got new guest ingress", objString(obj)) - newClassName, hasClass := GetClassName(newIng) - var newMappedClass cfg.LoadBalancerIngressClassMapping - var hasKnownClass bool - if !hasClass { - klog.Info("Not removing paths for previously class-less guest ingress") - } else if newMappedClass, hadKnownClass = releaseConfig.LoadBalancerIngress.ClassMappings[newClassName]; !ok { - klog.Infof("Not removing paths for guest ingress with unknown class %s", newClassName) - } else { - i.AddPathsFor(newClassName, newIng) - } + className, ok := GetClassName(ing) + if !ok { + klog.Info("Ignoring class-less guest ingress") + return + } + mappedClass, ok := releaseConfig.LoadBalancerIngress.ClassMappings[className] + if !ok { + klog.Infof("Ignoring guest ingress with unmapped class %s", className) + return + } + i.AddPathsFor(className, ing) - if hadKnownClass { - target := i.Targets[oldClassName] - err := i.CreateOrUpdateHostIngress(oldClassName, &oldMappedClass, &target) + target := i.Targets[className] + err := i.CreateOrUpdateHostIngress(className, &mappedClass, &target) if err != nil { klog.Error(err) return } - i.Targets[oldClassName] = target - } + i.Targets[className] = target + }) +} - if (!hadKnownClass && hasKnownClass) || (hadKnownClass && hasKnownClass && oldClassName != newClassName) { - target := i.Targets[newClassName] - err := i.CreateOrUpdateHostIngress(newClassName, &newMappedClass, &target) - if err != nil { - klog.Error(err) +func (i *IngressEventHandler) OnUpdate(oldObj, newObj interface{}) { + i.WithLock(func() { + oldIng, ok := oldObj.(*netv1.Ingress) + if !ok { + klog.Warning("Got unexpected guest resource %s", objString(oldObj)) return } - i.Targets[newClassName] = target + newIng, ok := newObj.(*netv1.Ingress) + if !ok { + klog.Warning("Got unexpected guest resource %s", objString(newObj)) + return + } + klog.Info("Got updated guest ingress ", objString(newObj)) + + oldClassName, hadClass := GetClassName(oldIng) + var oldMappedClass cfg.LoadBalancerIngressClassMapping + var hadKnownClass bool + if !hadClass { + klog.Info("Not removing paths for previously class-less guest ingress") + } else if oldMappedClass, hadKnownClass = releaseConfig.LoadBalancerIngress.ClassMappings[oldClassName]; !ok { + klog.Infof("Not removing paths for guest ingress with previously unknown class %s", oldClassName) + } else { + i.RemovePathsFor(oldClassName, oldIng) + } - } + newClassName, hasClass := GetClassName(newIng) + var newMappedClass cfg.LoadBalancerIngressClassMapping + var hasKnownClass bool + if !hasClass { + klog.Info("Not removing paths for previously class-less guest ingress") + } else if newMappedClass, hadKnownClass = releaseConfig.LoadBalancerIngress.ClassMappings[newClassName]; !ok { + klog.Infof("Not removing paths for guest ingress with unknown class %s", newClassName) + } else { + i.AddPathsFor(newClassName, newIng) + } + + if hadKnownClass { + target := i.Targets[oldClassName] + err := i.CreateOrUpdateHostIngress(oldClassName, &oldMappedClass, &target) + if err != nil { + klog.Error(err) + return + } + i.Targets[oldClassName] = target + } + + if (!hadKnownClass && hasKnownClass) || (hadKnownClass && hasKnownClass && oldClassName != newClassName) { + target := i.Targets[newClassName] + err := i.CreateOrUpdateHostIngress(newClassName, &newMappedClass, &target) + if err != nil { + klog.Error(err) + return + } + i.Targets[newClassName] = target + + } + }) } func (i *IngressEventHandler) OnDelete(obj interface{}) { - ing, ok := obj.(*netv1.Ingress) - if !ok { - klog.Warning("Got unexpected guest resource %s", objString(obj)) - return - } - klog.Info("Got deleted guest ingress %s", objString(obj)) + i.WithLock(func() { + ing, ok := obj.(*netv1.Ingress) + if !ok { + klog.Warning("Got unexpected guest resource %s", objString(obj)) + return + } + klog.Info("Got deleted guest ingress %s", objString(obj)) - className, ok := GetClassName(ing) - if !ok { - klog.Info("Ignoring class-less guest ingress") - return - } - mappedClass, ok := releaseConfig.LoadBalancerIngress.ClassMappings[className] - if !ok { - klog.Infof("Ingoring guest ingress with unknown class %s", className) - } + className, ok := GetClassName(ing) + if !ok { + klog.Info("Ignoring class-less guest ingress") + return + } + mappedClass, ok := releaseConfig.LoadBalancerIngress.ClassMappings[className] + if !ok { + klog.Infof("Ingoring guest ingress with unknown class %s", className) + } - i.RemovePathsFor(className, ing) + i.RemovePathsFor(className, ing) - target := i.Targets[className] - err := i.CreateOrUpdateHostIngress(className, &mappedClass, &target) - if err != nil { - klog.Error(err) - return - } - i.Targets[className] = target + target := i.Targets[className] + err := i.CreateOrUpdateHostIngress(className, &mappedClass, &target) + if err != nil { + klog.Error(err) + return + } + i.Targets[className] = target + }) }