From 71d98ec7684fa6576c378066b0299722953722f3 Mon Sep 17 00:00:00 2001
From: Blake Devcich
Date: Fri, 3 Nov 2023 14:18:36 -0500
Subject: [PATCH] Gate NnfDataMovementManager.Status.Ready flag on daemonset ready

* Don't signal ready until the nnf-dm-worker daemonset is up to date and ready
* If there's an error, set Ready to false
* Added isDaemonSetReady() function to check on the status of the daemonset.
  This needs to happen as part of the main reconciler loop and when removing
  the finalizers.
* Use the status updater to set the Ready flag.

Signed-off-by: Blake Devcich
---
 .vscode/launch.json                           |  2 +-
 api/v1alpha1/datamovementmanager_types.go     |  6 ++
 .../datamovementmanager_controller.go         | 60 ++++++++++---
 .../datamovementmanager_controller_test.go    | 88 +++++++++++++------
 4 files changed, 117 insertions(+), 39 deletions(-)

diff --git a/.vscode/launch.json b/.vscode/launch.json
index 0c385442..2552aa34 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -49,7 +49,7 @@
             "type": "go",
             "request": "launch",
             "mode": "test",
-            "program": "${workspaceFolder}/controllers",
+            "program": "${workspaceFolder}/internal/controller",
             "args": [
                 "-ginkgo.v",
                 "-ginkgo.progress",
diff --git a/api/v1alpha1/datamovementmanager_types.go b/api/v1alpha1/datamovementmanager_types.go
index 6a8f400c..02931b29 100644
--- a/api/v1alpha1/datamovementmanager_types.go
+++ b/api/v1alpha1/datamovementmanager_types.go
@@ -22,6 +22,8 @@ package v1alpha1
 import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"github.com/DataWorkflowServices/dws/utils/updater"
 )
 
 const (
@@ -89,3 +91,7 @@ type DataMovementManagerList struct {
 func init() {
 	SchemeBuilder.Register(&DataMovementManager{}, &DataMovementManagerList{})
 }
+
+func (m *DataMovementManager) GetStatus() updater.Status[*DataMovementManagerStatus] {
+	return &m.Status
+}
diff --git a/internal/controller/datamovementmanager_controller.go b/internal/controller/datamovementmanager_controller.go
index 714ac073..7a97c33e 100644
--- a/internal/controller/datamovementmanager_controller.go
+++ b/internal/controller/datamovementmanager_controller.go
@@ -33,6 +33,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -45,6 +46,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 
+	"github.com/DataWorkflowServices/dws/utils/updater"
 	lusv1beta1 "github.com/NearNodeFlash/lustre-fs-operator/api/v1beta1"
 	dmv1alpha1 "github.com/NearNodeFlash/nnf-dm/api/v1alpha1"
 	"github.com/NearNodeFlash/nnf-dm/internal/controller/metrics"
@@ -106,7 +108,7 @@ type DataMovementManagerReconciler struct {
 
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
-func (r *DataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+func (r *DataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
 	log := log.FromContext(ctx)
 
 	metrics.NnfDmDataMovementManagerReconcilesTotal.Inc()
@@ -116,10 +118,14 @@ func (r *DataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.
 		return ctrl.Result{}, client.IgnoreNotFound(err)
 	}
 
+	statusUpdater := updater.NewStatusUpdater[*dmv1alpha1.DataMovementManagerStatus](manager)
+	defer func() { err = statusUpdater.CloseWithStatusUpdate(ctx, r.Client.Status(), err) }()
+
 	errorHandler := func(err error, msg string) (ctrl.Result, error) {
 		if errors.IsConflict(err) {
 			return ctrl.Result{Requeue: true}, nil
 		}
+		manager.Status.Ready = false
 		log.Error(err, msg+" failed")
 		return ctrl.Result{}, err
 	}
@@ -148,11 +154,15 @@ func (r *DataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.
 		return errorHandler(err, "remove LustreFileSystems finalizers")
 	}
 
-	manager.Status.Ready = true
-	if err := r.Status().Update(ctx, manager); err != nil {
-		return ctrl.Result{}, err
+	if ready, err := r.isDaemonSetReady(ctx, manager); err != nil {
+		return errorHandler(err, "check if daemonset is ready")
+	} else if !ready {
+		manager.Status.Ready = false
+		log.Info("daemonset not ready")
+		return ctrl.Result{}, nil
 	}
 
+	manager.Status.Ready = true
 	return ctrl.Result{}, nil
 }
 
@@ -372,6 +382,9 @@ func (r *DataMovementManagerReconciler) removeLustreFileSystemsFinalizersIfNeces
 		},
 	}
 	if err := r.Get(ctx, client.ObjectKeyFromObject(ds), ds); err != nil {
+		if apierrors.IsNotFound(err) {
+			return nil
+		}
 		return err
 	}
 
@@ -395,12 +408,11 @@ func (r *DataMovementManagerReconciler) removeLustreFileSystemsFinalizersIfNeces
 
 	// Now the DS does not have any lustre filesystems that are being deleted, verify that the
 	// daemonset's pods (i.e. dm worker pods) have restarted
-	d := ds.Status.DesiredNumberScheduled
-	if ds.Status.ObservedGeneration != ds.ObjectMeta.Generation || ds.Status.UpdatedNumberScheduled != d || ds.Status.NumberReady != d {
-		// wait for pods to restart
-		log.Info("Daemonset still has pods to restart after dropping lustrefilesystem volume",
-			"desired", d, "updated", ds.Status.UpdatedNumberScheduled, "ready", ds.Status.NumberReady)
+	if ready, err := r.isDaemonSetReady(ctx, manager); err != nil {
+		return err
+	} else if !ready {
+		log.Info("Daemonset still has pods to restart after dropping lustrefilesystem volume")
 		return nil
 	}
 
 	// Now the finalizers can be removed
@@ -410,15 +422,37 @@ func (r *DataMovementManagerReconciler) removeLustreFileSystemsFinalizersIfNeces
 			if err := r.Update(ctx, &lustre); err != nil {
 				return err
 			}
-			log.Info("Removed LustreFileSystem finalizer", "object", client.ObjectKeyFromObject(&lustre).String(),
-				"namespace", manager.Namespace,
-				"daemonset", ds)
+			log.Info("Removed LustreFileSystem finalizer", "object", client.ObjectKeyFromObject(&lustre).String(), "namespace", manager.Namespace)
 		}
 	}
 
 	return nil
 }
+
+func (r *DataMovementManagerReconciler) isDaemonSetReady(ctx context.Context, manager *dmv1alpha1.DataMovementManager) (bool, error) {
+	ds := &appsv1.DaemonSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      daemonsetName,
+			Namespace: manager.Namespace,
+		},
+	}
+	if err := r.Get(ctx, client.ObjectKeyFromObject(ds), ds); err != nil {
+		if apierrors.IsNotFound(err) {
+			return false, nil
+		}
+		return false, err
+	}
+
+	d := ds.Status.DesiredNumberScheduled
+	log.FromContext(ctx).Info("isDaemonSetReady", "generation", ds.ObjectMeta.Generation, "status", ds.Status)
+	if ds.Status.ObservedGeneration != ds.ObjectMeta.Generation || ds.Status.UpdatedNumberScheduled != d || ds.Status.NumberReady != d {
+		return false, nil
+	}
+
+	return true, nil
+}
 
 func (r *DataMovementManagerReconciler) createOrUpdateDaemonSetIfNecessary(ctx context.Context, manager *dmv1alpha1.DataMovementManager) error {
 	log := log.FromContext(ctx)
 
@@ -473,7 +507,7 @@ func (r *DataMovementManagerReconciler) createOrUpdateDaemonSetIfNecessary(ctx c
 	if result == controllerutil.OperationResultCreated {
 		log.Info("Created DaemonSet", "object", client.ObjectKeyFromObject(ds).String())
 	} else if result == controllerutil.OperationResultUpdated {
-		log.Info("Updated DaemonSet", "object", client.ObjectKeyFromObject(ds).String())
+		log.Info("Updated DaemonSet", "object", client.ObjectKeyFromObject(ds).String(), "generation", ds.ObjectMeta.Generation, "status", ds.Status)
 	}
 
 	return nil
diff --git a/internal/controller/datamovementmanager_controller_test.go b/internal/controller/datamovementmanager_controller_test.go
index 6a1d161f..6136da88 100644
--- a/internal/controller/datamovementmanager_controller_test.go
+++ b/internal/controller/datamovementmanager_controller_test.go
@@ -28,6 +28,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/util/retry"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 
@@ -38,6 +39,8 @@ import (
 var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
 
 	var lustre *lusv1beta1.LustreFileSystem
+	var daemonset *appsv1.DaemonSet
+
 	ns := &corev1.Namespace{}
 	deployment := &appsv1.Deployment{}
 	mgr := &dmv1alpha1.DataMovementManager{}
@@ -109,6 +112,13 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
 				},
 			},
 		}
+
+		daemonset = &appsv1.DaemonSet{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      daemonsetName,
+				Namespace: mgr.Namespace,
+			},
+		}
 	})
 
 	JustBeforeEach(func() {
@@ -131,15 +141,17 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
 
 	It("Bootstraps all managed components", func() {
 		Eventually(func(g Gomega) bool {
+			g.Expect(fakeDSUpdates(daemonset)).To(Succeed())
 			g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
 			return mgr.Status.Ready
-		}).Should(BeTrue())
+		}, "5s").Should(BeTrue())
 	})
 
 	It("Adds and removes global lustre volumes", func() {
 
 		By("Wait for the manager to go ready")
 		Eventually(func(g Gomega) bool {
+			g.Expect(fakeDSUpdates(daemonset)).To(Succeed())
 			g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
 			return mgr.Status.Ready
 		}).Should(BeTrue())
@@ -159,6 +171,12 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
 
 		Expect(k8sClient.Create(ctx, lustre)).Should(Succeed())
 
+		By("Status should not be ready")
+		Eventually(func(g Gomega) bool {
+			g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
+			return mgr.Status.Ready
+		}).Should(BeFalse())
+
 		By("Expect namespace is added to lustre volume")
 		Eventually(func(g Gomega) lusv1beta1.LustreFileSystemNamespaceSpec {
 			g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
@@ -172,13 +190,6 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
 		}).Should(ContainElement(finalizer))
 
 		By("The Volume appears in the daemon set")
-		daemonset := &appsv1.DaemonSet{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      daemonsetName,
-				Namespace: mgr.Namespace,
-			},
-		}
-
 		Eventually(func(g Gomega) error {
 			g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())
 			g.Expect(daemonset.Spec.Template.Spec.Volumes).Should(
@@ -199,9 +210,22 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
 			return nil
 		}).Should(Succeed())
 
+		By("Status should be ready after daemonset is up to date")
+		Eventually(func(g Gomega) bool {
+			g.Expect(fakeDSUpdates(daemonset)).To(Succeed())
+			g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
+			return mgr.Status.Ready
+		}).Should(BeTrue())
+
 		By("Deleting Global Lustre File System")
 		Expect(k8sClient.Delete(ctx, lustre)).To(Succeed())
 
+		By("Status should not be ready since daemonset was updated")
+		Eventually(func(g Gomega) bool {
+			g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
+			return mgr.Status.Ready
+		}).Should(BeFalse())
+
 		By("Expect Global Lustre File system/finalizer to stay around until daemonset restarts pods without the volume")
 		Eventually(func(g Gomega) error {
 			g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())
@@ -214,7 +238,7 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
 				gen := daemonset.Status.ObservedGeneration
 
 				// Fake the updates to the daemonset since the daemonset controller doesn't run
-				fakeDSUpdates(daemonset, g)
+				g.Expect(fakeDSUpdates(daemonset)).To(Succeed())
 
 				if v.Name == lustre.Name {
 					// If the volume still exists, then so should lustre + finalizer
@@ -233,31 +257,45 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
 
 			return nil
 		}, "15s").ShouldNot(Succeed())
+
+		By("Status should be ready since daemonset is up to date from previous step")
+		Eventually(func(g Gomega) bool {
+			g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
+			return mgr.Status.Ready
+		}).Should(BeTrue())
 	})
 })
 
 // Envtest does not run the built-in controllers (e.g. daemonset controller). This function fakes
 // that out. Walk the counters up by one each time so we can exercise the controller watching these
 // through a few iterations.
-func fakeDSUpdates(ds *appsv1.DaemonSet, g Gomega) {
-	const desired = 5 // number of nnf nodes
+func fakeDSUpdates(ds *appsv1.DaemonSet) error {
+	const desired = 2 // number of nnf nodes
 
-	ds.Status.DesiredNumberScheduled = desired
+	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(ds), ds); err != nil {
+			return err
+		}
+		ds.Status.DesiredNumberScheduled = desired
 
-	ds.Status.ObservedGeneration++
-	if ds.Status.ObservedGeneration > ds.ObjectMeta.Generation {
-		ds.Status.ObservedGeneration = ds.ObjectMeta.Generation
-	}
+		ds.Status.ObservedGeneration++
+		if ds.Status.ObservedGeneration > ds.ObjectMeta.Generation {
+			ds.Status.ObservedGeneration = ds.ObjectMeta.Generation
+		}
 
-	ds.Status.UpdatedNumberScheduled++
-	if ds.Status.UpdatedNumberScheduled > desired {
-		ds.Status.UpdatedNumberScheduled = desired
-	}
+		ds.Status.UpdatedNumberScheduled++
+		if ds.Status.UpdatedNumberScheduled > desired {
+			ds.Status.UpdatedNumberScheduled = desired
+		}
+
+		ds.Status.NumberReady++
+		if ds.Status.NumberReady > desired {
+			ds.Status.NumberReady = desired
+		}
+		return k8sClient.Status().Update(ctx, ds)
+	})
 
-	ds.Status.NumberReady++
-	if ds.Status.NumberReady > desired {
-		ds.Status.NumberReady = desired
-	}
+	return err
 
-	g.Expect(k8sClient.Status().Update(ctx, ds)).Should(Succeed())
 }
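
The readiness gate added above reduces to a single predicate over the nnf-dm-worker DaemonSet status: Status.Ready is reported only once the DaemonSet controller has observed the latest generation and every scheduled pod is both updated and ready. A minimal standalone sketch of that predicate (package and helper name are illustrative, not part of this patch):

    // Illustrative sketch only; mirrors the checks made by isDaemonSetReady()
    // in the patch above.
    package readiness

    import appsv1 "k8s.io/api/apps/v1"

    // daemonSetUpToDate reports whether the DaemonSet controller has caught up
    // to the latest generation and all scheduled pods are running the updated
    // template and are ready.
    func daemonSetUpToDate(ds *appsv1.DaemonSet) bool {
        desired := ds.Status.DesiredNumberScheduled
        return ds.Status.ObservedGeneration == ds.ObjectMeta.Generation &&
            ds.Status.UpdatedNumberScheduled == desired &&
            ds.Status.NumberReady == desired
    }

The ObservedGeneration comparison is what keeps the flag honest right after the pod template changes (for example, when a LustreFileSystem volume is added or removed): UpdatedNumberScheduled and NumberReady still describe the previous rollout until the DaemonSet controller records the new generation, so the counts are only trusted once the generations match.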