Gate NnfDataMovementManager.Status.Ready flag on daemonset ready (#138)
* Don't signal ready until the nnf-dm-worker daemonset is up to date
  and ready
* If there's an error, set ready to false
* Added isDaemonSetReady() function to check on the status of the
  daemonset. This needs to happen as part of the main reconciler loop
  and when removing the finalizers.
* Use the status updater to set the Ready flag (condensed sketch below).

Signed-off-by: Blake Devcich <[email protected]>
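
The hunks below implement this gating in internal/controller/datamovementmanager_controller.go. The following is a condensed sketch of the pattern only; other reconcile steps are elided and all names and types are taken from the diff itself, so treat it as an illustration rather than the exact committed code:

func (r *NnfDataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
	manager := &nnfv1alpha1.NnfDataMovementManager{}
	if err := r.Get(ctx, req.NamespacedName, manager); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Deferred status updater: writes manager.Status back exactly once and
	// folds any update failure into the named return err.
	statusUpdater := updater.NewStatusUpdater[*nnfv1alpha1.NnfDataMovementManagerStatus](manager)
	defer func() { err = statusUpdater.CloseWithStatusUpdate(ctx, r.Client.Status(), err) }()

	// ... create/update the nnf-dm-worker DaemonSet, handle LustreFileSystems ...

	// Gate Ready on the DaemonSet being fully rolled out.
	if ready, err := r.isDaemonSetReady(ctx, manager); err != nil {
		manager.Status.Ready = false
		return ctrl.Result{}, err
	} else if !ready {
		manager.Status.Ready = false
		return ctrl.Result{}, nil
	}

	manager.Status.Ready = true
	return ctrl.Result{}, nil
}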
bdevcich authored Nov 15, 2023
1 parent 9b94462 commit eb499d6
Showing 9 changed files with 133 additions and 50 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
@@ -49,7 +49,7 @@
"type": "go",
"request": "launch",
"mode": "test",
"program": "${workspaceFolder}/controllers",
"program": "${workspaceFolder}/internal/controller",
"args": [
"-ginkgo.v",
"-ginkgo.progress",
2 changes: 1 addition & 1 deletion go.mod
@@ -4,7 +4,7 @@ go 1.19

require (
github.com/NearNodeFlash/lustre-fs-operator v0.0.1-0.20231031201943-531116c1194e
github.com/NearNodeFlash/nnf-sos v0.0.1-0.20231108192651-ab8d87963df0
github.com/NearNodeFlash/nnf-sos v0.0.1-0.20231114204216-1a62f95d74d5
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.10
github.com/prometheus/client_golang v1.16.0
4 changes: 2 additions & 2 deletions go.sum
@@ -4,8 +4,8 @@ github.com/NearNodeFlash/lustre-fs-operator v0.0.1-0.20231031201943-531116c1194e
github.com/NearNodeFlash/lustre-fs-operator v0.0.1-0.20231031201943-531116c1194e/go.mod h1:qBcz9p8sXm1qhDf8WUmhxTlD1NCMEjoAD7NoHbQvMiI=
github.com/NearNodeFlash/nnf-ec v0.0.0-20231010162453-a8168bb6a52f h1:aWtSSQLLk9mUZj94mowirQeVw9saf80gVe10X0rZe8o=
github.com/NearNodeFlash/nnf-ec v0.0.0-20231010162453-a8168bb6a52f/go.mod h1:oxdwMqfttOF9dabJhqrWlirCnMk8/8eyLMwl+hducjk=
github.com/NearNodeFlash/nnf-sos v0.0.1-0.20231108192651-ab8d87963df0 h1:p6AuBbayRXU8WeBLBXXIihmJaB8IDJe9GjcEMFzJn6o=
github.com/NearNodeFlash/nnf-sos v0.0.1-0.20231108192651-ab8d87963df0/go.mod h1:YX9Q91wqtUmfZjU4KxSwZMDJGBzppiGEW4BpAVTIMAs=
github.com/NearNodeFlash/nnf-sos v0.0.1-0.20231114204216-1a62f95d74d5 h1:ngJNueL3HbFewXc4pf3mRFQfEKOk0q6eJcQ4Ir787ho=
github.com/NearNodeFlash/nnf-sos v0.0.1-0.20231114204216-1a62f95d74d5/go.mod h1:t0KypbCmssZzL9vhQFHLdauxHKgptJK1SbPJHjm+Baw=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
59 changes: 46 additions & 13 deletions internal/controller/datamovementmanager_controller.go
@@ -33,6 +33,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -45,6 +46,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/DataWorkflowServices/dws/utils/updater"
lusv1beta1 "github.com/NearNodeFlash/lustre-fs-operator/api/v1beta1"
"github.com/NearNodeFlash/nnf-dm/internal/controller/metrics"
nnfv1alpha1 "github.com/NearNodeFlash/nnf-sos/api/v1alpha1"
@@ -106,7 +108,7 @@ type NnfDataMovementManagerReconciler struct {

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *NnfDataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *NnfDataMovementManagerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
log := log.FromContext(ctx)

metrics.NnfDmDataMovementManagerReconcilesTotal.Inc()
@@ -116,10 +118,14 @@ func (r *NnfDataMovementManagerReconciler) Reconcile(ctx context.Context, req ct
return ctrl.Result{}, client.IgnoreNotFound(err)
}

statusUpdater := updater.NewStatusUpdater[*nnfv1alpha1.NnfDataMovementManagerStatus](manager)
defer func() { err = statusUpdater.CloseWithStatusUpdate(ctx, r.Client.Status(), err) }()

errorHandler := func(err error, msg string) (ctrl.Result, error) {
if errors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
}
manager.Status.Ready = false
log.Error(err, msg+" failed")
return ctrl.Result{}, err
}
@@ -148,11 +154,15 @@ func (r *NnfDataMovementManagerReconciler) Reconcile(ctx context.Context, req ct
return errorHandler(err, "remove LustreFileSystems finalizers")
}

manager.Status.Ready = true
if err := r.Status().Update(ctx, manager); err != nil {
return ctrl.Result{}, err
if ready, err := r.isDaemonSetReady(ctx, manager); err != nil {
return errorHandler(err, "check if daemonset is ready")
} else if !ready {
manager.Status.Ready = false
log.Info("Daemonset not ready")
return ctrl.Result{}, nil
}

manager.Status.Ready = true
return ctrl.Result{}, nil
}

@@ -372,6 +382,9 @@ func (r *NnfDataMovementManagerReconciler) removeLustreFileSystemsFinalizersIfNe
},
}
if err := r.Get(ctx, client.ObjectKeyFromObject(ds), ds); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}

@@ -395,12 +408,11 @@ func (r *NnfDataMovementManagerReconciler) removeLustreFileSystemsFinalizersIfNe

// Now the DS does not have any lustre filesystems that are being deleted, verify that the
// daemonset's pods (i.e. dm worker pods) have restarted
d := ds.Status.DesiredNumberScheduled
if ds.Status.ObservedGeneration != ds.ObjectMeta.Generation || ds.Status.UpdatedNumberScheduled != d || ds.Status.NumberReady != d {
// wait for pods to restart
log.Info("Daemonset still has pods to restart after dropping lustrefilesystem volume",
"desired", d, "updated", ds.Status.UpdatedNumberScheduled, "ready", ds.Status.NumberReady)
if ready, err := r.isDaemonSetReady(ctx, manager); !ready {
log.Info("Daemonset still has pods to restart after dropping lustrefilesystem volume")
return nil
} else if err != nil {
return err
}

// Now the finalizers can be removed
@@ -410,9 +422,7 @@ func (r *NnfDataMovementManagerReconciler) removeLustreFileSystemsFinalizersIfNe
if err := r.Update(ctx, &lustre); err != nil {
return err
}
log.Info("Removed LustreFileSystem finalizer", "object", client.ObjectKeyFromObject(&lustre).String(),
"namespace", manager.Namespace,
"daemonset", ds)
log.Info("Removed LustreFileSystem finalizer", "object", client.ObjectKeyFromObject(&lustre).String(), "namespace", manager.Namespace)
}
}

@@ -473,12 +483,35 @@ func (r *NnfDataMovementManagerReconciler) createOrUpdateDaemonSetIfNecessary(ct
if result == controllerutil.OperationResultCreated {
log.Info("Created DaemonSet", "object", client.ObjectKeyFromObject(ds).String())
} else if result == controllerutil.OperationResultUpdated {
log.Info("Updated DaemonSet", "object", client.ObjectKeyFromObject(ds).String())
log.Info("Updated DaemonSet", "object", client.ObjectKeyFromObject(ds).String(), "generation", ds.ObjectMeta.Generation, "status", ds.Status)
}

return nil
}

func (r *NnfDataMovementManagerReconciler) isDaemonSetReady(ctx context.Context, manager *nnfv1alpha1.NnfDataMovementManager) (bool, error) {
ds := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: daemonsetName,
Namespace: manager.Namespace,
},
}
if err := r.Get(ctx, client.ObjectKeyFromObject(ds), ds); err != nil {
if apierrors.IsNotFound(err) {
return false, nil

}
return false, err
}

d := ds.Status.DesiredNumberScheduled
if ds.Status.ObservedGeneration != ds.ObjectMeta.Generation || ds.Status.UpdatedNumberScheduled != d || ds.Status.NumberReady != d {
return false, nil
}

return true, nil
}

func setupSSHAuthVolumes(manager *nnfv1alpha1.NnfDataMovementManager, podSpec *corev1.PodSpec) {
mode := int32(0600)
podSpec.Volumes = append(podSpec.Volumes, corev1.Volume{
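
Note on the controller changes above: both the finalizer-removal path and the new isDaemonSetReady() fetch the nnf-dm-worker DaemonSet and treat a NotFound error as "not ready yet" (returning nil, or false with no error) rather than failing the reconcile, so the manager simply reports Ready=false until the DaemonSet exists and has fully rolled out.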
88 changes: 63 additions & 25 deletions internal/controller/datamovementmanager_controller_test.go
@@ -28,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

@@ -38,6 +39,8 @@ import (
var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {

var lustre *lusv1beta1.LustreFileSystem
var daemonset *appsv1.DaemonSet

ns := &corev1.Namespace{}
deployment := &appsv1.Deployment{}
mgr := &nnfv1alpha1.NnfDataMovementManager{}
@@ -109,6 +112,13 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
},
},
}

daemonset = &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: daemonsetName,
Namespace: mgr.Namespace,
},
}
})

JustBeforeEach(func() {
@@ -131,15 +141,17 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {

It("Bootstraps all managed components", func() {
Eventually(func(g Gomega) bool {
g.Expect(fakeDSUpdates(daemonset)).To(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())
}, "5s").Should(BeTrue())
})

It("Adds and removes global lustre volumes", func() {

By("Wait for the manager to go ready")
Eventually(func(g Gomega) bool {
g.Expect(fakeDSUpdates(daemonset)).To(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())
@@ -159,6 +171,12 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {

Expect(k8sClient.Create(ctx, lustre)).Should(Succeed())

By("Status should not be ready")
Eventually(func(g Gomega) bool {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeFalse())

By("Expect namespace is added to lustre volume")
Eventually(func(g Gomega) lusv1beta1.LustreFileSystemNamespaceSpec {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lustre), lustre)).Should(Succeed())
@@ -172,13 +190,6 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
}).Should(ContainElement(finalizer))

By("The Volume appears in the daemon set")
daemonset := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: daemonsetName,
Namespace: mgr.Namespace,
},
}

Eventually(func(g Gomega) error {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())
g.Expect(daemonset.Spec.Template.Spec.Volumes).Should(
@@ -199,9 +210,22 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
return nil
}).Should(Succeed())

By("Status should be ready after daemonset is up to date")
Eventually(func(g Gomega) bool {
g.Expect(fakeDSUpdates(daemonset)).To(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())

By("Deleting Global Lustre File System")
Expect(k8sClient.Delete(ctx, lustre)).To(Succeed())

By("Status should be ready since daemonset was updated")
Eventually(func(g Gomega) bool {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeFalse())

By("Expect Global Lustre File system/finalizer to stay around until daemonset restarts pods without the volume")
Eventually(func(g Gomega) error {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(daemonset), daemonset)).Should(Succeed())
@@ -214,7 +238,7 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {
gen := daemonset.Status.ObservedGeneration

// Fake the updates to the daemonset since the daemonset controller doesn't run
fakeDSUpdates(daemonset, g)
g.Expect(fakeDSUpdates(daemonset)).To(Succeed())

if v.Name == lustre.Name {
// If the volume still exists, then so should lustre + finalizer
@@ -233,31 +257,45 @@ var _ = Describe("Data Movement Manager Test" /*Ordered, (Ginkgo v2)*/, func() {

return nil
}, "15s").ShouldNot(Succeed())

By("Status should be ready since daemonset is up to date from previous step")
Eventually(func(g Gomega) bool {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(mgr), mgr)).Should(Succeed())
return mgr.Status.Ready
}).Should(BeTrue())
})
})

// Envtest does not run the built-in controllers (e.g. daemonset controller). This function fakes
// that out. Walk the counters up by one each time so we can exercise the controller watching these
// through a few iterations.
func fakeDSUpdates(ds *appsv1.DaemonSet, g Gomega) {
const desired = 5 // number of nnf nodes
func fakeDSUpdates(ds *appsv1.DaemonSet) error {
const desired = 2 // number of nnf nodes

ds.Status.DesiredNumberScheduled = desired
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(ds), ds); err != nil {
return err
}
ds.Status.DesiredNumberScheduled = desired

ds.Status.ObservedGeneration++
if ds.Status.ObservedGeneration > ds.ObjectMeta.Generation {
ds.Status.ObservedGeneration = ds.ObjectMeta.Generation
}
ds.Status.ObservedGeneration++
if ds.Status.ObservedGeneration > ds.ObjectMeta.Generation {
ds.Status.ObservedGeneration = ds.ObjectMeta.Generation
}

ds.Status.UpdatedNumberScheduled++
if ds.Status.UpdatedNumberScheduled > desired {
ds.Status.UpdatedNumberScheduled = desired
}
ds.Status.UpdatedNumberScheduled++
if ds.Status.UpdatedNumberScheduled > desired {
ds.Status.UpdatedNumberScheduled = desired
}

ds.Status.NumberReady++
if ds.Status.NumberReady > desired {
ds.Status.NumberReady = desired
}
return k8sClient.Status().Update(ctx, ds)
})

ds.Status.NumberReady++
if ds.Status.NumberReady > desired {
ds.Status.NumberReady = desired
}
return err

g.Expect(k8sClient.Status().Update(ctx, ds)).Should(Succeed())
// g.Expect(k8sClient.Status().Update(ctx, ds)).Should(Succeed())
}
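
The test helper above now re-reads the DaemonSet and wraps the fake status write in retry.RetryOnConflict, presumably because the manager controller updates the same DaemonSet concurrently and a write based on a stale resourceVersion would otherwise fail with a conflict. Returning the error also lets the specs assert g.Expect(fakeDSUpdates(daemonset)).To(Succeed()) inside Eventually blocks.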

Some generated files are not rendered by default.