Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tanmayja committed Sep 21, 2023
1 parent 9f4640b commit 84ad372
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 45 deletions.
7 changes: 2 additions & 5 deletions controllers/rack.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,6 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r
)

podNames := getPodNames(podsBatch)

if err = r.createOrUpdatePodServiceIfNeeded(podNames); err != nil {
return nil, reconcileError(err)
}
Expand Down Expand Up @@ -954,7 +953,7 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
var podsBatchList [][]*corev1.Pod

if len(failedPods) != 0 {
// creating a single batch of all failed pods in a rack, irrespective of batch size
// Creating a single batch of all failed pods in a rack, irrespective of batch size
r.Log.Info("Skipping batchSize for failed pods")

podsBatchList = make([][]*corev1.Pod, 1)
Expand All @@ -978,13 +977,11 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
)

podNames := getPodNames(podsBatch)

if err = r.createOrUpdatePodServiceIfNeeded(podNames); err != nil {
return nil, reconcileError(err)
}

res := r.rollingRestartPods(rackState, podsBatch, ignorablePods, restartTypeMap)
if !res.isSuccess {
if res := r.rollingRestartPods(rackState, podsBatch, ignorablePods, restartTypeMap); !res.isSuccess {
return found, res
}

Expand Down
22 changes: 11 additions & 11 deletions controllers/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) {
}
}

if err := r.createOrUpdateSTSHeadlessSvc(); err != nil {
r.Log.Error(err, "Failed to create headless service")
r.Recorder.Eventf(
r.aeroCluster, corev1.EventTypeWarning, "ServiceCreateFailed",
"Failed to create Service(Headless) %s/%s",
r.aeroCluster.Namespace, r.aeroCluster.Name,
)

return reconcile.Result{}, err
}

// Reconcile all racks
if res := r.reconcileRacks(); !res.isSuccess {
if res.err != nil {
Expand All @@ -102,17 +113,6 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) {
return res.getResult()
}

if err := r.createOrUpdateSTSHeadlessSvc(); err != nil {
r.Log.Error(err, "Failed to create headless service")
r.Recorder.Eventf(
r.aeroCluster, corev1.EventTypeWarning, "ServiceCreateFailed",
"Failed to create Service(Headless) %s/%s",
r.aeroCluster.Namespace, r.aeroCluster.Name,
)

return reconcile.Result{}, err
}

if err := r.createOrUpdateSTSLoadBalancerSvc(); err != nil {
r.Log.Error(err, "Failed to create LoadBalancer service")
r.Recorder.Eventf(
Expand Down
60 changes: 33 additions & 27 deletions controllers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,20 @@ func (r *SingleClusterReconciler) createOrUpdateSTSHeadlessSvc() error {
r.Log.Info("Create or Update headless service for statefulSet")

serviceName := getSTSHeadLessSvcName(r.aeroCluster)
service := &corev1.Service{}

err := r.updateServicePorts(serviceName, r.aeroCluster.Namespace)
if err == nil {
return nil
}
err := r.Client.Get(
context.TODO(), types.NamespacedName{
Name: serviceName, Namespace: r.aeroCluster.Namespace,
}, service,
)
if err != nil {
if !errors.IsNotFound(err) {
return err
}

if errors.IsNotFound(err) {
ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name)
service := &corev1.Service{
service = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
// Headless service has the same name as AerospikeCluster
Name: serviceName,
Expand Down Expand Up @@ -73,9 +78,11 @@ func (r *SingleClusterReconciler) createOrUpdateSTSHeadlessSvc() error {
}

r.Log.Info("Created new headless service")

return nil
}

return nil
return r.updateServicePorts(service)
}

func (r *SingleClusterReconciler) createOrUpdateSTSLoadBalancerSvc() error {
Expand Down Expand Up @@ -147,8 +154,7 @@ func (r *SingleClusterReconciler) createOrUpdateSTSLoadBalancerSvc() error {
}

if len(service.Spec.Ports) == 1 && service.Spec.Ports[0].Port == servicePort.Port &&
service.Spec.Ports[0].TargetPort == servicePort.TargetPort &&
service.Spec.Ports[0].Name == servicePort.Name {
service.Spec.Ports[0].TargetPort == servicePort.TargetPort {
return nil
}

Expand All @@ -164,22 +170,28 @@ func (r *SingleClusterReconciler) createOrUpdateSTSLoadBalancerSvc() error {
}

r.Log.Info(
"LoadBalancer Service already exist for cluster. Using existing service",
"LoadBalancer Service already exist for cluster. Updated existing service",
"name", utils.NamespacedName(service.Namespace, service.Name),
)

return nil
}

func (r *SingleClusterReconciler) createOrUpdatePodService(pName, pNamespace string) error {
err := r.updateServicePorts(pName, pNamespace)
if err == nil {
return nil
}
service := &corev1.Service{}

err := r.Client.Get(
context.TODO(), types.NamespacedName{
Name: pName, Namespace: pNamespace,
}, service,
)
if err != nil {
if !errors.IsNotFound(err) {
return err
}

if errors.IsNotFound(err) {
// NodePort will be allocated automatically
service := &corev1.Service{
service = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: pName,
Namespace: pNamespace,
Expand Down Expand Up @@ -211,9 +223,11 @@ func (r *SingleClusterReconciler) createOrUpdatePodService(pName, pNamespace str
"failed to create new service for pod %s: %v", pName, err,
)
}

return nil
}

return nil
return r.updateServicePorts(service)
}

func (r *SingleClusterReconciler) deletePodService(pName, pNamespace string) error {
Expand All @@ -240,15 +254,7 @@ func (r *SingleClusterReconciler) deletePodService(pName, pNamespace string) err
return nil
}

func (r *SingleClusterReconciler) updateServicePorts(sName, sNamespace string) error {
service := &corev1.Service{}
if err := r.Client.Get(
context.TODO(),
types.NamespacedName{Name: sName, Namespace: sNamespace}, service,
); err != nil {
return err
}

func (r *SingleClusterReconciler) updateServicePorts(service *corev1.Service) error {
servicePorts := r.getServicePorts()

servicePortsMap := make(map[string]int32)
Expand All @@ -270,7 +276,7 @@ func (r *SingleClusterReconciler) updateServicePorts(sName, sNamespace string) e
context.TODO(), service, updateOption,
); err != nil {
return fmt.Errorf(
"failed to update service %s: %v", sName, err,
"failed to update service %s: %v", service.Name, err,
)
}

Expand Down
3 changes: 1 addition & 2 deletions test/batch_restart_pods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,8 @@ func isBatchRestart(aeroCluster *asdbv1.AerospikeCluster) bool {
// Operator should restart batch of pods which will make multiple pods unready
for i := 0; i < 100; i++ {
readyPods := getReadyPods(aeroCluster)
unreadyPods := int(aeroCluster.Spec.Size) - len(readyPods)
fmt.Printf("unreadyPods %d\n", unreadyPods)

unreadyPods := int(aeroCluster.Spec.Size) - len(readyPods)
if unreadyPods > 1 {
return true
}
Expand Down

0 comments on commit 84ad372

Please sign in to comment.