Add finalizers to LustreFileSystems (#135)
LustreFileSystems can be removed before Data Movement worker pods have
unmounted them. This change adds a finalizer to each LustreFileSystem to
indicate that DM worker pods are using it. Once a LustreFileSystem has
been marked for deletion, we first ensure that the worker pods are no
longer using that volume; only then is the finalizer removed to indicate
that nothing is using the LustreFileSystem.
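
For orientation, here is a minimal, self-contained sketch of the finalizer flow this commit implements. The helper name reconcileLustreFinalizer and its signature are hypothetical; in the actual change the logic is split between updateLustreFileSystemsIfNecessary and removeLustreFileSystemsFinalizersIfNecessary, as the diff below shows.

package controller

import (
    "context"

    appsv1 "k8s.io/api/apps/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

    lusv1beta1 "github.com/NearNodeFlash/lustre-fs-operator/api/v1beta1"
)

// reconcileLustreFinalizer holds a finalizer on a LustreFileSystem while the
// DM worker DaemonSet still mounts it, and releases the finalizer only after
// the volume has been dropped and the worker pods have rolled over.
func reconcileLustreFinalizer(ctx context.Context, c client.Client, lustre *lusv1beta1.LustreFileSystem, ds *appsv1.DaemonSet, finalizer string) error {
    if lustre.DeletionTimestamp.IsZero() {
        // Not being deleted: make sure the finalizer is present.
        if !controllerutil.ContainsFinalizer(lustre, finalizer) {
            controllerutil.AddFinalizer(lustre, finalizer)
            return c.Update(ctx, lustre)
        }
        return nil
    }

    // Marked for deletion: keep the finalizer while the DaemonSet still
    // references the volume.
    for _, vol := range ds.Spec.Template.Spec.Volumes {
        if vol.Name == lustre.Name {
            return nil // volume still present; check again on the next reconcile
        }
    }

    // The volume is gone from the DaemonSet spec; wait until the worker pods
    // have restarted without it before releasing the LustreFileSystem.
    d := ds.Status.DesiredNumberScheduled
    if ds.Status.ObservedGeneration != ds.Generation ||
        ds.Status.UpdatedNumberScheduled != d || ds.Status.NumberReady != d {
        return nil
    }

    if controllerutil.ContainsFinalizer(lustre, finalizer) {
        controllerutil.RemoveFinalizer(lustre, finalizer)
        return c.Update(ctx, lustre)
    }
    return nil
}

Waiting for the DaemonSet status to report a fully updated and ready rollout before releasing the finalizer is what guarantees that no worker pod still has the Lustre file system mounted when it is finally deleted.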

Signed-off-by: Blake Devcich <[email protected]>
bdevcich authored Nov 3, 2023
1 parent 66e6f08 commit 188bba1
Showing 2 changed files with 173 additions and 29 deletions.
96 changes: 86 additions & 10 deletions internal/controller/datamovementmanager_controller.go
@@ -144,6 +144,10 @@ func (r *DataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.
return errorHandler(err, "create or update DaemonSet")
}

if err := r.removeLustreFileSystemsFinalizersIfNecessary(ctx, manager); err != nil {
return errorHandler(err, "remove LustreFileSystems finalizers")
}

manager.Status.Ready = true
if err := r.Status().Update(ctx, manager); err != nil {
return ctrl.Result{}, err
@@ -329,20 +333,86 @@ func (r *DataMovementManagerReconciler) updateLustreFileSystemsIfNecessary(ctx c
for _, lustre := range filesystems.Items {
_, found := lustre.Spec.Namespaces[manager.Namespace]
if !found {

if lustre.Spec.Namespaces == nil {
lustre.Spec.Namespaces = make(map[string]lusv1beta1.LustreFileSystemNamespaceSpec)
}

lustre.Spec.Namespaces[manager.Namespace] = lusv1beta1.LustreFileSystemNamespaceSpec{
Modes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
}
}

// Add the dm finalizer to keep this resource from being deleted until dm is no longer using it
if lustre.DeletionTimestamp.IsZero() && !controllerutil.ContainsFinalizer(&lustre, finalizer) {
controllerutil.AddFinalizer(&lustre, finalizer)
}

if err := r.Update(ctx, &lustre); err != nil {
return err
}
log.Info("Updated LustreFileSystem", "object", client.ObjectKeyFromObject(&lustre).String(), "namespace", manager.Namespace)
}

return nil
}

func (r *DataMovementManagerReconciler) removeLustreFileSystemsFinalizersIfNecessary(ctx context.Context, manager *dmv1alpha1.DataMovementManager) error {
log := log.FromContext(ctx)

filesystems := &lusv1beta1.LustreFileSystemList{}
if err := r.List(ctx, filesystems); err != nil && !meta.IsNoMatchError(err) {
return fmt.Errorf("list lustre file systems failed: %w", err)
}

// Get the DS to compare the list of volumes
ds := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: daemonsetName,
Namespace: manager.Namespace,
},
}
if err := r.Get(ctx, client.ObjectKeyFromObject(ds), ds); err != nil {
return err
}

finalizersToRemove := []lusv1beta1.LustreFileSystem{}
for _, lustre := range filesystems.Items {
// This lustre is in the process of deleting, verify it is not in the list of DS volumes
if !lustre.DeletionTimestamp.IsZero() {
finalizersToRemove = append(finalizersToRemove, lustre)
for _, vol := range ds.Spec.Template.Spec.Volumes {
if lustre.Name == vol.Name {
log.Info("Daemonset still has lustrefilesystem volume", "lustrefilesystem", lustre)
return nil
}
}
}
}

if len(finalizersToRemove) == 0 {
return nil
}

// Now the DS does not have any lustre filesystems that are being deleted, verify that the
// daemonset's pods (i.e. dm worker pods) have restarted
d := ds.Status.DesiredNumberScheduled
if ds.Status.ObservedGeneration != ds.ObjectMeta.Generation || ds.Status.UpdatedNumberScheduled != d || ds.Status.NumberReady != d {
// wait for pods to restart
log.Info("Daemonset still has pods to restart after dropping lustrefilesystem volume",
"desired", d, "updated", ds.Status.UpdatedNumberScheduled, "ready", ds.Status.NumberReady)
return nil
}

// Now the finalizers can be removed
for _, lustre := range finalizersToRemove {
if controllerutil.ContainsFinalizer(&lustre, finalizer) {
controllerutil.RemoveFinalizer(&lustre, finalizer)
if err := r.Update(ctx, &lustre); err != nil {
return err
}

log.Info("Updated LustreFileSystem", "object", client.ObjectKeyFromObject(&lustre).String(), "namespace", manager.Namespace)
log.Info("Removed LustreFileSystem finalizer", "object", client.ObjectKeyFromObject(&lustre).String(),
"namespace", manager.Namespace,
"daemonset", ds)
}
}

@@ -437,24 +507,30 @@ func setupLustreVolumes(ctx context.Context, manager *dmv1alpha1.DataMovementMan

// Setup Volumes / Volume Mounts for accessing global Lustre file systems

- volumes := make([]corev1.Volume, len(fileSystems))
- volumeMounts := make([]corev1.VolumeMount, len(fileSystems))
- for idx, fs := range fileSystems {
+ volumes := []corev1.Volume{}
+ volumeMounts := []corev1.VolumeMount{}
+ for _, fs := range fileSystems {

if !fs.DeletionTimestamp.IsZero() {
log.Info("Global lustre volume is in the process of being deleted", "name", client.ObjectKeyFromObject(&fs).String())
continue
}

log.Info("Adding global lustre volume", "name", client.ObjectKeyFromObject(&fs).String())

- volumes[idx] = corev1.Volume{
+ volumes = append(volumes, corev1.Volume{
Name: fs.Name,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: fs.PersistentVolumeClaimName(manager.Namespace, corev1.ReadWriteMany),
},
},
- }
+ })

- volumeMounts[idx] = corev1.VolumeMount{
+ volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: fs.Name,
MountPath: fs.Spec.MountRoot,
- }
+ })
}

// Add the NNF Mounts
106 changes: 87 additions & 19 deletions internal/controller/datamovementmanager_controller_test.go
@@ -20,8 +20,6 @@
package controller

import (
"context"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
@@ -31,13 +29,15 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

lusv1beta1 "github.com/NearNodeFlash/lustre-fs-operator/api/v1beta1"
dmv1alpha1 "github.com/NearNodeFlash/nnf-dm/api/v1alpha1"
)

var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {

var lustre *lusv1beta1.LustreFileSystem
ns := &corev1.Namespace{}
deployment := &appsv1.Deployment{}
mgr := &dmv1alpha1.DataMovementManager{}
@@ -51,7 +51,7 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
},
}

- err := k8sClient.Create(context.TODO(), ns)
+ err := k8sClient.Create(ctx, ns)
Expect(err == nil || errors.IsAlreadyExists(err)).Should(BeTrue())

// Create a dummy deployment of the data movement manager
@@ -81,7 +81,7 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
},
}

- err = k8sClient.Create(context.TODO(), deployment)
+ err = k8sClient.Create(ctx, deployment)
Expect(err == nil || errors.IsAlreadyExists(err)).Should(BeTrue())
})

@@ -112,33 +112,40 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
})

JustBeforeEach(func() {
- Expect(k8sClient.Create(context.TODO(), mgr)).Should(Succeed())
+ Expect(k8sClient.Create(ctx, mgr)).Should(Succeed())
})

JustAfterEach(func() {
- Expect(k8sClient.Delete(context.TODO(), mgr)).Should(Succeed())
+ Expect(k8sClient.Delete(ctx, mgr)).Should(Succeed())
Eventually(func() error {
- return k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(mgr), mgr)
+ return k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)
}).ShouldNot(Succeed())

if lustre != nil {
k8sClient.Delete(ctx, lustre) // may or may not be already deleted
Eventually(func() error {
return k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)
}).ShouldNot(Succeed())
}
})

It("Bootstraps all managed components", func() {
Eventually(func(g Gomega) bool {
- g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
+ g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())
})

It("Adds global lustre volumes", func() {
It("Adds and removes global lustre volumes", func() {

By("Wait for the manager to go ready")
Eventually(func(g Gomega) bool {
- g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
+ g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())

By("Creating a Global Lustre File System")
- lustre := &lusv1beta1.LustreFileSystem{
+ lustre = &lusv1beta1.LustreFileSystem{
ObjectMeta: metav1.ObjectMeta{
Name: "global",
Namespace: corev1.NamespaceDefault,
@@ -150,17 +157,21 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
},
}

- Expect(k8sClient.Create(context.TODO(), lustre)).Should(Succeed())
+ Expect(k8sClient.Create(ctx, lustre)).Should(Succeed())

By("Expect namespace is added to lustre volume")

Eventually(func(g Gomega) lusv1beta1.LustreFileSystemNamespaceSpec {
- g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
+ g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
return lustre.Spec.Namespaces[mgr.Namespace]
}).ShouldNot(BeNil())

By("The Volume appears in the daemon set")
By("Expect finalizer is added to lustre volume")
Eventually(func(g Gomega) []string {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
return lustre.Finalizers
}).Should(ContainElement(finalizer))

By("The Volume appears in the daemon set")
daemonset := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: daemonsetName,
@@ -169,16 +180,14 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
}

Eventually(func(g Gomega) error {
- g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())
+ g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())
g.Expect(daemonset.Spec.Template.Spec.Volumes).Should(
ContainElement(
MatchFields(IgnoreExtras, Fields{
"Name": Equal(lustre.Name),
}),
),
)

g.Expect(daemonset.Spec.Template.Spec.Containers[0].VolumeMounts).Should(
ContainElement(
MatchFields(IgnoreExtras, Fields{
@@ -187,9 +196,68 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
}),
),
)

return nil
}).Should(Succeed())

By("Deleting Global Lustre File System")
Expect(k8sClient.Delete(ctx, lustre)).To(Succeed())

By("Expect Global Lustre File system/finalizer to stay around until daemonset restarts pods without the volume")
Eventually(func(g Gomega) error {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())

for _, v := range daemonset.Spec.Template.Spec.Volumes {
desired := daemonset.Status.DesiredNumberScheduled
updated := daemonset.Status.UpdatedNumberScheduled
ready := daemonset.Status.NumberReady
expectedGen := daemonset.ObjectMeta.Generation
gen := daemonset.Status.ObservedGeneration

// Fake the updates to the daemonset since the daemonset controller doesn't run
fakeDSUpdates(daemonset, g)

if v.Name == lustre.Name {
// If the volume still exists, then so should lustre + finalizer
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
g.Expect(controllerutil.ContainsFinalizer(lustre, finalizer)).To(BeTrue())

} else if gen != expectedGen && updated != desired && ready != desired {
// If pods have not restarted, lustre + finalizer should still be there
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
g.Expect(controllerutil.ContainsFinalizer(lustre, finalizer)).To(BeTrue())
} else {
// Once volume is gone and pods have restarted, lustre should be gone (and return error)
return k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)
}
}

return nil
}, "15s").ShouldNot(Succeed())
})
})

// Envtest does not run the built-in controllers (e.g. daemonset controller). This function fakes
// that out. Walk the counters up by one each time so we can exercise the controller watching these
// through a few iterations.
func fakeDSUpdates(ds *appsv1.DaemonSet, g Gomega) {
const desired = 5 // number of nnf nodes

ds.Status.DesiredNumberScheduled = desired

ds.Status.ObservedGeneration++
if ds.Status.ObservedGeneration > ds.ObjectMeta.Generation {
ds.Status.ObservedGeneration = ds.ObjectMeta.Generation
}

ds.Status.UpdatedNumberScheduled++
if ds.Status.UpdatedNumberScheduled > desired {
ds.Status.UpdatedNumberScheduled = desired
}

ds.Status.NumberReady++
if ds.Status.NumberReady > desired {
ds.Status.NumberReady = desired
}

g.Expect(k8sClient.Status().Update(ctx, ds)).Should(Succeed())
}
