Add finalizers to LustreFileSystems

LustreFileSystems can be removed before Data Movement worker pods have
unmounted them. This change adds a finalizer to each LustreFileSystem to
indicate that DM worker pods are using it. Once a LustreFileSystem has
been marked for deletion, we ensure that the worker pods are no longer
using that volume. Only then is the finalizer removed, indicating that
nothing is using the LustreFileSystem.

Signed-off-by: Blake Devcich <[email protected]>
bdevcich committed Nov 1, 2023
1 parent 8928a1d commit 9f32b80
Showing 2 changed files with 179 additions and 31 deletions.
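For context before the diff: the change builds on the standard controller-runtime finalizer pattern. The sketch below is illustrative only; the helper name reconcileLustreFinalizer is not part of the commit, while the finalizer constant, the LustreFileSystem type, and the imports are the ones the controller file already uses.

// Illustrative sketch of the finalizer handling this commit applies; not the
// exact commit code. Assumed imports (all already used by the controller):
//
//	"context"
//	lusv1beta1 "github.com/NearNodeFlash/lustre-fs-operator/api/v1beta1"
//	"sigs.k8s.io/controller-runtime/pkg/client"
//	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
func reconcileLustreFinalizer(ctx context.Context, c client.Client, lustre *lusv1beta1.LustreFileSystem, finalizer string) error {
	if lustre.DeletionTimestamp.IsZero() {
		// The resource is live: hold it with the data movement finalizer so it
		// cannot be deleted while DM worker pods may still have it mounted.
		if !controllerutil.ContainsFinalizer(lustre, finalizer) {
			controllerutil.AddFinalizer(lustre, finalizer)
			return c.Update(ctx, lustre)
		}
		return nil
	}
	// The resource has been marked for deletion: drop the finalizer only after
	// the DM worker pods no longer use the volume (the DaemonSet check in
	// removeLustreFileSystemsFinalizersIfNecessary below).
	if controllerutil.ContainsFinalizer(lustre, finalizer) {
		controllerutil.RemoveFinalizer(lustre, finalizer)
		return c.Update(ctx, lustre)
	}
	return nil
}
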
104 changes: 92 additions & 12 deletions controllers/datamovementmanager_controller.go
@@ -27,6 +27,7 @@ import (
"crypto/x509"
"encoding/pem"
"fmt"
"time"

"golang.org/x/crypto/ssh"

@@ -144,6 +145,13 @@ func (r *DataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.
return errorHandler(err, "create or update DaemonSet")
}

res, err := r.removeLustreFileSystemsFinalizersIfNecessary(ctx, manager)
if err != nil {
return errorHandler(err, "remove LustreFileSystems finalizers")
} else if res != nil {
return *res, nil
}

manager.Status.Ready = true
if err := r.Status().Update(ctx, manager); err != nil {
return ctrl.Result{}, err
@@ -329,26 +337,92 @@ func (r *DataMovementManagerReconciler) updateLustreFileSystemsIfNecessary(ctx c
for _, lustre := range filesystems.Items {
_, found := lustre.Spec.Namespaces[manager.Namespace]
if !found {

if lustre.Spec.Namespaces == nil {
lustre.Spec.Namespaces = make(map[string]lusv1beta1.LustreFileSystemNamespaceSpec)
}

lustre.Spec.Namespaces[manager.Namespace] = lusv1beta1.LustreFileSystemNamespaceSpec{
Modes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
}
}

if err := r.Update(ctx, &lustre); err != nil {
return err
}
// Add the dm finalizer to keep this resource from being deleted until dm is no longer using it
if lustre.DeletionTimestamp.IsZero() && !controllerutil.ContainsFinalizer(&lustre, finalizer) {
controllerutil.AddFinalizer(&lustre, finalizer)
}

log.Info("Updated LustreFileSystem", "object", client.ObjectKeyFromObject(&lustre).String(), "namespace", manager.Namespace)
if err := r.Update(ctx, &lustre); err != nil {
return err
}
log.Info("Updated LustreFileSystem", "object", client.ObjectKeyFromObject(&lustre).String(), "namespace", manager.Namespace)
}

return nil
}

func (r *DataMovementManagerReconciler) removeLustreFileSystemsFinalizersIfNecessary(ctx context.Context, manager *dmv1alpha1.DataMovementManager) (*ctrl.Result, error) {
log := log.FromContext(ctx)

filesystems := &lusv1beta1.LustreFileSystemList{}
if err := r.List(ctx, filesystems); err != nil && !meta.IsNoMatchError(err) {
return nil, fmt.Errorf("list lustre file systems failed: %w", err)
}

// Get the DaemonSet to compare against its list of volumes
ds := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: daemonsetName,
Namespace: manager.Namespace,
},
}
if err := r.Get(ctx, client.ObjectKeyFromObject(ds), ds); err != nil {
return nil, err
}

finalizersToRemove := []lusv1beta1.LustreFileSystem{}
for _, lustre := range filesystems.Items {
// This lustre is in the process of being deleted; verify it is not in the list of DaemonSet volumes
if !lustre.DeletionTimestamp.IsZero() {
finalizersToRemove = append(finalizersToRemove, lustre)
for _, vol := range ds.Spec.Template.Spec.Volumes {
if lustre.Name == vol.Name {
log.Info("Requeue: wait for daemonset to drop lustrefilesystem volume", "lustrefilesystem", lustre)
return &ctrl.Result{RequeueAfter: 1 * time.Second}, nil
}
}
}
}

if len(finalizersToRemove) == 0 {
return nil, nil
}

// Now that the DaemonSet no longer has volumes for any lustre filesystems that are being
// deleted, verify that the daemonset's pods (i.e. dm worker pods) have restarted
d := ds.Status.DesiredNumberScheduled
if ds.Status.ObservedGeneration != ds.ObjectMeta.Generation || ds.Status.UpdatedNumberScheduled != d || ds.Status.NumberReady != d {
// wait for pods to restart
log.Info("Requeue: wait for daemonset to restart pods after dropping lustrefilesystem volume",
"desired", d, "updated", ds.Status.UpdatedNumberScheduled, "ready", ds.Status.NumberReady)
return &ctrl.Result{RequeueAfter: 1 * time.Second}, nil
}

// Now the finalizers can be removed
for _, lustre := range finalizersToRemove {
if controllerutil.ContainsFinalizer(&lustre, finalizer) {
controllerutil.RemoveFinalizer(&lustre, finalizer)
if err := r.Update(ctx, &lustre); err != nil {
return nil, err
}
log.Info("Removed LustreFileSystem finalizer", "object", client.ObjectKeyFromObject(&lustre).String(),
"namespace", manager.Namespace,
"daemonset", ds)
}
}

return nil, nil
}

func (r *DataMovementManagerReconciler) createOrUpdateDaemonSetIfNecessary(ctx context.Context, manager *dmv1alpha1.DataMovementManager) error {
log := log.FromContext(ctx)

@@ -437,24 +511,30 @@ func setupLustreVolumes(ctx context.Context, manager *dmv1alpha1.DataMovementMan

// Setup Volumes / Volume Mounts for accessing global Lustre file systems

volumes := make([]corev1.Volume, len(fileSystems))
volumeMounts := make([]corev1.VolumeMount, len(fileSystems))
for idx, fs := range fileSystems {
volumes := []corev1.Volume{}
volumeMounts := []corev1.VolumeMount{}
for _, fs := range fileSystems {

if !fs.DeletionTimestamp.IsZero() {
log.Info("Global lustre volume is in the process of being deleted", "name", client.ObjectKeyFromObject(&fs).String())
continue
}

log.Info("Adding global lustre volume", "name", client.ObjectKeyFromObject(&fs).String())

volumes[idx] = corev1.Volume{
volumes = append(volumes, corev1.Volume{
Name: fs.Name,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: fs.PersistentVolumeClaimName(manager.Namespace, corev1.ReadWriteMany),
},
},
}
})

volumeMounts[idx] = corev1.VolumeMount{
volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: fs.Name,
MountPath: fs.Spec.MountRoot,
}
})
}

// Add the NNF Mounts
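The requeue-then-remove logic above hinges on a standard DaemonSet rollout check: the finalizer is only dropped once the DaemonSet's observed generation matches its spec and all scheduled pods have been updated and are ready. Restated as a standalone helper (the function name is illustrative and not part of the commit; the status fields are the ones the reconciler reads; assumes appsv1 "k8s.io/api/apps/v1"):

// dsRolloutComplete reports whether every DM worker pod has been rescheduled
// against the DaemonSet's latest template and is ready again. This mirrors the
// inline condition in removeLustreFileSystemsFinalizersIfNecessary; the helper
// itself is illustrative and not part of the commit.
func dsRolloutComplete(ds *appsv1.DaemonSet) bool {
	desired := ds.Status.DesiredNumberScheduled
	return ds.Status.ObservedGeneration == ds.ObjectMeta.Generation &&
		ds.Status.UpdatedNumberScheduled == desired &&
		ds.Status.NumberReady == desired
}
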
106 changes: 87 additions & 19 deletions controllers/datamovementmanager_controller_test.go
@@ -20,8 +20,6 @@
package controllers

import (
"context"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
@@ -31,13 +29,15 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

lusv1beta1 "github.com/NearNodeFlash/lustre-fs-operator/api/v1beta1"
dmv1alpha1 "github.com/NearNodeFlash/nnf-dm/api/v1alpha1"
)

var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {

var lustre *lusv1beta1.LustreFileSystem
ns := &corev1.Namespace{}
deployment := &appsv1.Deployment{}
mgr := &dmv1alpha1.DataMovementManager{}
@@ -51,7 +51,7 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
},
}

err := k8sClient.Create(context.TODO(), ns)
err := k8sClient.Create(ctx, ns)
Expect(err == nil || errors.IsAlreadyExists(err)).Should(BeTrue())

// Create a dummy deployment of the data movement manager
@@ -81,7 +81,7 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
},
}

err = k8sClient.Create(context.TODO(), deployment)
err = k8sClient.Create(ctx, deployment)
Expect(err == nil || errors.IsAlreadyExists(err)).Should(BeTrue())
})

@@ -112,33 +112,40 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
})

JustBeforeEach(func() {
Expect(k8sClient.Create(context.TODO(), mgr)).Should(Succeed())
Expect(k8sClient.Create(ctx, mgr)).Should(Succeed())
})

JustAfterEach(func() {
Expect(k8sClient.Delete(context.TODO(), mgr)).Should(Succeed())
Expect(k8sClient.Delete(ctx, mgr)).Should(Succeed())
Eventually(func() error {
return k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(mgr), mgr)
return k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)
}).ShouldNot(Succeed())

if lustre != nil {
k8sClient.Delete(ctx, lustre) // may or may not be already deleted
Eventually(func() error {
return k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)
}).ShouldNot(Succeed())
}
})

It("Bootstraps all managed components", func() {
Eventually(func(g Gomega) bool {
g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())
})

It("Adds global lustre volumes", func() {
It("Adds and removes global lustre volumes", func() {

By("Wait for the manager to go ready")
Eventually(func(g Gomega) bool {
g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())

By("Creating a Global Lustre File System")
lustre := &lusv1beta1.LustreFileSystem{
lustre = &lusv1beta1.LustreFileSystem{
ObjectMeta: metav1.ObjectMeta{
Name: "global",
Namespace: corev1.NamespaceDefault,
@@ -150,17 +157,21 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
},
}

Expect(k8sClient.Create(context.TODO(), lustre)).Should(Succeed())
Expect(k8sClient.Create(ctx, lustre)).Should(Succeed())

By("Expect namespace is added to lustre volume")

Eventually(func(g Gomega) lusv1beta1.LustreFileSystemNamespaceSpec {
g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
return lustre.Spec.Namespaces[mgr.Namespace]
}).ShouldNot(BeNil())

By("The Volume appears in the daemon set")
By("Expect finalizer is added to lustre volume")
Eventually(func(g Gomega) []string {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
return lustre.Finalizers
}).Should(ContainElement(finalizer))

By("The Volume appears in the daemon set")
daemonset := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: daemonsetName,
@@ -169,16 +180,14 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
}

Eventually(func(g Gomega) error {
g.Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())

g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())
g.Expect(daemonset.Spec.Template.Spec.Volumes).Should(
ContainElement(
MatchFields(IgnoreExtras, Fields{
"Name": Equal(lustre.Name),
}),
),
)

g.Expect(daemonset.Spec.Template.Spec.Containers[0].VolumeMounts).Should(
ContainElement(
MatchFields(IgnoreExtras, Fields{
@@ -187,9 +196,68 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
}),
),
)

return nil
}).Should(Succeed())

By("Deleting Global Lustre File System")
Expect(k8sClient.Delete(ctx, lustre)).To(Succeed())

By("Expect Global Lustre File system/finalizer to stay around until daemonset restarts pods without the volume")
Eventually(func(g Gomega) error {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())

for _, v := range daemonset.Spec.Template.Spec.Volumes {
desired := daemonset.Status.DesiredNumberScheduled
updated := daemonset.Status.UpdatedNumberScheduled
ready := daemonset.Status.NumberReady
expectedGen := daemonset.ObjectMeta.Generation
gen := daemonset.Status.ObservedGeneration

// Fake the updates to the daemonset since the daemonset controller doesn't run
fakeDSUpdates(daemonset, g)

if v.Name == lustre.Name {
// If the volume still exists, then so should lustre + finalizer
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
g.Expect(controllerutil.ContainsFinalizer(lustre, finalizer)).To(BeTrue())

} else if gen != expectedGen && updated != desired && ready != desired {
// If pods have not restarted, lustre + finalizer should still be there
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
g.Expect(controllerutil.ContainsFinalizer(lustre, finalizer)).To(BeTrue())
} else {
// Once volume is gone and pods have restarted, lustre should be gone (and return error)
return k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)
}
}

return nil
}, "15s").ShouldNot(Succeed())
})
})

// Envtest does not run the built-in controllers (e.g. daemonset controller). This function fakes
// that out. Walk the counters up by one each time so we can exercise the controller watching these
// through a few iterations.
func fakeDSUpdates(ds *appsv1.DaemonSet, g Gomega) {
const desired = 5 // number of nnf nodes

ds.Status.DesiredNumberScheduled = desired

ds.Status.ObservedGeneration++
if ds.Status.ObservedGeneration > ds.ObjectMeta.Generation {
ds.Status.ObservedGeneration = ds.ObjectMeta.Generation
}

ds.Status.UpdatedNumberScheduled++
if ds.Status.UpdatedNumberScheduled > desired {
ds.Status.UpdatedNumberScheduled = desired
}

ds.Status.NumberReady++
if ds.Status.NumberReady > desired {
ds.Status.NumberReady = desired
}

g.Expect(k8sClient.Status().Update(ctx, ds)).Should(Succeed())
}
