Gate NnfDataMovementManager.Status.Ready flag on daemonset ready
* Don't signal ready until the nnf-dm-worker daemonset is up to date
  and ready
* If there's an error, set Ready to false
* Added an isDaemonSetReady() function to check the status of the
  daemonset. This check needs to happen both in the main reconciler loop
  and when removing the finalizers.
* Use the status updater to set the Ready flag (a condensed sketch of the
  resulting reconcile flow follows the file summary below).

Signed-off-by: Blake Devcich <[email protected]>
bdevcich committed Nov 3, 2023
1 parent 188bba1 commit 71d98ec
Showing 4 changed files with 117 additions and 39 deletions.
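In outline, the change routes the manager's status through the dws updater package and gates Ready on the nnf-dm-worker daemonset being fully rolled out. The sketch below condenses that flow from the diff that follows; the names are taken from the diff, but the create/update and finalizer steps are elided, so read it as an illustration rather than the exact committed code.

func (r *DataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
	manager := &dmv1alpha1.DataMovementManager{}
	if err := r.Get(ctx, req.NamespacedName, manager); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Defer the status write: whatever Ready ends up being is persisted when Reconcile returns.
	statusUpdater := updater.NewStatusUpdater[*dmv1alpha1.DataMovementManagerStatus](manager)
	defer func() { err = statusUpdater.CloseWithStatusUpdate(ctx, r.Client.Status(), err) }()

	// ... create/update the nnf-dm-worker daemonset and manage LustreFileSystem finalizers ...

	// Ready is set only once the daemonset has observed its latest generation and
	// reports all desired pods updated and ready.
	ready, err := r.isDaemonSetReady(ctx, manager)
	if err != nil || !ready {
		manager.Status.Ready = false
		return ctrl.Result{}, err
	}

	manager.Status.Ready = true
	return ctrl.Result{}, nil
}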
2 changes: 1 addition & 1 deletion .vscode/launch.json
@@ -49,7 +49,7 @@
"type": "go",
"request": "launch",
"mode": "test",
"program": "${workspaceFolder}/controllers",
"program": "${workspaceFolder}/internal/controller",
"args": [
"-ginkgo.v",
"-ginkgo.progress",
6 changes: 6 additions & 0 deletions api/v1alpha1/datamovementmanager_types.go
@@ -22,6 +22,8 @@ package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/DataWorkflowServices/dws/utils/updater"
)

const (
@@ -89,3 +91,7 @@ type DataMovementManagerList struct {
func init() {
SchemeBuilder.Register(&DataMovementManager{}, &DataMovementManagerList{})
}

func (m *DataMovementManager) GetStatus() updater.Status[*DataMovementManagerStatus] {
return &m.Status
}
60 changes: 47 additions & 13 deletions internal/controller/datamovementmanager_controller.go
@@ -33,6 +33,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -45,6 +46,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/DataWorkflowServices/dws/utils/updater"
lusv1beta1 "github.com/NearNodeFlash/lustre-fs-operator/api/v1beta1"
dmv1alpha1 "github.com/NearNodeFlash/nnf-dm/api/v1alpha1"
"github.com/NearNodeFlash/nnf-dm/internal/controller/metrics"
@@ -106,7 +108,7 @@ type DataMovementManagerReconciler struct {

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *DataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *DataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
log := log.FromContext(ctx)

metrics.NnfDmDataMovementManagerReconcilesTotal.Inc()
@@ -116,10 +118,14 @@ func (r *DataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.
return ctrl.Result{}, client.IgnoreNotFound(err)
}

statusUpdater := updater.NewStatusUpdater[*dmv1alpha1.DataMovementManagerStatus](manager)
defer func() { err = statusUpdater.CloseWithStatusUpdate(ctx, r.Client.Status(), err) }()

errorHandler := func(err error, msg string) (ctrl.Result, error) {
if errors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
}
manager.Status.Ready = false
log.Error(err, msg+" failed")
return ctrl.Result{}, err
}
@@ -148,11 +154,15 @@ func (r *DataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.
return errorHandler(err, "remove LustreFileSystems finalizers")
}

manager.Status.Ready = true
if err := r.Status().Update(ctx, manager); err != nil {
return ctrl.Result{}, err
if ready, err := r.isDaemonSetReady(ctx, manager); err != nil {
return errorHandler(err, "check if daemonset is ready")
} else if !ready {
manager.Status.Ready = false
log.Info("daemonset not ready")
return ctrl.Result{}, nil
}

manager.Status.Ready = true
return ctrl.Result{}, nil
}

@@ -372,6 +382,9 @@ func (r *DataMovementManagerReconciler) removeLustreFileSystemsFinalizersIfNeces
},
}
if err := r.Get(ctx, client.ObjectKeyFromObject(ds), ds); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}

@@ -395,12 +408,11 @@ func (r *DataMovementManagerReconciler) removeLustreFileSystemsFinalizersIfNeces

// Now that the DS does not have any lustre filesystems being deleted, verify that the
// daemonset's pods (i.e. dm worker pods) have restarted
d := ds.Status.DesiredNumberScheduled
if ds.Status.ObservedGeneration != ds.ObjectMeta.Generation || ds.Status.UpdatedNumberScheduled != d || ds.Status.NumberReady != d {
// wait for pods to restart
log.Info("Daemonset still has pods to restart after dropping lustrefilesystem volume",
"desired", d, "updated", ds.Status.UpdatedNumberScheduled, "ready", ds.Status.NumberReady)
// Check the error first so a failed daemonset lookup is not swallowed by the !ready branch.
if ready, err := r.isDaemonSetReady(ctx, manager); err != nil {
return err
} else if !ready {
log.Info("Daemonset still has pods to restart after dropping lustrefilesystem volume")
return nil
}

// Now the finalizers can be removed
@@ -410,15 +422,37 @@
if err := r.Update(ctx, &lustre); err != nil {
return err
}
log.Info("Removed LustreFileSystem finalizer", "object", client.ObjectKeyFromObject(&lustre).String(),
"namespace", manager.Namespace,
"daemonset", ds)
log.Info("Removed LustreFileSystem finalizer", "object", client.ObjectKeyFromObject(&lustre).String(), "namespace", manager.Namespace)
}
}

return nil
}

func (r *DataMovementManagerReconciler) isDaemonSetReady(ctx context.Context, manager *dmv1alpha1.DataMovementManager) (bool, error) {
ds := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: daemonsetName,
Namespace: manager.Namespace,
},
}
if err := r.Get(ctx, client.ObjectKeyFromObject(ds), ds); err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}

d := ds.Status.DesiredNumberScheduled
log.FromContext(ctx).Info("isDaemonSetReady", "generation", ds.ObjectMeta.Generation, "status", ds.Status)
if ds.Status.ObservedGeneration != ds.ObjectMeta.Generation || ds.Status.UpdatedNumberScheduled != d || ds.Status.NumberReady != d {
return false, nil
}

return true, nil
}

func (r *DataMovementManagerReconciler) createOrUpdateDaemonSetIfNecessary(ctx context.Context, manager *dmv1alpha1.DataMovementManager) error {
log := log.FromContext(ctx)

@@ -473,7 +507,7 @@ func (r *DataMovementManagerReconciler) createOrUpdateDaemonSetIfNecessary(ctx c
if result == controllerutil.OperationResultCreated {
log.Info("Created DaemonSet", "object", client.ObjectKeyFromObject(ds).String())
} else if result == controllerutil.OperationResultUpdated {
log.Info("Updated DaemonSet", "object", client.ObjectKeyFromObject(ds).String())
log.Info("Updated DaemonSet", "object", client.ObjectKeyFromObject(ds).String(), "generation", ds.ObjectMeta.Generation, "status", ds.Status)
}

return nil
88 changes: 63 additions & 25 deletions internal/controller/datamovementmanager_controller_test.go
@@ -28,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

@@ -38,6 +39,8 @@ import (
var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {

var lustre *lusv1beta1.LustreFileSystem
var daemonset *appsv1.DaemonSet

ns := &corev1.Namespace{}
deployment := &appsv1.Deployment{}
mgr := &dmv1alpha1.DataMovementManager{}
@@ -109,6 +112,13 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
},
},
}

daemonset = &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: daemonsetName,
Namespace: mgr.Namespace,
},
}
})

JustBeforeEach(func() {
@@ -131,15 +141,17 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {

It("Bootstraps all managed components", func() {
Eventually(func(g Gomega) bool {
g.Expect(fakeDSUpdates(daemonset)).To(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())
}, "5s").Should(BeTrue())
})

It("Adds and removes global lustre volumes", func() {

By("Wait for the manager to go ready")
Eventually(func(g Gomega) bool {
g.Expect(fakeDSUpdates(daemonset)).To(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())
@@ -159,6 +171,12 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {

Expect(k8sClient.Create(ctx, lustre)).Should(Succeed())

By("Status should not be ready")
Eventually(func(g Gomega) bool {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeFalse())

By("Expect namespace is added to lustre volume")
Eventually(func(g Gomega) lusv1beta1.LustreFileSystemNamespaceSpec {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
Expand All @@ -172,13 +190,6 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
}).Should(ContainElement(finalizer))

By("The Volume appears in the daemon set")
daemonset := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: daemonsetName,
Namespace: mgr.Namespace,
},
}

Eventually(func(g Gomega) error {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())
g.Expect(daemonset.Spec.Template.Spec.Volumes).Should(
@@ -199,9 +210,22 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
return nil
}).Should(Succeed())

By("Status should be ready after daemonset is up to date")
Eventually(func(g Gomega) bool {
g.Expect(fakeDSUpdates(daemonset)).To(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())

By("Deleting Global Lustre File System")
Expect(k8sClient.Delete(ctx, lustre)).To(Succeed())

By("Status should be ready since daemonset was updated")
Eventually(func(g Gomega) bool {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeFalse())

By("Expect Global Lustre File system/finalizer to stay around until daemonset restarts pods without the volume")
Eventually(func(g Gomega) error {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())
@@ -214,7 +238,7 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
gen := daemonset.Status.ObservedGeneration

// Fake the updates to the daemonset since the daemonset controller doesn't run
fakeDSUpdates(daemonset, g)
g.Expect(fakeDSUpdates(daemonset)).To(Succeed())

if v.Name == lustre.Name {
// If the volume still exists, then so should lustre + finalizer
@@ -233,31 +257,45 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {

return nil
}, "15s").ShouldNot(Succeed())

By("Status should be ready since daemonset is up to date from previous step")
Eventually(func(g Gomega) bool {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())
})
})

// Envtest does not run the built-in controllers (e.g. the daemonset controller), so this function
// fakes that behavior. It walks the status counters up by one on each call so the controller
// watching the daemonset is exercised over a few iterations.
func fakeDSUpdates(ds *appsv1.DaemonSet, g Gomega) {
const desired = 5 // number of nnf nodes
func fakeDSUpdates(ds *appsv1.DaemonSet) error {
const desired = 2 // number of nnf nodes

ds.Status.DesiredNumberScheduled = desired
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(ds), ds); err != nil {
return err
}
ds.Status.DesiredNumberScheduled = desired

ds.Status.ObservedGeneration++
if ds.Status.ObservedGeneration > ds.ObjectMeta.Generation {
ds.Status.ObservedGeneration = ds.ObjectMeta.Generation
}
ds.Status.ObservedGeneration++
if ds.Status.ObservedGeneration > ds.ObjectMeta.Generation {
ds.Status.ObservedGeneration = ds.ObjectMeta.Generation
}

ds.Status.UpdatedNumberScheduled++
if ds.Status.UpdatedNumberScheduled > desired {
ds.Status.UpdatedNumberScheduled = desired
}
ds.Status.UpdatedNumberScheduled++
if ds.Status.UpdatedNumberScheduled > desired {
ds.Status.UpdatedNumberScheduled = desired
}

ds.Status.NumberReady++
if ds.Status.NumberReady > desired {
ds.Status.NumberReady = desired
}
return k8sClient.Status().Update(ctx, ds)
})

ds.Status.NumberReady++
if ds.Status.NumberReady > desired {
ds.Status.NumberReady = desired
}
return err

g.Expect(k8sClient.Status().Update(ctx, ds)).Should(Succeed())
// g.Expect(k8sClient.Status().Update(ctx, ds)).Should(Succeed())
}
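The reworked fakeDSUpdates helper leans on client-go's retry.RetryOnConflict so its status writes survive conflicts with the controller touching the same DaemonSet. A minimal sketch of that pattern against the same test scaffolding (k8sClient and ctx come from the suite; bumpReadyCount is a hypothetical name, not part of this commit):

// bumpReadyCount re-reads the object inside the retry closure so each attempt
// starts from a fresh resourceVersion, then updates the status subresource.
// RetryOnConflict retries the closure only when the update returns a conflict.
func bumpReadyCount(key client.ObjectKey) error {
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		ds := &appsv1.DaemonSet{}
		if err := k8sClient.Get(ctx, key, ds); err != nil {
			return err
		}
		ds.Status.NumberReady++
		return k8sClient.Status().Update(ctx, ds)
	})
}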
