diff --git a/controllers/rack.go b/controllers/rack.go index 31b948554..967b26484 100644 --- a/controllers/rack.go +++ b/controllers/rack.go @@ -232,11 +232,6 @@ func (r *SingleClusterReconciler) createEmptyRack(rackState *RackState) ( // NoOp if already exist r.Log.Info("AerospikeCluster", "Spec", r.aeroCluster.Spec) - if err := r.createSTSHeadlessSvc(); err != nil { - r.Log.Error(err, "Failed to create headless service") - return nil, reconcileError(err) - } - // Bad config should not come here. It should be validated in validation hook cmName := getNamespacedNameForSTSConfigMap(r.aeroCluster, rackState.Rack.ID) if err := r.buildSTSConfigMap(cmName, rackState.Rack); err != nil { @@ -584,17 +579,8 @@ func (r *SingleClusterReconciler) scaleUpRack(found *appsv1.StatefulSet, rackSta } // Create pod service for the scaled up pod when node network is used in network policy - if podServiceNeeded(r.aeroCluster.Spec.PodSpec.MultiPodPerHost, &r.aeroCluster.Spec.AerospikeNetworkPolicy) { - // Create services for each pod - for _, podName := range newPodNames { - if err = r.createPodService( - podName, r.aeroCluster.Namespace, - ); err != nil { - if !errors.IsAlreadyExists(err) { - return found, reconcileError(err) - } - } - } + if err = r.createOrUpdatePodServiceIfNeeded(newPodNames); err != nil { + return nil, reconcileError(err) } // update replicas here to avoid new replicas count comparison while cleaning up dangling pods of rack @@ -646,12 +632,6 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r } } - err = r.updateSTSHeadlessSvc() - if err != nil { - r.Log.Error(err, "Failed to update headless service") - return nil, reconcileError(err) - } - // Update STS definition. The operation is idempotent, so it's ok to call // it without checking for a change in the spec. // @@ -707,12 +687,12 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r "rollingUpdateBatchSize", r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, ) - if err = r.createOrUpdatePodServiceIfNeeded(podsBatch); err != nil { + podNames := getPodNames(podsBatch) + + if err = r.createOrUpdatePodServiceIfNeeded(podNames); err != nil { return nil, reconcileError(err) } - podNames := getPodNames(podsBatch) - r.Recorder.Eventf( r.aeroCluster, corev1.EventTypeNormal, "PodImageUpdate", "[rack-%d] Updating Containers on Pods %v", rackState.Rack.ID, podNames, @@ -839,7 +819,7 @@ func (r *SingleClusterReconciler) scaleDownRack( // If scale down leads to unavailable or dead partition then we should scale up the cluster, // This can be left to the user but if we would do it here on our own then we can reuse // objects like pvc and service. These objects would have been removed if scaleup is left for the user. - // In case of rolling restart, no pod cleanup happens, therefor rolling config back is left to the user. + // In case of rolling restart, no pod cleanup happens, therefore rolling config back is left to the user. if err = r.validateSCClusterState(policy, ignorablePods); err != nil { // reset cluster size newSize := *found.Spec.Replicas + 1 @@ -945,12 +925,6 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, ) } - err = r.updateSTSHeadlessSvc() - if err != nil { - r.Log.Error(err, "Failed to update headless service") - return nil, reconcileError(err) - } - err = r.updateSTS(found, rackState) if err != nil { return found, reconcileError( @@ -1003,7 +977,9 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, "rollingUpdateBatchSize", r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, ) - if err = r.createOrUpdatePodServiceIfNeeded(podsBatch); err != nil { + podNames := getPodNames(podsBatch) + + if err = r.createOrUpdatePodServiceIfNeeded(podNames); err != nil { return nil, reconcileError(err) } diff --git a/controllers/reconciler.go b/controllers/reconciler.go index f1f201e37..002c4f315 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -102,7 +102,18 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { return res.getResult() } - if err := r.createSTSLoadBalancerSvc(); err != nil { + if err := r.createOrUpdateSTSHeadlessSvc(); err != nil { + r.Log.Error(err, "Failed to create headless service") + r.Recorder.Eventf( + r.aeroCluster, corev1.EventTypeWarning, "ServiceCreateFailed", + "Failed to create Service(Headless) %s/%s", + r.aeroCluster.Namespace, r.aeroCluster.Name, + ) + + return reconcile.Result{}, err + } + + if err := r.createOrUpdateSTSLoadBalancerSvc(); err != nil { r.Log.Error(err, "Failed to create LoadBalancer service") r.Recorder.Eventf( r.aeroCluster, corev1.EventTypeWarning, "ServiceCreateFailed", diff --git a/controllers/service.go b/controllers/service.go index 49ca71b79..a8f4b52c3 100644 --- a/controllers/service.go +++ b/controllers/service.go @@ -3,11 +3,11 @@ package controllers import ( "context" "fmt" + "reflect" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" @@ -21,88 +21,64 @@ func getSTSHeadLessSvcName(aeroCluster *asdbv1.AerospikeCluster) string { return aeroCluster.Name } -func (r *SingleClusterReconciler) createSTSHeadlessSvc() error { - r.Log.Info("Create headless service for statefulSet") - - ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name) +func (r *SingleClusterReconciler) createOrUpdateSTSHeadlessSvc() error { + r.Log.Info("Create or Update headless service for statefulSet") serviceName := getSTSHeadLessSvcName(r.aeroCluster) - service := &corev1.Service{} - - err := r.Client.Get( - context.TODO(), types.NamespacedName{ - Name: serviceName, Namespace: r.aeroCluster.Namespace, - }, service, - ) - if err != nil { - if errors.IsNotFound(err) { - service = &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - // Headless service has the same name as AerospikeCluster - Name: serviceName, - Namespace: r.aeroCluster.Namespace, - // deprecation in 1.10, supported until at least 1.13, breaks peer-finder/kube-dns if not used - Annotations: map[string]string{ - "service.alpha.kubernetes.io/tolerate-unready-endpoints": "true", - }, - Labels: ls, - }, - Spec: corev1.ServiceSpec{ - // deprecates service.alpha.kubernetes.io/tolerate-unready-endpoints as of 1. - // 10? see: kubernetes/kubernetes#49239 Fixed in 1.11 as of #63742 - PublishNotReadyAddresses: true, - ClusterIP: "None", - Selector: ls, - }, - } - r.appendServicePorts(service) + err := r.updateServicePorts(serviceName, r.aeroCluster.Namespace) + if err == nil { + return nil + } - // Set AerospikeCluster instance as the owner and controller - err = controllerutil.SetControllerReference( - r.aeroCluster, service, r.Scheme, - ) - if err != nil { - return err - } + if errors.IsNotFound(err) { + ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name) + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + // Headless service has the same name as AerospikeCluster + Name: serviceName, + Namespace: r.aeroCluster.Namespace, + // deprecation in 1.10, supported until at least 1.13, breaks peer-finder/kube-dns if not used + Annotations: map[string]string{ + "service.alpha.kubernetes.io/tolerate-unready-endpoints": "true", + }, + Labels: ls, + }, + Spec: corev1.ServiceSpec{ + // deprecates service.alpha.kubernetes.io/tolerate-unready-endpoints as of 1. + // 10? see: kubernetes/kubernetes#49239 Fixed in 1.11 as of #63742 + PublishNotReadyAddresses: true, + ClusterIP: "None", + Selector: ls, + }, + } - if err = r.Client.Create( - context.TODO(), service, createOption, - ); err != nil { - return fmt.Errorf( - "failed to create headless service for statefulset: %v", - err, - ) - } + service.Spec.Ports = r.getServicePorts() - r.Log.Info("Created new headless service") + // Set AerospikeCluster instance as the owner and controller + err = controllerutil.SetControllerReference( + r.aeroCluster, service, r.Scheme, + ) + if err != nil { + return err + } - return nil + if err = r.Client.Create( + context.TODO(), service, createOption, + ); err != nil { + return fmt.Errorf( + "failed to create headless service for statefulset: %v", + err, + ) } - return err + r.Log.Info("Created new headless service") } - r.Log.Info( - "Service already exist for statefulSet. Using existing service", "name", - utils.NamespacedName(service.Namespace, service.Name), - ) - return nil } -func (r *SingleClusterReconciler) updateSTSHeadlessSvc() error { - if !isServiceTLSChanged(r.aeroCluster.Spec.AerospikeConfig, r.aeroCluster.Status.AerospikeConfig) { - r.Log.Info("No need to update headless service for statefulSet") - return nil - } - - r.Log.Info("Update headless service for statefulSet") - - return r.updateServicePorts(getSTSHeadLessSvcName(r.aeroCluster), r.aeroCluster.Namespace) -} - -func (r *SingleClusterReconciler) createSTSLoadBalancerSvc() error { +func (r *SingleClusterReconciler) createOrUpdateSTSLoadBalancerSvc() error { loadBalancer := r.aeroCluster.Spec.SeedsFinderServices.LoadBalancer if loadBalancer == nil { r.Log.Info("LoadBalancer is not configured. Skipping...") @@ -111,6 +87,7 @@ func (r *SingleClusterReconciler) createSTSLoadBalancerSvc() error { serviceName := r.aeroCluster.Name + "-lb" service := &corev1.Service{} + servicePort := r.getLBServicePort(loadBalancer) if err := r.Client.Get( context.TODO(), types.NamespacedName{ @@ -121,26 +98,6 @@ func (r *SingleClusterReconciler) createSTSLoadBalancerSvc() error { r.Log.Info("Creating LoadBalancer service for cluster") ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name) - var targetPort int32 - if loadBalancer.TargetPort >= 1024 { - // if target port is specified in CR. - targetPort = loadBalancer.TargetPort - } else if tlsName, tlsPort := asdbv1.GetServiceTLSNameAndPort( - r.aeroCluster.Spec.AerospikeConfig, - ); tlsName != "" && tlsPort != nil { - targetPort = int32(*tlsPort) - } else { - targetPort = int32(*asdbv1.GetServicePort(r.aeroCluster.Spec.AerospikeConfig)) - } - - var port int32 - if loadBalancer.Port >= 1024 { - // if port is specified in CR. - port = loadBalancer.Port - } else { - port = targetPort - } - service = &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName, @@ -152,11 +109,7 @@ func (r *SingleClusterReconciler) createSTSLoadBalancerSvc() error { Type: corev1.ServiceTypeLoadBalancer, Selector: ls, Ports: []corev1.ServicePort{ - { - Port: port, - Name: loadBalancer.PortName, - TargetPort: intstr.FromInt(int(targetPort)), - }, + servicePort, }, }, } @@ -193,6 +146,23 @@ func (r *SingleClusterReconciler) createSTSLoadBalancerSvc() error { return err } + if len(service.Spec.Ports) == 1 && service.Spec.Ports[0].Port == servicePort.Port && + service.Spec.Ports[0].TargetPort == servicePort.TargetPort && + service.Spec.Ports[0].Name == servicePort.Name { + return nil + } + + service.Spec.Ports = []corev1.ServicePort{ + servicePort, + } + if err := r.Client.Update( + context.TODO(), service, updateOption, + ); err != nil { + return fmt.Errorf( + "failed to update service %s: %v", serviceName, err, + ) + } + r.Log.Info( "LoadBalancer Service already exist for cluster. Using existing service", "name", utils.NamespacedName(service.Namespace, service.Name), @@ -201,47 +171,46 @@ func (r *SingleClusterReconciler) createSTSLoadBalancerSvc() error { return nil } -func (r *SingleClusterReconciler) createPodService(pName, pNamespace string) error { - service := &corev1.Service{} - if err := r.Client.Get( - context.TODO(), - types.NamespacedName{Name: pName, Namespace: pNamespace}, service, - ); err == nil { - return errors.NewAlreadyExists(schema.GroupResource{Resource: "Service"}, service.Name) +func (r *SingleClusterReconciler) createOrUpdatePodService(pName, pNamespace string) error { + err := r.updateServicePorts(pName, pNamespace) + if err == nil { + return nil } - // NodePort will be allocated automatically - service = &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: pName, - Namespace: pNamespace, - }, - Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeNodePort, - Selector: map[string]string{ - "statefulset.kubernetes.io/pod-name": pName, + if errors.IsNotFound(err) { + // NodePort will be allocated automatically + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: pName, + Namespace: pNamespace, }, - ExternalTrafficPolicy: "Local", - }, - } + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + Selector: map[string]string{ + "statefulset.kubernetes.io/pod-name": pName, + }, + ExternalTrafficPolicy: "Local", + }, + } - r.appendServicePorts(service) + service.Spec.Ports = r.getServicePorts() - // Set AerospikeCluster instance as the owner and controller. - // It is created before Pod, so Pod cannot be the owner - err := controllerutil.SetControllerReference( - r.aeroCluster, service, r.Scheme, - ) - if err != nil { - return err - } - - if err := r.Client.Create( - context.TODO(), service, createOption, - ); err != nil { - return fmt.Errorf( - "failed to create new service for pod %s: %v", pName, err, + // Set AerospikeCluster instance as the owner and controller. + // It is created before Pod, so Pod cannot be the owner + err := controllerutil.SetControllerReference( + r.aeroCluster, service, r.Scheme, ) + if err != nil { + return err + } + + if err := r.Client.Create( + context.TODO(), service, createOption, + ); err != nil { + return fmt.Errorf( + "failed to create new service for pod %s: %v", pName, err, + ) + } } return nil @@ -280,11 +249,23 @@ func (r *SingleClusterReconciler) updateServicePorts(sName, sNamespace string) e return err } - // Resetting ports here based on current spec config. - // kubernetes is able to set previously assigned nodePort value to the services if any - service.Spec.Ports = nil - r.appendServicePorts(service) + servicePorts := r.getServicePorts() + + servicePortsMap := make(map[string]int32) + for _, port := range servicePorts { + servicePortsMap[port.Name] = port.Port + } + specPortsMap := make(map[string]int32) + for _, port := range service.Spec.Ports { + specPortsMap[port.Name] = port.Port + } + + if reflect.DeepEqual(servicePortsMap, specPortsMap) { + return nil + } + + service.Spec.Ports = servicePorts if err := r.Client.Update( context.TODO(), service, updateOption, ); err != nil { @@ -296,13 +277,15 @@ func (r *SingleClusterReconciler) updateServicePorts(sName, sNamespace string) e return nil } -func (r *SingleClusterReconciler) appendServicePorts(service *corev1.Service) { +func (r *SingleClusterReconciler) getServicePorts() []corev1.ServicePort { + servicePorts := make([]corev1.ServicePort, 0) + if svcPort := asdbv1.GetServicePort( r.aeroCluster.Spec. AerospikeConfig, ); svcPort != nil { - service.Spec.Ports = append( - service.Spec.Ports, corev1.ServicePort{ + servicePorts = append( + servicePorts, corev1.ServicePort{ Name: asdbv1.ServicePortName, Port: int32(*svcPort), }, @@ -312,13 +295,43 @@ func (r *SingleClusterReconciler) appendServicePorts(service *corev1.Service) { if _, tlsPort := asdbv1.GetServiceTLSNameAndPort( r.aeroCluster.Spec.AerospikeConfig, ); tlsPort != nil { - service.Spec.Ports = append( - service.Spec.Ports, corev1.ServicePort{ + servicePorts = append( + servicePorts, corev1.ServicePort{ Name: asdbv1.ServiceTLSPortName, Port: int32(*tlsPort), }, ) } + + return servicePorts +} + +func (r *SingleClusterReconciler) getLBServicePort(loadBalancer *asdbv1.LoadBalancerSpec) corev1.ServicePort { + var targetPort int32 + if loadBalancer.TargetPort >= 1024 { + // if target port is specified in CR. + targetPort = loadBalancer.TargetPort + } else if tlsName, tlsPort := asdbv1.GetServiceTLSNameAndPort( + r.aeroCluster.Spec.AerospikeConfig, + ); tlsName != "" && tlsPort != nil { + targetPort = int32(*tlsPort) + } else { + targetPort = int32(*asdbv1.GetServicePort(r.aeroCluster.Spec.AerospikeConfig)) + } + + var port int32 + if loadBalancer.Port >= 1024 { + // If port is specified in CR. + port = loadBalancer.Port + } else { + port = targetPort + } + + return corev1.ServicePort{ + Port: port, + Name: loadBalancer.PortName, + TargetPort: intstr.FromInt(int(targetPort)), + } } func (r *SingleClusterReconciler) cleanupDanglingPodServices(rackState *RackState) error { @@ -355,24 +368,14 @@ func podServiceNeeded(multiPodPerHost bool, networkPolicy *asdbv1.AerospikeNetwo return networkSet.Len() > 2 } -func (r *SingleClusterReconciler) createOrUpdatePodServiceIfNeeded(pods []*corev1.Pod) error { - if (!podServiceNeeded(r.aeroCluster.Status.PodSpec.MultiPodPerHost, &r.aeroCluster.Status.AerospikeNetworkPolicy) || - isServiceTLSChanged(r.aeroCluster.Spec.AerospikeConfig, r.aeroCluster.Status.AerospikeConfig)) && - podServiceNeeded(r.aeroCluster.Spec.PodSpec.MultiPodPerHost, &r.aeroCluster.Spec.AerospikeNetworkPolicy) { +func (r *SingleClusterReconciler) createOrUpdatePodServiceIfNeeded(pods []string) error { + if podServiceNeeded(r.aeroCluster.Spec.PodSpec.MultiPodPerHost, &r.aeroCluster.Spec.AerospikeNetworkPolicy) { // Create services for all pods if network policy is changed and rely on nodePort service for idx := range pods { - if err := r.createPodService( - pods[idx].Name, r.aeroCluster.Namespace, + if err := r.createOrUpdatePodService( + pods[idx], r.aeroCluster.Namespace, ); err != nil { - if !errors.IsAlreadyExists(err) { - return err - } - - if err := r.updateServicePorts( - pods[idx].Name, r.aeroCluster.Namespace, - ); err != nil { - return err - } + return err } } } @@ -380,21 +383,6 @@ func (r *SingleClusterReconciler) createOrUpdatePodServiceIfNeeded(pods []*corev return nil } -func isServiceTLSChanged(specAeroConf, statusAeroConf *asdbv1.AerospikeConfigSpec) bool { - if statusAeroConf == nil { - // If aerospikeConfig status is nil, assuming network service config has been changed - return true - } - - specTLSName, _ := asdbv1.GetServiceTLSNameAndPort(specAeroConf) - statusTLSName, _ := asdbv1.GetServiceTLSNameAndPort(statusAeroConf) - - specSVCPort := asdbv1.GetServicePort(specAeroConf) - statusSVCPort := asdbv1.GetServicePort(statusAeroConf) - - return specTLSName != statusTLSName || *specSVCPort != *statusSVCPort -} - func (r *SingleClusterReconciler) getServiceTLSNameAndPortIfConfigured() (tlsName string, port *int) { tlsName, port = asdbv1.GetServiceTLSNameAndPort(r.aeroCluster.Spec.AerospikeConfig) if tlsName != "" && r.aeroCluster.Status.AerospikeConfig != nil {