Skip to content

Commit

Permalink
Add finalizers to LustreFileSystems
Browse files Browse the repository at this point in the history
LustreFileSystems can be removed before Data Movement worker pods have
unmounted them. This change adds a finalizer to the LustreFileSystems to
indicate that DM worker pods are using it. Once the LustreFileSystem has
been marked for deletion, we then ensure that the worker pods are no
longer using that volume. Then, the finalizer can be removed to indicate
that nothing is using the LustreFileSystem.

Signed-off-by: Blake Devcich <[email protected]>
  • Loading branch information
bdevcich committed Oct 27, 2023
1 parent 8928a1d commit fc9ba37
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 31 deletions.
97 changes: 85 additions & 12 deletions controllers/datamovementmanager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"crypto/x509"
"encoding/pem"
"fmt"
"time"

"golang.org/x/crypto/ssh"

Expand Down Expand Up @@ -144,6 +145,13 @@ func (r *DataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.
return errorHandler(err, "create or update DaemonSet")
}

res, err := r.removeLustreFileSystemsFinalizersIfNecessary(ctx, manager)
if err != nil {
return errorHandler(err, "remove LustreFileSystems finalizers")
} else if res != nil {
return *res, nil
}

manager.Status.Ready = true
if err := r.Status().Update(ctx, manager); err != nil {
return ctrl.Result{}, err
Expand Down Expand Up @@ -329,26 +337,85 @@ func (r *DataMovementManagerReconciler) updateLustreFileSystemsIfNecessary(ctx c
for _, lustre := range filesystems.Items {
_, found := lustre.Spec.Namespaces[manager.Namespace]
if !found {

if lustre.Spec.Namespaces == nil {
lustre.Spec.Namespaces = make(map[string]lusv1beta1.LustreFileSystemNamespaceSpec)
}

lustre.Spec.Namespaces[manager.Namespace] = lusv1beta1.LustreFileSystemNamespaceSpec{
Modes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
}
}

if err := r.Update(ctx, &lustre); err != nil {
return err
}
if lustre.DeletionTimestamp.IsZero() && !controllerutil.ContainsFinalizer(&lustre, finalizer) {
controllerutil.AddFinalizer(&lustre, finalizer)
}

log.Info("Updated LustreFileSystem", "object", client.ObjectKeyFromObject(&lustre).String(), "namespace", manager.Namespace)
if err := r.Update(ctx, &lustre); err != nil {
return err
}
log.Info("Updated LustreFileSystem", "object", client.ObjectKeyFromObject(&lustre).String(), "namespace", manager.Namespace)
}

return nil
}

// removeLustreFileSystemsFinalizersIfNecessary removes the data movement finalizer from any
// LustreFileSystem that has been marked for deletion, but only once it is safe to do so:
// the worker DaemonSet must no longer reference the filesystem's volume AND its pods must
// have finished rolling over to the updated pod template (so no worker still has the
// filesystem mounted). A non-nil *ctrl.Result tells the caller to requeue and wait.
func (r *DataMovementManagerReconciler) removeLustreFileSystemsFinalizersIfNecessary(ctx context.Context, manager *dmv1alpha1.DataMovementManager) (*ctrl.Result, error) {
	log := log.FromContext(ctx)

	filesystems := &lusv1beta1.LustreFileSystemList{}
	// A NoMatch error means the LustreFileSystem CRD is not installed; proceed with an
	// empty list rather than failing.
	if err := r.List(ctx, filesystems); err != nil && !meta.IsNoMatchError(err) {
		return nil, fmt.Errorf("list lustre file systems failed: %w", err)
	}

	// Get the DS to compare the list of volumes
	ds := &appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      daemonsetName,
			Namespace: manager.Namespace,
		},
	}
	if err := r.Get(ctx, client.ObjectKeyFromObject(ds), ds); err != nil {
		return nil, fmt.Errorf("get daemonset failed: %w", err)
	}

	finalizersToRemove := []lusv1beta1.LustreFileSystem{}
	for _, lustre := range filesystems.Items {
		// This lustre is in the process of deleting, verify it is not in the list of DS volumes
		if !lustre.DeletionTimestamp.IsZero() {
			finalizersToRemove = append(finalizersToRemove, lustre)
			for _, vol := range ds.Spec.Template.Spec.Volumes {
				// DaemonSet volumes are named after the LustreFileSystem they mount
				if lustre.Name == vol.Name {
					log.Info("Requeue: wait for daemonset to drop lustrefilesystem volume",
						"lustrefilesystem", client.ObjectKeyFromObject(&lustre).String())
					return &ctrl.Result{RequeueAfter: 1 * time.Second}, nil
				}
			}
		}
	}

	// Now the DS does not have any lustre filesystems that are being deleted, verify that the
	// daemonset's pods (i.e. dm worker pods) have restarted
	d := ds.Status.DesiredNumberScheduled
	if ds.Status.UpdatedNumberScheduled != d || ds.Status.NumberReady != d {
		// wait for pods to restart
		log.Info("Requeue: wait for daemonset to restart pods after dropping lustrefilesystem volume",
			"desired", d, "updated", ds.Status.UpdatedNumberScheduled, "ready", ds.Status.NumberReady)
		return &ctrl.Result{RequeueAfter: 1 * time.Second}, nil
	}

	// Now the finalizers can be removed
	for _, lustre := range finalizersToRemove {
		if controllerutil.ContainsFinalizer(&lustre, finalizer) {
			controllerutil.RemoveFinalizer(&lustre, finalizer)
			if err := r.Update(ctx, &lustre); err != nil {
				return nil, err
			}
			log.Info("Removed LustreFileSystem finalizer", "object", client.ObjectKeyFromObject(&lustre).String(), "namespace", manager.Namespace)
		}
	}

	return nil, nil
}

func (r *DataMovementManagerReconciler) createOrUpdateDaemonSetIfNecessary(ctx context.Context, manager *dmv1alpha1.DataMovementManager) error {
log := log.FromContext(ctx)

Expand Down Expand Up @@ -437,24 +504,30 @@ func setupLustreVolumes(ctx context.Context, manager *dmv1alpha1.DataMovementMan

// Setup Volumes / Volume Mounts for accessing global Lustre file systems

volumes := make([]corev1.Volume, len(fileSystems))
volumeMounts := make([]corev1.VolumeMount, len(fileSystems))
for idx, fs := range fileSystems {
volumes := []corev1.Volume{}
volumeMounts := []corev1.VolumeMount{}
for _, fs := range fileSystems {

if !fs.DeletionTimestamp.IsZero() {
log.Info("Global lustre volume is in the process of being deleted", "name", client.ObjectKeyFromObject(&fs).String())
continue
}

log.Info("Adding global lustre volume", "name", client.ObjectKeyFromObject(&fs).String())

volumes[idx] = corev1.Volume{
volumes = append(volumes, corev1.Volume{
Name: fs.Name,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: fs.PersistentVolumeClaimName(manager.Namespace, corev1.ReadWriteMany),
},
},
}
})

volumeMounts[idx] = corev1.VolumeMount{
volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: fs.Name,
MountPath: fs.Spec.MountRoot,
}
})
}

// Add the NNF Mounts
Expand Down
72 changes: 53 additions & 19 deletions controllers/datamovementmanager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package controllers

import (
"context"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
Expand All @@ -31,13 +29,15 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

lusv1beta1 "github.com/NearNodeFlash/lustre-fs-operator/api/v1beta1"
dmv1alpha1 "github.com/NearNodeFlash/nnf-dm/api/v1alpha1"
)

var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {

var lustre *lusv1beta1.LustreFileSystem
ns := &corev1.Namespace{}
deployment := &appsv1.Deployment{}
mgr := &dmv1alpha1.DataMovementManager{}
Expand All @@ -51,7 +51,7 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
},
}

err := k8sClient.Create(context.TODO(), ns)
err := k8sClient.Create(ctx, ns)
Expect(err == nil || errors.IsAlreadyExists(err)).Should(BeTrue())

// Create a dummy deployment of the data movement manager
Expand Down Expand Up @@ -81,7 +81,7 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
},
}

err = k8sClient.Create(context.TODO(), deployment)
err = k8sClient.Create(ctx, deployment)
Expect(err == nil || errors.IsAlreadyExists(err)).Should(BeTrue())
})

Expand Down Expand Up @@ -112,33 +112,40 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
})

JustBeforeEach(func() {
Expect(k8sClient.Create(context.TODO(), mgr)).Should(Succeed())
Expect(k8sClient.Create(ctx, mgr)).Should(Succeed())
})

JustAfterEach(func() {
Expect(k8sClient.Delete(context.TODO(), mgr)).Should(Succeed())
Expect(k8sClient.Delete(ctx, mgr)).Should(Succeed())
Eventually(func() error {
return k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(mgr), mgr)
return k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)
}).ShouldNot(Succeed())

if lustre != nil {
k8sClient.Delete(ctx, lustre) // may or may not be already deleted
Eventually(func() error {
return k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)
}).ShouldNot(Succeed())
}
})

It("Bootstraps all managed components", func() {
Eventually(func(g Gomega) bool {
g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())
})

It("Adds global lustre volumes", func() {
It("Adds and removes global lustre volumes", func() {

By("Wait for the manager to go ready")
Eventually(func(g Gomega) bool {
g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())

By("Creating a Global Lustre File System")
lustre := &lusv1beta1.LustreFileSystem{
lustre = &lusv1beta1.LustreFileSystem{
ObjectMeta: metav1.ObjectMeta{
Name: "global",
Namespace: corev1.NamespaceDefault,
Expand All @@ -150,17 +157,21 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
},
}

Expect(k8sClient.Create(context.TODO(), lustre)).Should(Succeed())
Expect(k8sClient.Create(ctx, lustre)).Should(Succeed())

By("Expect namespace is added to lustre volume")

Eventually(func(g Gomega) lusv1beta1.LustreFileSystemNamespaceSpec {
g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
return lustre.Spec.Namespaces[mgr.Namespace]
}).ShouldNot(BeNil())

By("The Volume appears in the daemon set")
By("Expect finalizer is added to lustre volume")
Eventually(func(g Gomega) []string {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
return lustre.Finalizers
}).Should(ContainElement(finalizer))

By("The Volume appears in the daemon set")
daemonset := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: daemonsetName,
Expand All @@ -169,16 +180,14 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
}

Eventually(func(g Gomega) error {
g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())

g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())
g.Expect(daemonset.Spec.Template.Spec.Volumes).Should(
ContainElement(
MatchFields(IgnoreExtras, Fields{
"Name": Equal(lustre.Name),
}),
),
)

g.Expect(daemonset.Spec.Template.Spec.Containers[0].VolumeMounts).Should(
ContainElement(
MatchFields(IgnoreExtras, Fields{
Expand All @@ -187,9 +196,34 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
}),
),
)

return nil
}).Should(Succeed())

By("Deleting Global Lustre File System")
Expect(k8sClient.Delete(ctx, lustre)).To(Succeed())

By("Expect Global Lustre File system/finalizer to stay around until daemonset restarts pods without the volume")
Eventually(func(g Gomega) error {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())

for _, v := range daemonset.Spec.Template.Spec.Volumes {
desired := daemonset.Status.DesiredNumberScheduled
updated := daemonset.Status.UpdatedNumberScheduled
ready := daemonset.Status.NumberReady
if v.Name == lustre.Name {
// If the volume still exists, then so should lustre + finalizer
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
g.Expect(controllerutil.ContainsFinalizer(lustre, finalizer)).To(BeTrue())
} else if updated != desired && ready != desired {
// If pods have not restarted, lustre + finalizer should still be there
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
g.Expect(controllerutil.ContainsFinalizer(lustre, finalizer)).To(BeTrue())
} else {
// Once volume is gone and pods have restarted, lustre should be gone (and return error)
return k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)
}
}
return nil
}).ShouldNot(Succeed())
})
})

0 comments on commit fc9ba37

Please sign in to comment.