diff --git a/controllers/datamovementmanager_controller.go b/controllers/datamovementmanager_controller.go
index 610dc95d..462288dd 100644
--- a/controllers/datamovementmanager_controller.go
+++ b/controllers/datamovementmanager_controller.go
@@ -27,6 +27,7 @@ import (
     "crypto/x509"
     "encoding/pem"
     "fmt"
+    "time"
 
    "golang.org/x/crypto/ssh"
 
@@ -144,6 +145,13 @@ func (r *DataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.
        return errorHandler(err, "create or update DaemonSet")
    }
 
+   res, err := r.removeLustreFileSystemsFinalizersIfNecessary(ctx, manager)
+   if err != nil {
+       return errorHandler(err, "remove LustreFileSystems finalizers")
+   } else if res != nil {
+       return *res, nil
+   }
+
    manager.Status.Ready = true
    if err := r.Status().Update(ctx, manager); err != nil {
        return ctrl.Result{}, err
@@ -329,7 +337,6 @@ func (r *DataMovementManagerReconciler) updateLustreFileSystemsIfNecessary(ctx c
    for _, lustre := range filesystems.Items {
        _, found := lustre.Spec.Namespaces[manager.Namespace]
        if !found {
-
            if lustre.Spec.Namespaces == nil {
                lustre.Spec.Namespaces = make(map[string]lusv1beta1.LustreFileSystemNamespaceSpec)
            }
@@ -337,18 +344,85 @@ func (r *DataMovementManagerReconciler) updateLustreFileSystemsIfNecessary(ctx c
            lustre.Spec.Namespaces[manager.Namespace] = lusv1beta1.LustreFileSystemNamespaceSpec{
                Modes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
            }
+       }
 
-           if err := r.Update(ctx, &lustre); err != nil {
-               return err
-           }
+       // Add the dm finalizer to keep this resource from being deleted until dm is no longer using it
+       if lustre.DeletionTimestamp.IsZero() && !controllerutil.ContainsFinalizer(&lustre, finalizer) {
+           controllerutil.AddFinalizer(&lustre, finalizer)
+       }
 
-           log.Info("Updated LustreFileSystem", "object", client.ObjectKeyFromObject(&lustre).String(), "namespace", manager.Namespace)
+       if err := r.Update(ctx, &lustre); err != nil {
+           return err
        }
+
+       log.Info("Updated LustreFileSystem", "object", client.ObjectKeyFromObject(&lustre).String(), "namespace", manager.Namespace)
    }
 
    return nil
 }
 
+func (r *DataMovementManagerReconciler) removeLustreFileSystemsFinalizersIfNecessary(ctx context.Context, manager *dmv1alpha1.DataMovementManager) (*ctrl.Result, error) {
+   log := log.FromContext(ctx)
+
+   filesystems := &lusv1beta1.LustreFileSystemList{}
+   if err := r.List(ctx, filesystems); err != nil && !meta.IsNoMatchError(err) {
+       return nil, fmt.Errorf("list lustre file systems failed: %w", err)
+   }
+
+   // Get the DS to compare the list of volumes
+   ds := &appsv1.DaemonSet{
+       ObjectMeta: metav1.ObjectMeta{
+           Name:      daemonsetName,
+           Namespace: manager.Namespace,
+       },
+   }
+   if err := r.Get(ctx, client.ObjectKeyFromObject(ds), ds); err != nil {
+       return nil, err
+   }
+
+   finalizersToRemove := []lusv1beta1.LustreFileSystem{}
+   for _, lustre := range filesystems.Items {
+       // This lustre is in the process of being deleted; verify it is not in the list of DS volumes
+       if !lustre.DeletionTimestamp.IsZero() {
+           finalizersToRemove = append(finalizersToRemove, lustre)
+           for _, vol := range ds.Spec.Template.Spec.Volumes {
+               if lustre.Name == vol.Name {
+                   log.Info("Requeue: wait for daemonset to drop lustrefilesystem volume", "lustrefilesystem", lustre)
+                   return &ctrl.Result{RequeueAfter: 1 * time.Second}, nil
+               }
+           }
+       }
+   }
+
+   if len(finalizersToRemove) == 0 {
+       return nil, nil
+   }
+
+   // Now the DS does not have any lustre filesystems that are being deleted; verify that the
+   // daemonset's pods (i.e. dm worker pods) have restarted
+   d := ds.Status.DesiredNumberScheduled
+   if ds.Status.ObservedGeneration != ds.ObjectMeta.Generation || ds.Status.UpdatedNumberScheduled != d || ds.Status.NumberReady != d {
+       // wait for pods to restart
+       log.Info("Requeue: wait for daemonset to restart pods after dropping lustrefilesystem volume",
+           "desired", d, "updated", ds.Status.UpdatedNumberScheduled, "ready", ds.Status.NumberReady)
+       return &ctrl.Result{RequeueAfter: 1 * time.Second}, nil
+   }
+
+   // Now the finalizers can be removed
+   for _, lustre := range finalizersToRemove {
+       if controllerutil.ContainsFinalizer(&lustre, finalizer) {
+           controllerutil.RemoveFinalizer(&lustre, finalizer)
+           if err := r.Update(ctx, &lustre); err != nil {
+               return nil, err
+           }
+           log.Info("Removed LustreFileSystem finalizer", "object", client.ObjectKeyFromObject(&lustre).String(),
+               "namespace", manager.Namespace,
+               "daemonset", ds)
+       }
+   }
+
+   return nil, nil
+}
+
 func (r *DataMovementManagerReconciler) createOrUpdateDaemonSetIfNecessary(ctx context.Context, manager *dmv1alpha1.DataMovementManager) error {
    log := log.FromContext(ctx)
 
@@ -437,24 +511,30 @@ func setupLustreVolumes(ctx context.Context, manager *dmv1alpha1.DataMovementMan
 
    // Setup Volumes / Volume Mounts for accessing global Lustre file systems
-   volumes := make([]corev1.Volume, len(fileSystems))
-   volumeMounts := make([]corev1.VolumeMount, len(fileSystems))
-   for idx, fs := range fileSystems {
+   volumes := []corev1.Volume{}
+   volumeMounts := []corev1.VolumeMount{}
+   for _, fs := range fileSystems {
+
+       if !fs.DeletionTimestamp.IsZero() {
+           log.Info("Global lustre volume is in the process of being deleted", "name", client.ObjectKeyFromObject(&fs).String())
+           continue
+       }
+
        log.Info("Adding global lustre volume", "name", client.ObjectKeyFromObject(&fs).String())
 
-       volumes[idx] = corev1.Volume{
+       volumes = append(volumes, corev1.Volume{
            Name: fs.Name,
            VolumeSource: corev1.VolumeSource{
                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
                    ClaimName: fs.PersistentVolumeClaimName(manager.Namespace, corev1.ReadWriteMany),
                },
            },
-       }
+       })
 
-       volumeMounts[idx] = corev1.VolumeMount{
+       volumeMounts = append(volumeMounts, corev1.VolumeMount{
            Name:      fs.Name,
            MountPath: fs.Spec.MountRoot,
-       }
+       })
    }
 
    // Add the NNF Mounts
diff --git a/controllers/datamovementmanager_controller_test.go b/controllers/datamovementmanager_controller_test.go
index bef105b1..b634f2b7 100644
--- a/controllers/datamovementmanager_controller_test.go
+++ b/controllers/datamovementmanager_controller_test.go
@@ -20,8 +20,6 @@
 package controllers
 
 import (
-   "context"
-
    . "github.com/onsi/ginkgo/v2"
    . "github.com/onsi/gomega"
    . "github.com/onsi/gomega/gstruct"
@@ -31,6 +29,7 @@ import (
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
+   "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 
    lusv1beta1 "github.com/NearNodeFlash/lustre-fs-operator/api/v1beta1"
    dmv1alpha1 "github.com/NearNodeFlash/nnf-dm/api/v1alpha1"
@@ -38,6 +37,7 @@
 
 var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
+   var lustre *lusv1beta1.LustreFileSystem
    ns := &corev1.Namespace{}
    deployment := &appsv1.Deployment{}
    mgr := &dmv1alpha1.DataMovementManager{}
@@ -51,7 +51,7 @@
        },
    }
 
-   err := k8sClient.Create(context.TODO(), ns)
+   err := k8sClient.Create(ctx, ns)
    Expect(err == nil || errors.IsAlreadyExists(err)).Should(BeTrue())
 
    // Create a dummy deployment of the data movement manager
@@ -81,7 +81,7 @@
        },
    }
 
-   err = k8sClient.Create(context.TODO(), deployment)
+   err = k8sClient.Create(ctx, deployment)
    Expect(err == nil || errors.IsAlreadyExists(err)).Should(BeTrue())
 })
 
@@ -112,33 +112,40 @@
    })
 
    JustBeforeEach(func() {
-       Expect(k8sClient.Create(context.TODO(), mgr)).Should(Succeed())
+       Expect(k8sClient.Create(ctx, mgr)).Should(Succeed())
    })
 
    JustAfterEach(func() {
-       Expect(k8sClient.Delete(context.TODO(), mgr)).Should(Succeed())
+       Expect(k8sClient.Delete(ctx, mgr)).Should(Succeed())
        Eventually(func() error {
-           return k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(mgr), mgr)
+           return k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)
        }).ShouldNot(Succeed())
+
+       if lustre != nil {
+           k8sClient.Delete(ctx, lustre) // may or may not be already deleted
+           Eventually(func() error {
+               return k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)
+           }).ShouldNot(Succeed())
+       }
    })
 
    It("Bootstraps all managed components", func() {
        Eventually(func(g Gomega) bool {
-           g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
+           g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
            return mgr.Status.Ready
        }).Should(BeTrue())
    })
 
-   It("Adds global lustre volumes", func() {
+   It("Adds and removes global lustre volumes", func() {
 
        By("Wait for the manager to go ready")
        Eventually(func(g Gomega) bool {
-           g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
+           g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
            return mgr.Status.Ready
        }).Should(BeTrue())
 
        By("Creating a Global Lustre File System")
-       lustre := &lusv1beta1.LustreFileSystem{
+       lustre = &lusv1beta1.LustreFileSystem{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "global",
                Namespace: corev1.NamespaceDefault,
@@ -150,17 +157,21 @@
            },
        }
 
-       Expect(k8sClient.Create(context.TODO(), lustre)).Should(Succeed())
+       Expect(k8sClient.Create(ctx, lustre)).Should(Succeed())
 
        By("Expect namespace is added to lustre volume")
-
        Eventually(func(g Gomega) lusv1beta1.LustreFileSystemNamespaceSpec {
-           g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
+           g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
            return lustre.Spec.Namespaces[mgr.Namespace]
        }).ShouldNot(BeNil())
 
-       By("The Volume appears in the daemon set")
+       By("Expect finalizer is added to lustre volume")
+       Eventually(func(g Gomega) []string {
+           g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
+           return lustre.Finalizers
+       }).Should(ContainElement(finalizer))
 
+       By("The Volume appears in the daemon set")
        daemonset := &appsv1.DaemonSet{
            ObjectMeta: metav1.ObjectMeta{
                Name:      daemonsetName,
@@ -169,8 +180,7 @@
        }
 
        Eventually(func(g Gomega) error {
-           g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())
-
+           g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())
            g.Expect(daemonset.Spec.Template.Spec.Volumes).Should(
                ContainElement(
                    MatchFields(IgnoreExtras, Fields{
@@ -178,7 +188,6 @@
                    }),
                ),
            )
-
            g.Expect(daemonset.Spec.Template.Spec.Containers[0].VolumeMounts).Should(
                ContainElement(
                    MatchFields(IgnoreExtras, Fields{
@@ -187,9 +196,68 @@
                    }),
                ),
            )
-
            return nil
        }).Should(Succeed())
+
+       By("Deleting Global Lustre File System")
+       Expect(k8sClient.Delete(ctx, lustre)).To(Succeed())
+
+       By("Expect Global Lustre File system/finalizer to stay around until daemonset restarts pods without the volume")
+       Eventually(func(g Gomega) error {
+           g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())
+
+           for _, v := range daemonset.Spec.Template.Spec.Volumes {
+               desired := daemonset.Status.DesiredNumberScheduled
+               updated := daemonset.Status.UpdatedNumberScheduled
+               ready := daemonset.Status.NumberReady
+               expectedGen := daemonset.ObjectMeta.Generation
+               gen := daemonset.Status.ObservedGeneration
+
+               // Fake the updates to the daemonset since the daemonset controller doesn't run
+               fakeDSUpdates(daemonset, g)
+
+               if v.Name == lustre.Name {
+                   // If the volume still exists, then so should lustre + finalizer
+                   g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
+                   g.Expect(controllerutil.ContainsFinalizer(lustre, finalizer)).To(BeTrue())
+
+               } else if gen != expectedGen && updated != desired && ready != desired {
+                   // If pods have not restarted, lustre + finalizer should still be there
+                   g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
+                   g.Expect(controllerutil.ContainsFinalizer(lustre, finalizer)).To(BeTrue())
+               } else {
+                   // Once volume is gone and pods have restarted, lustre should be gone (and return error)
+                   return k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)
+               }
+           }
+
+           return nil
+       }, "15s").ShouldNot(Succeed())
    })
 })
+
+// Envtest does not run the built-in controllers (e.g. daemonset controller). This function fakes
+// that out. Walk the counters up by one each time so we can exercise the controller watching these
+// through a few iterations.
+func fakeDSUpdates(ds *appsv1.DaemonSet, g Gomega) {
+   const desired = 5 // number of nnf nodes
+
+   ds.Status.DesiredNumberScheduled = desired
+
+   ds.Status.ObservedGeneration++
+   if ds.Status.ObservedGeneration > ds.ObjectMeta.Generation {
+       ds.Status.ObservedGeneration = ds.ObjectMeta.Generation
+   }
+
+   ds.Status.UpdatedNumberScheduled++
+   if ds.Status.UpdatedNumberScheduled > desired {
+       ds.Status.UpdatedNumberScheduled = desired
+   }
+
+   ds.Status.NumberReady++
+   if ds.Status.NumberReady > desired {
+       ds.Status.NumberReady = desired
+   }
+
+   g.Expect(k8sClient.Status().Update(ctx, ds)).Should(Succeed())
+}
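The gate this patch adds before finalizer removal can be read as a single predicate over the DaemonSet status. A minimal sketch follows; the helper name and its standalone packaging are illustrative only and are not part of the change itself.

package controllers

import appsv1 "k8s.io/api/apps/v1"

// daemonSetRolloutComplete reports whether the DaemonSet controller has observed the
// latest generation (the one without the deleted lustrefilesystem volume) and every
// scheduled pod has been updated and is ready. removeLustreFileSystemsFinalizersIfNecessary
// requeues every second until an equivalent condition holds, and only then removes the
// dm finalizer from deleted LustreFileSystem resources.
func daemonSetRolloutComplete(ds *appsv1.DaemonSet) bool {
	desired := ds.Status.DesiredNumberScheduled
	return ds.Status.ObservedGeneration == ds.ObjectMeta.Generation &&
		ds.Status.UpdatedNumberScheduled == desired &&
		ds.Status.NumberReady == desired
}

The test exercises the same condition by hand: fakeDSUpdates walks ObservedGeneration, UpdatedNumberScheduled, and NumberReady upward on each poll because envtest provides no daemonset controller to do so.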