From 8fcd9860b68ecc187d0430a85141e1990cec13ef Mon Sep 17 00:00:00 2001
From: Tanmay Jain <103629776+tanmayja@users.noreply.github.com>
Date: Tue, 3 Oct 2023 00:44:21 +0530
Subject: [PATCH] [KO-259] Changing ports for headless service. (#250)

* Changing ports for headless service when moving tls to non-tls and vice versa.

* Changing ports for lb service and moving headless service creation out of rack-level reconciliation.
---
 controllers/rack.go             |  35 +---
 controllers/reconciler.go       |  13 +-
 controllers/service.go          | 332 ++++++++++++++++----------------
 controllers/statefulset.go      |  14 ++
 test/batch_restart_pods_test.go |   3 +-
 test/cluster_helper.go          |  50 +++++
 test/cluster_test.go            |  12 +-
 test/utils.go                   |   9 +-
 8 files changed, 273 insertions(+), 195 deletions(-)

diff --git a/controllers/rack.go b/controllers/rack.go
index a7422c241..8b396d215 100644
--- a/controllers/rack.go
+++ b/controllers/rack.go
@@ -59,7 +59,7 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult {
 		ignorablePodNames,
 	)
 
-	// handle failed racks
+	// Handle failed racks
 	for idx := range rackStateList {
 		var podList []*corev1.Pod
 
@@ -232,11 +232,6 @@ func (r *SingleClusterReconciler) createEmptyRack(rackState *RackState) (
 	// NoOp if already exist
 	r.Log.Info("AerospikeCluster", "Spec", r.aeroCluster.Spec)
 
-	if err := r.createSTSHeadlessSvc(); err != nil {
-		r.Log.Error(err, "Failed to create headless service")
-		return nil, reconcileError(err)
-	}
-
 	// Bad config should not come here. It should be validated in validation hook
 	cmName := getNamespacedNameForSTSConfigMap(r.aeroCluster, rackState.Rack.ID)
 	if err := r.buildSTSConfigMap(cmName, rackState.Rack); err != nil {
@@ -584,17 +579,8 @@ func (r *SingleClusterReconciler) scaleUpRack(found *appsv1.StatefulSet, rackSta
 	}
 
 	// Create pod service for the scaled up pod when node network is used in network policy
-	if podServiceNeeded(r.aeroCluster.Spec.PodSpec.MultiPodPerHost, &r.aeroCluster.Spec.AerospikeNetworkPolicy) {
-		// Create services for each pod
-		for _, podName := range newPodNames {
-			if err = r.createPodService(
-				podName, r.aeroCluster.Namespace,
-			); err != nil {
-				if !errors.IsAlreadyExists(err) {
-					return found, reconcileError(err)
-				}
-			}
-		}
+	if err = r.createOrUpdatePodServiceIfNeeded(newPodNames); err != nil {
+		return nil, reconcileError(err)
 	}
 
 	// update replicas here to avoid new replicas count comparison while cleaning up dangling pods of rack
@@ -701,12 +687,11 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r
 		"rollingUpdateBatchSize", r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize,
 	)
 
-	if err = r.createOrUpdatePodServiceIfNeeded(podsBatch); err != nil {
+	podNames := getPodNames(podsBatch)
+	if err = r.createOrUpdatePodServiceIfNeeded(podNames); err != nil {
 		return nil, reconcileError(err)
 	}
 
-	podNames := getPodNames(podsBatch)
-
 	r.Recorder.Eventf(
 		r.aeroCluster, corev1.EventTypeNormal, "PodImageUpdate",
 		"[rack-%d] Updating Containers on Pods %v", rackState.Rack.ID, podNames,
@@ -833,7 +818,7 @@ func (r *SingleClusterReconciler) scaleDownRack(
 	// If scale down leads to unavailable or dead partition then we should scale up the cluster,
 	// This can be left to the user but if we would do it here on our own then we can reuse
 	// objects like pvc and service. These objects would have been removed if scaleup is left for the user.
-	// In case of rolling restart, no pod cleanup happens, therefor rolling config back is left to the user.
+	// In case of rolling restart, no pod cleanup happens, therefore rolling config back is left to the user.
 	if err = r.validateSCClusterState(policy, ignorablePods); err != nil {
 		// reset cluster size
 		newSize := *found.Spec.Replicas + 1
@@ -968,7 +953,7 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
 	var podsBatchList [][]*corev1.Pod
 
 	if len(failedPods) != 0 {
-		// creating a single batch of all failed pods in a rack, irrespective of batch size
+		// Creating a single batch of all failed pods in a rack, irrespective of batch size
 		r.Log.Info("Skipping batchSize for failed pods")
 
 		podsBatchList = make([][]*corev1.Pod, 1)
@@ -991,12 +976,12 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
 		"rollingUpdateBatchSize", r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize,
 	)
 
-	if err = r.createOrUpdatePodServiceIfNeeded(podsBatch); err != nil {
+	podNames := getPodNames(podsBatch)
+	if err = r.createOrUpdatePodServiceIfNeeded(podNames); err != nil {
 		return nil, reconcileError(err)
 	}
 
-	res := r.rollingRestartPods(rackState, podsBatch, ignorablePods, restartTypeMap)
-	if !res.isSuccess {
+	if res := r.rollingRestartPods(rackState, podsBatch, ignorablePods, restartTypeMap); !res.isSuccess {
 		return found, res
 	}
 
diff --git a/controllers/reconciler.go b/controllers/reconciler.go
index 493bf8d71..f207bbd37 100644
--- a/controllers/reconciler.go
+++ b/controllers/reconciler.go
@@ -89,6 +89,17 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) {
 		}
 	}
 
+	if err := r.createOrUpdateSTSHeadlessSvc(); err != nil {
+		r.Log.Error(err, "Failed to create headless service")
+		r.Recorder.Eventf(
+			r.aeroCluster, corev1.EventTypeWarning, "ServiceCreateFailed",
+			"Failed to create Service(Headless) %s/%s",
+			r.aeroCluster.Namespace, r.aeroCluster.Name,
+		)
+
+		return reconcile.Result{}, err
+	}
+
 	// Reconcile all racks
 	if res := r.reconcileRacks(); !res.isSuccess {
 		if res.err != nil {
@@ -102,7 +113,7 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) {
 		return res.getResult()
 	}
 
-	if err := r.createSTSLoadBalancerSvc(); err != nil {
+	if err := r.createOrUpdateSTSLoadBalancerSvc(); err != nil {
 		r.Log.Error(err, "Failed to create LoadBalancer service")
 		r.Recorder.Eventf(
 			r.aeroCluster, corev1.EventTypeWarning, "ServiceCreateFailed",
diff --git a/controllers/service.go b/controllers/service.go
index f96b143cb..764a675ec 100644
--- a/controllers/service.go
+++ b/controllers/service.go
@@ -3,11 +3,11 @@ package controllers
 import (
 	"context"
 	"fmt"
+	"reflect"
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -21,10 +21,8 @@ func getSTSHeadLessSvcName(aeroCluster *asdbv1.AerospikeCluster) string {
 	return aeroCluster.Name
 }
 
-func (r *SingleClusterReconciler) createSTSHeadlessSvc() error {
-	r.Log.Info("Create headless service for statefulSet")
-
-	ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name)
+func (r *SingleClusterReconciler) createOrUpdateSTSHeadlessSvc() error {
+	r.Log.Info("Create or Update headless service for statefulSet")
 
 	serviceName := getSTSHeadLessSvcName(r.aeroCluster)
 	service := &corev1.Service{}
@@ -35,63 +33,59 @@ func (r *SingleClusterReconciler) createSTSHeadlessSvc() error {
 		}, service,
 	)
 	if err != nil {
-		if errors.IsNotFound(err) {
-			service = &corev1.Service{
-				ObjectMeta: metav1.ObjectMeta{
-					// Headless service has the same name as AerospikeCluster
-					Name:      serviceName,
-					Namespace: r.aeroCluster.Namespace,
-					// deprecation in 1.10, supported until at least 1.13, breaks peer-finder/kube-dns if not used
-					Annotations: map[string]string{
-						"service.alpha.kubernetes.io/tolerate-unready-endpoints": "true",
-					},
-					Labels: ls,
-				},
-				Spec: corev1.ServiceSpec{
-					// deprecates service.alpha.kubernetes.io/tolerate-unready-endpoints as of 1.
-					// 10? see: kubernetes/kubernetes#49239 Fixed in 1.11 as of #63742
-					PublishNotReadyAddresses: true,
-					ClusterIP:                "None",
-					Selector:                 ls,
-				},
-			}
-
-			r.appendServicePorts(service)
+		if !errors.IsNotFound(err) {
+			return err
+		}
 
-			// Set AerospikeCluster instance as the owner and controller
-			err = controllerutil.SetControllerReference(
-				r.aeroCluster, service, r.Scheme,
-			)
-			if err != nil {
-				return err
-			}
+		ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name)
+		service = &corev1.Service{
+			ObjectMeta: metav1.ObjectMeta{
+				// Headless service has the same name as AerospikeCluster
+				Name:      serviceName,
+				Namespace: r.aeroCluster.Namespace,
+				// deprecation in 1.10, supported until at least 1.13, breaks peer-finder/kube-dns if not used
+				Annotations: map[string]string{
+					"service.alpha.kubernetes.io/tolerate-unready-endpoints": "true",
+				},
+				Labels: ls,
+			},
+			Spec: corev1.ServiceSpec{
+				// deprecates service.alpha.kubernetes.io/tolerate-unready-endpoints as of 1.
+				// 10? see: kubernetes/kubernetes#49239 Fixed in 1.11 as of #63742
+				PublishNotReadyAddresses: true,
+				ClusterIP:                "None",
+				Selector:                 ls,
+			},
+		}
 
-			if err = r.Client.Create(
-				context.TODO(), service, createOption,
-			); err != nil {
-				return fmt.Errorf(
-					"failed to create headless service for statefulset: %v",
-					err,
-				)
-			}
+		service.Spec.Ports = r.getServicePorts()
 
-			r.Log.Info("Created new headless service")
+		// Set AerospikeCluster instance as the owner and controller
+		err = controllerutil.SetControllerReference(
+			r.aeroCluster, service, r.Scheme,
+		)
+		if err != nil {
+			return err
+		}
 
-			return nil
+		if err = r.Client.Create(
+			context.TODO(), service, createOption,
+		); err != nil {
+			return fmt.Errorf(
+				"failed to create headless service for statefulset: %v",
+				err,
+			)
 		}
 
-		return err
-	}
+		r.Log.Info("Created new headless service")
 
-	r.Log.Info(
-		"Service already exist for statefulSet. Using existing service", "name",
-		utils.NamespacedName(service.Namespace, service.Name),
-	)
+		return nil
+	}
 
-	return nil
+	return r.updateServicePorts(service)
 }
 
-func (r *SingleClusterReconciler) createSTSLoadBalancerSvc() error {
+func (r *SingleClusterReconciler) createOrUpdateSTSLoadBalancerSvc() error {
 	loadBalancer := r.aeroCluster.Spec.SeedsFinderServices.LoadBalancer
 	if loadBalancer == nil {
 		r.Log.Info("LoadBalancer is not configured. Skipping...")
@@ -100,6 +94,7 @@ func (r *SingleClusterReconciler) createSTSLoadBalancerSvc() error {
 
 	serviceName := r.aeroCluster.Name + "-lb"
 	service := &corev1.Service{}
+	servicePort := r.getLBServicePort(loadBalancer)
 
 	if err := r.Client.Get(
 		context.TODO(), types.NamespacedName{
@@ -110,26 +105,6 @@ func (r *SingleClusterReconciler) createSTSLoadBalancerSvc() error {
 			r.Log.Info("Creating LoadBalancer service for cluster")
 
 			ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name)
-			var targetPort int32
-			if loadBalancer.TargetPort >= 1024 {
-				// if target port is specified in CR.
-				targetPort = loadBalancer.TargetPort
-			} else if tlsName, tlsPort := asdbv1.GetServiceTLSNameAndPort(
-				r.aeroCluster.Spec.AerospikeConfig,
-			); tlsName != "" && tlsPort != nil {
-				targetPort = int32(*tlsPort)
-			} else {
-				targetPort = int32(*asdbv1.GetServicePort(r.aeroCluster.Spec.AerospikeConfig))
-			}
-
-			var port int32
-			if loadBalancer.Port >= 1024 {
-				// if port is specified in CR.
-				port = loadBalancer.Port
-			} else {
-				port = targetPort
-			}
-
 			service = &corev1.Service{
 				ObjectMeta: metav1.ObjectMeta{
 					Name:      serviceName,
 					Namespace: r.aeroCluster.Namespace,
 					Labels:    ls,
 				},
 				Spec: corev1.ServiceSpec{
 					Type:     corev1.ServiceTypeLoadBalancer,
 					Selector: ls,
 					Ports: []corev1.ServicePort{
-						{
-							Port:       port,
-							Name:       loadBalancer.PortName,
-							TargetPort: intstr.FromInt(int(targetPort)),
-						},
+						servicePort,
 					},
 				},
 			}
@@ -182,58 +153,81 @@ func (r *SingleClusterReconciler) createSTSLoadBalancerSvc() error {
 		return err
 	}
 
+	if len(service.Spec.Ports) == 1 && service.Spec.Ports[0].Port == servicePort.Port &&
+		service.Spec.Ports[0].TargetPort == servicePort.TargetPort {
+		return nil
+	}
+
+	service.Spec.Ports = []corev1.ServicePort{
+		servicePort,
+	}
+
+	if err := r.Client.Update(
+		context.TODO(), service, updateOption,
+	); err != nil {
+		return fmt.Errorf(
+			"failed to update service %s: %v", serviceName, err,
+		)
+	}
+
 	r.Log.Info(
-		"LoadBalancer Service already exist for cluster. Using existing service",
+		"LoadBalancer Service already exists for cluster. Updated existing service",
 		"name", utils.NamespacedName(service.Namespace, service.Name),
 	)
 
 	return nil
 }
 
-func (r *SingleClusterReconciler) createPodService(pName, pNamespace string) error {
+func (r *SingleClusterReconciler) createOrUpdatePodService(pName, pNamespace string) error {
 	service := &corev1.Service{}
 
-	if err := r.Client.Get(
-		context.TODO(),
-		types.NamespacedName{Name: pName, Namespace: pNamespace}, service,
-	); err == nil {
-		return errors.NewAlreadyExists(schema.GroupResource{Resource: "Service"}, service.Name)
-	}
-
-	// NodePort will be allocated automatically
-	service = &corev1.Service{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      pName,
-			Namespace: pNamespace,
-		},
-		Spec: corev1.ServiceSpec{
-			Type: corev1.ServiceTypeNodePort,
-			Selector: map[string]string{
-				"statefulset.kubernetes.io/pod-name": pName,
-			},
-			ExternalTrafficPolicy: "Local",
-		},
-	}
-
-	r.appendServicePorts(service)
-	// Set AerospikeCluster instance as the owner and controller.
-	// It is created before Pod, so Pod cannot be the owner
-	err := controllerutil.SetControllerReference(
-		r.aeroCluster, service, r.Scheme,
+	err := r.Client.Get(
+		context.TODO(), types.NamespacedName{
+			Name: pName, Namespace: pNamespace,
+		}, service,
 	)
 	if err != nil {
-		return err
-	}
+		if !errors.IsNotFound(err) {
+			return err
+		}
 
-	if err := r.Client.Create(
-		context.TODO(), service, createOption,
-	); err != nil {
-		return fmt.Errorf(
-			"failed to create new service for pod %s: %v", pName, err,
+		// NodePort will be allocated automatically
+		service = &corev1.Service{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      pName,
+				Namespace: pNamespace,
+			},
+			Spec: corev1.ServiceSpec{
+				Type: corev1.ServiceTypeNodePort,
+				Selector: map[string]string{
+					"statefulset.kubernetes.io/pod-name": pName,
+				},
+				ExternalTrafficPolicy: "Local",
+			},
+		}
+
+		service.Spec.Ports = r.getServicePorts()
+
+		// Set AerospikeCluster instance as the owner and controller.
+		// It is created before Pod, so Pod cannot be the owner
+		err := controllerutil.SetControllerReference(
+			r.aeroCluster, service, r.Scheme,
 		)
+		if err != nil {
+			return err
+		}
+
+		if err := r.Client.Create(
+			context.TODO(), service, createOption,
+		); err != nil {
+			return fmt.Errorf(
+				"failed to create new service for pod %s: %v", pName, err,
+			)
+		}
+
+		return nil
 	}
 
-	return nil
+	return r.updateServicePorts(service)
 }
 
@@ -260,38 +254,49 @@ func (r *SingleClusterReconciler) deletePodService(pName, pNamespace string) err
 	return nil
 }
 
-func (r *SingleClusterReconciler) updatePodServicePorts(pName, pNamespace string) error {
-	service := &corev1.Service{}
-	if err := r.Client.Get(
-		context.TODO(),
-		types.NamespacedName{Name: pName, Namespace: pNamespace}, service,
-	); err != nil {
-		return err
+func (r *SingleClusterReconciler) updateServicePorts(service *corev1.Service) error {
+	servicePorts := r.getServicePorts()
+
+	servicePortsMap := make(map[string]int32)
+	for _, port := range servicePorts {
+		servicePortsMap[port.Name] = port.Port
 	}
 
-	// Resetting ports here based on current spec config.
-	// kubernetes is able to set previously assigned nodePort value to the services if any
-	service.Spec.Ports = nil
-	r.appendServicePorts(service)
+	specPortsMap := make(map[string]int32)
+	for _, port := range service.Spec.Ports {
+		specPortsMap[port.Name] = port.Port
+	}
+
+	if reflect.DeepEqual(servicePortsMap, specPortsMap) {
+		return nil
+	}
+
+	service.Spec.Ports = servicePorts
 
 	if err := r.Client.Update(
 		context.TODO(), service, updateOption,
 	); err != nil {
 		return fmt.Errorf(
-			"failed to update service for pod %s: %v", pName, err,
+			"failed to update service %s: %v", service.Name, err,
 		)
 	}
 
+	r.Log.Info(
+		"Service already exists. Updated existing service",
+		"name", utils.NamespacedName(service.Namespace, service.Name),
+	)
+
 	return nil
 }
 
-func (r *SingleClusterReconciler) appendServicePorts(service *corev1.Service) {
+func (r *SingleClusterReconciler) getServicePorts() []corev1.ServicePort {
+	servicePorts := make([]corev1.ServicePort, 0)
+
 	if svcPort := asdbv1.GetServicePort(
 		r.aeroCluster.Spec.
 			AerospikeConfig,
 	); svcPort != nil {
-		service.Spec.Ports = append(
-			service.Spec.Ports, corev1.ServicePort{
+		servicePorts = append(
+			servicePorts, corev1.ServicePort{
 				Name: asdbv1.ServicePortName,
 				Port: int32(*svcPort),
 			},
 		)
@@ -301,13 +306,43 @@ func (r *SingleClusterReconciler) appendServicePorts(service *corev1.Service) {
 	if _, tlsPort := asdbv1.GetServiceTLSNameAndPort(
 		r.aeroCluster.Spec.AerospikeConfig,
 	); tlsPort != nil {
-		service.Spec.Ports = append(
-			service.Spec.Ports, corev1.ServicePort{
+		servicePorts = append(
+			servicePorts, corev1.ServicePort{
 				Name: asdbv1.ServiceTLSPortName,
 				Port: int32(*tlsPort),
 			},
 		)
 	}
+
+	return servicePorts
+}
+
+func (r *SingleClusterReconciler) getLBServicePort(loadBalancer *asdbv1.LoadBalancerSpec) corev1.ServicePort {
+	var targetPort int32
+	if loadBalancer.TargetPort >= 1024 {
+		// If target port is specified in CR.
+		targetPort = loadBalancer.TargetPort
+	} else if tlsName, tlsPort := asdbv1.GetServiceTLSNameAndPort(
+		r.aeroCluster.Spec.AerospikeConfig,
+	); tlsName != "" && tlsPort != nil {
+		targetPort = int32(*tlsPort)
+	} else {
+		targetPort = int32(*asdbv1.GetServicePort(r.aeroCluster.Spec.AerospikeConfig))
+	}
+
+	var port int32
+	if loadBalancer.Port >= 1024 {
+		// If port is specified in CR.
+		port = loadBalancer.Port
+	} else {
+		port = targetPort
+	}
+
+	return corev1.ServicePort{
+		Port:       port,
+		Name:       loadBalancer.PortName,
+		TargetPort: intstr.FromInt(int(targetPort)),
+	}
 }
 
 func (r *SingleClusterReconciler) cleanupDanglingPodServices(rackState *RackState) error {
@@ -344,24 +379,14 @@ func podServiceNeeded(multiPodPerHost bool, networkPolicy *asdbv1.AerospikeNetwo
 	return networkSet.Len() > 2
 }
 
-func (r *SingleClusterReconciler) createOrUpdatePodServiceIfNeeded(pods []*corev1.Pod) error {
-	if (!podServiceNeeded(r.aeroCluster.Status.PodSpec.MultiPodPerHost, &r.aeroCluster.Status.AerospikeNetworkPolicy) ||
-		isServiceTLSChanged(r.aeroCluster.Spec.AerospikeConfig, r.aeroCluster.Status.AerospikeConfig)) &&
-		podServiceNeeded(r.aeroCluster.Spec.PodSpec.MultiPodPerHost, &r.aeroCluster.Spec.AerospikeNetworkPolicy) {
+func (r *SingleClusterReconciler) createOrUpdatePodServiceIfNeeded(pods []string) error {
+	if podServiceNeeded(r.aeroCluster.Spec.PodSpec.MultiPodPerHost, &r.aeroCluster.Spec.AerospikeNetworkPolicy) {
 		// Create services for all pods if network policy is changed and rely on nodePort service
 		for idx := range pods {
-			if err := r.createPodService(
-				pods[idx].Name, r.aeroCluster.Namespace,
+			if err := r.createOrUpdatePodService(
+				pods[idx], r.aeroCluster.Namespace,
 			); err != nil {
-				if !errors.IsAlreadyExists(err) {
-					return err
-				}
-
-				if err := r.updatePodServicePorts(
-					pods[idx].Name, r.aeroCluster.Namespace,
-				); err != nil {
-					return err
-				}
+				return err
 			}
 		}
 	}
@@ -369,21 +394,6 @@ func (r *SingleClusterReconciler) createOrUpdatePodServiceIfNeeded(pods []*corev
 	return nil
 }
 
-func isServiceTLSChanged(specAeroConf, statusAeroConf *asdbv1.AerospikeConfigSpec) bool {
-	if statusAeroConf == nil {
-		// If aerospikeconfig status is nil, assuming network service config has been changed
-		return true
-	}
-
-	specTLSName, _ := asdbv1.GetServiceTLSNameAndPort(specAeroConf)
-	statusTLSName, _ := asdbv1.GetServiceTLSNameAndPort(statusAeroConf)
-
-	specSVCPort := asdbv1.GetServicePort(specAeroConf)
-	statusSVCPort := asdbv1.GetServicePort(statusAeroConf)
-
-	return specTLSName != statusTLSName || specSVCPort != statusSVCPort
-}
-
 func (r *SingleClusterReconciler) getServiceTLSNameAndPortIfConfigured() (tlsName string, port *int) {
 	tlsName, port = asdbv1.GetServiceTLSNameAndPort(r.aeroCluster.Spec.AerospikeConfig)
 	if tlsName != "" && r.aeroCluster.Status.AerospikeConfig != nil {
diff --git a/controllers/statefulset.go b/controllers/statefulset.go
index 99ec70b9b..957e6adbb 100644
--- a/controllers/statefulset.go
+++ b/controllers/statefulset.go
@@ -549,6 +549,17 @@ func (r *SingleClusterReconciler) updateSTSStorage(
 	sortContainerVolumeAttachments(st.Spec.Template.Spec.Containers)
 }
 
+func (r *SingleClusterReconciler) updateSTSPorts(
+	st *appsv1.StatefulSet,
+) {
+	ports := getSTSContainerPort(
+		r.aeroCluster.Spec.PodSpec.MultiPodPerHost,
+		r.aeroCluster.Spec.AerospikeConfig,
+	)
+
+	st.Spec.Template.Spec.Containers[0].Ports = ports
+}
+
 func sortContainerVolumeAttachments(containers []corev1.Container) {
 	for idx := range containers {
 		sort.Slice(
@@ -572,6 +583,9 @@ func (r *SingleClusterReconciler) updateSTS(
 	// Update settings from pod spec.
 	r.updateSTSFromPodSpec(statefulSet, rackState)
 
+	// Updating ports when switching between tls and non-tls.
+	r.updateSTSPorts(statefulSet)
+
 	// Update the images for all containers from the spec.
 	// Our Pod Spec does not contain image for the Aerospike Server
 	// Container.
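Taken together, the controller changes above reduce to one idea: compute the desired port list from the spec and only touch a Service when its live ports disagree. The following standalone Go sketch illustrates that comparison outside the operator; the helper names and the "service"/"tls-service" port-name strings are illustrative assumptions, not code from this patch.

package main

import (
	"fmt"
	"reflect"

	corev1 "k8s.io/api/core/v1"
)

// desiredPorts mirrors the shape of getServicePorts: the plain service port is
// always present, the TLS port only when TLS is configured (tlsPort != nil).
func desiredPorts(svcPort int32, tlsPort *int32) []corev1.ServicePort {
	ports := []corev1.ServicePort{{Name: "service", Port: svcPort}}
	if tlsPort != nil {
		ports = append(ports, corev1.ServicePort{Name: "tls-service", Port: *tlsPort})
	}
	return ports
}

// portsNeedUpdate compares desired and live ports as name->port maps, the same
// style of check updateServicePorts performs before issuing an Update call.
func portsNeedUpdate(desired, live []corev1.ServicePort) bool {
	toMap := func(ps []corev1.ServicePort) map[string]int32 {
		m := make(map[string]int32, len(ps))
		for _, p := range ps {
			m[p.Name] = p.Port
		}
		return m
	}
	return !reflect.DeepEqual(toMap(desired), toMap(live))
}

func main() {
	tls := int32(4333)
	live := desiredPorts(3000, nil)          // cluster currently serves non-TLS only
	want := desiredPorts(3000, &tls)         // spec now enables TLS as well
	fmt.Println(portsNeedUpdate(want, live)) // true -> the Service must be updated
}

Keying the comparison on port name rather than list order keeps the check stable when Kubernetes reorders entries or fills in fields the spec does not set (for example, assigned NodePort values).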
diff --git a/test/batch_restart_pods_test.go b/test/batch_restart_pods_test.go
index 241c4763d..5d53ef76e 100644
--- a/test/batch_restart_pods_test.go
+++ b/test/batch_restart_pods_test.go
@@ -434,9 +434,8 @@ func isBatchRestart(aeroCluster *asdbv1.AerospikeCluster) bool {
 	// Operator should restart batch of pods which will make multiple pods unready
 	for i := 0; i < 100; i++ {
 		readyPods := getReadyPods(aeroCluster)
-		unreadyPods := int(aeroCluster.Spec.Size) - len(readyPods)
 
-		fmt.Printf("unreadyPods %d\n", unreadyPods)
+		unreadyPods := int(aeroCluster.Spec.Size) - len(readyPods)
 		if unreadyPods > 1 {
 			return true
 		}
diff --git a/test/cluster_helper.go b/test/cluster_helper.go
index de006abf4..86bb93602 100644
--- a/test/cluster_helper.go
+++ b/test/cluster_helper.go
@@ -15,6 +15,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
@@ -74,6 +75,11 @@ func rollingRestartClusterByEnablingTLS(
 		return err
 	}
 
+	err = validateServiceUpdate(k8sClient, ctx, clusterNamespacedName, []int32{serviceTLSPort, serviceNonTLSPort})
+	if err != nil {
+		return err
+	}
+
 	aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
 	if err != nil {
 		return err
 	}
@@ -112,6 +118,11 @@ func rollingRestartClusterByDisablingTLS(
 		return err
 	}
 
+	err = validateServiceUpdate(k8sClient, ctx, clusterNamespacedName, []int32{serviceTLSPort, serviceNonTLSPort})
+	if err != nil {
+		return err
+	}
+
 	aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
 	if err != nil {
 		return err
 	}
@@ -381,6 +392,45 @@ func rollingRestartClusterByRemovingNamespaceDynamicallyTest(
 	return updateCluster(k8sClient, ctx, aeroCluster)
 }
 
+func validateServiceUpdate(k8sClient client.Client, ctx goctx.Context,
+	clusterNamespacedName types.NamespacedName, ports []int32) error {
+	aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName)
+	if err != nil {
+		return err
+	}
+
+	serviceNamespacesNames := make([]types.NamespacedName, 0)
+
+	for podName := range aeroCluster.Status.Pods {
+		serviceNamespacesNames = append(serviceNamespacesNames,
+			types.NamespacedName{Name: podName, Namespace: clusterNamespacedName.Namespace})
+	}
+
+	serviceNamespacesNames = append(serviceNamespacesNames, clusterNamespacedName)
+
+	for _, serviceNamespacesName := range serviceNamespacesNames {
+		service := &corev1.Service{}
+
+		err = k8sClient.Get(ctx, serviceNamespacesName, service)
+		if err != nil {
+			return err
+		}
+
+		portSet := sets.NewInt32(ports...)
+
+		for _, p := range service.Spec.Ports {
+			if portSet.Has(p.Port) {
+				portSet.Delete(p.Port)
+			}
+		}
+
+		if portSet.Len() > 0 {
+			return fmt.Errorf("service %s ports are not configured correctly", serviceNamespacesName.Name)
+		}
+	}
+
+	return nil
+}
+
 func validateAerospikeConfigServiceClusterUpdate(
 	log logr.Logger, k8sClient client.Client, ctx goctx.Context,
 	clusterNamespacedName types.NamespacedName, updatedKeys []string,
diff --git a/test/cluster_test.go b/test/cluster_test.go
index 32374a399..6b0a1293a 100644
--- a/test/cluster_test.go
+++ b/test/cluster_test.go
@@ -553,7 +553,7 @@ func UpdateTLSClusterTest(ctx goctx.Context) {
 			delete(serviceNetwork, "tls-port")
 			delete(serviceNetwork, "tls-name")
 			delete(serviceNetwork, "tls-authenticate-client")
-			serviceNetwork["port"] = float64(3000)
+			serviceNetwork["port"] = float64(serviceNonTLSPort)
 			network[asdbv1.ServicePortName] = serviceNetwork
 			aeroCluster.Spec.AerospikeConfig.Value["network"] = network
 			err = updateCluster(k8sClient, ctx, aeroCluster)
@@ -709,6 +709,9 @@ func UpdateClusterTest(ctx goctx.Context) {
 			)
 			Expect(err).ToNot(HaveOccurred())
 
+			err = validateServiceUpdate(k8sClient, ctx, clusterNamespacedName, []int32{serviceTLSPort})
+			Expect(err).ToNot(HaveOccurred())
+
 			By("RollingRestart By changing tls to non-tls")
 
 			err = rollingRestartClusterByDisablingTLS(
@@ -716,6 +719,9 @@ func UpdateClusterTest(ctx goctx.Context) {
 			)
 			Expect(err).ToNot(HaveOccurred())
 
+			err = validateServiceUpdate(k8sClient, ctx, clusterNamespacedName, []int32{serviceNonTLSPort})
+			Expect(err).ToNot(HaveOccurred())
+
 			By("Upgrade/Downgrade")
 
 			// TODO: How to check if it is checking cluster stability before killing node
@@ -1316,7 +1322,7 @@ func negativeDeployClusterValidationTest(
 				)
 				networkConf := map[string]interface{}{
 					"service": map[string]interface{}{
-						"port":             3000,
+						"port":             serviceNonTLSPort,
 						"access-addresses": []string{""},
 					},
 				}
@@ -1847,7 +1853,7 @@ func negativeUpdateClusterValidationTest(
 
 				networkConf := map[string]interface{}{
 					"service": map[string]interface{}{
-						"port":             3000,
+						"port":             serviceNonTLSPort,
 						"access-addresses": []string{""},
 					},
 				}
diff --git a/test/utils.go b/test/utils.go
index 612443669..a56af0bc4 100644
--- a/test/utils.go
+++ b/test/utils.go
@@ -53,6 +53,9 @@ const aerospikeNs string = "aerospike"
 const zoneKey = "topology.kubernetes.io/zone"
 const regionKey = "topology.kubernetes.io/region"
 
+const serviceTLSPort = 4333
+const serviceNonTLSPort = 3000
+
 // list of all the namespaces used in test-suite
 var testNamespaces = []string{namespace, multiClusterNs1, multiClusterNs2, aerospikeNs}
 
@@ -408,8 +411,8 @@ func getNetworkTLSConfig() map[string]interface{} {
 	return map[string]interface{}{
 		"service": map[string]interface{}{
 			"tls-name": "aerospike-a-0.test-runner",
-			"tls-port": 4333,
-			"port":     3000,
+			"tls-port": serviceTLSPort,
+			"port":     serviceNonTLSPort,
 		},
 		"fabric": map[string]interface{}{
 			"tls-name": "aerospike-a-0.test-runner",
@@ -436,7 +439,7 @@ func getNetworkTLSConfig() map[string]interface{} {
 func getNetworkConfig() map[string]interface{} {
 	return map[string]interface{}{
 		"service": map[string]interface{}{
-			"port": 3000,
+			"port": serviceNonTLSPort,
 		},
 		"fabric": map[string]interface{}{
 			"port": 3001,
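For reference, the load-balancer port selection introduced in getLBServicePort follows a simple precedence chain: an explicit TargetPort (>= 1024) from the CR wins, otherwise the TLS service port when TLS is configured, otherwise the plain service port; Port falls back to TargetPort when not set. A minimal Go sketch of that precedence, with a hypothetical helper and hard-coded inputs standing in for the CR and Aerospike config values:

package main

import "fmt"

// lbPorts mirrors getLBServicePort's fallback order. crPort/crTargetPort stand
// in for the LoadBalancer spec fields; tlsPort == nil means TLS is disabled.
func lbPorts(crPort, crTargetPort int32, tlsPort *int32, svcPort int32) (port, targetPort int32) {
	switch {
	case crTargetPort >= 1024: // explicit target port in the CR wins
		targetPort = crTargetPort
	case tlsPort != nil: // else prefer the TLS service port
		targetPort = *tlsPort
	default: // else the plain service port
		targetPort = svcPort
	}

	if crPort >= 1024 {
		port = crPort
	} else {
		port = targetPort // Port defaults to whatever TargetPort resolved to
	}

	return port, targetPort
}

func main() {
	tls := int32(4333)
	port, target := lbPorts(0, 0, &tls, 3000)
	fmt.Println(port, target) // 4333 4333 -> the LB follows the TLS port
}

This is also why the test helper validateServiceUpdate above only has to assert on port numbers: once the spec flips between TLS and non-TLS, every derived Service, headless, per-pod, and load balancer, should converge on the same resolved ports.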