From 157d26536f0ce7b959eea694583e8f16ef65bc59 Mon Sep 17 00:00:00 2001 From: Mario Manno Date: Fri, 18 Oct 2024 16:15:29 +0200 Subject: [PATCH] Remove bd update workaround from drift detection (#2942) This adds a new reconciler, which reacts to a channel source, to remove the bd update workaround. The driftdetect mini controller previously signaled change via the bundledeployment's status to the controller-runtime based reconciler. The bundledeployment reconciler is now split into two, one reconciler to install bundles and another to handle drift. Both can update the status of the bundledeployment resource with the current state of the deployment. --- integrationtests/agent/suite_test.go | 27 +++- .../controller/bundledeployment_controller.go | 27 +--- .../cmd/agent/controller/drift_controller.go | 129 ++++++++++++++++++ .../agent/deployer/driftdetect/driftdetect.go | 70 ++-------- .../cmd/agent/deployer/normalizers/norm.go | 4 +- internal/cmd/agent/operator.go | 24 +++- pkg/durations/durations.go | 4 +- 7 files changed, 195 insertions(+), 90 deletions(-) create mode 100644 internal/cmd/agent/controller/drift_controller.go diff --git a/integrationtests/agent/suite_test.go b/integrationtests/agent/suite_test.go index f32f02ae73..c27d903c88 100644 --- a/integrationtests/agent/suite_test.go +++ b/integrationtests/agent/suite_test.go @@ -3,6 +3,7 @@ package agent_test import ( "context" "fmt" + "os" "path/filepath" "strings" "testing" @@ -31,6 +32,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -74,9 +76,11 @@ var _ = BeforeSuite(func() { SetDefaultEventuallyTimeout(timeout) ctx, cancel = context.WithCancel(context.TODO()) + existing := os.Getenv("CI_USE_EXISTING_CLUSTER") == "true" testEnv = &envtest.Environment{ CRDDirectoryPaths: []string{filepath.Join("..", "..", "charts", "fleet-crd", "templates", "crds.yaml")}, ErrorIfCRDPathMissing: true, + UseExistingCluster: &existing, } var err error @@ -103,12 +107,28 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + driftChan := make(chan event.GenericEvent) + // Set up the bundledeployment reconciler Expect(k8sClient.Create(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: clusterNS}})).ToNot(HaveOccurred()) - reconciler := newReconciler(ctx, k8sManager, newLookup(resources)) + reconciler := newReconciler(ctx, k8sManager, newLookup(resources), driftChan) err = reconciler.SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred(), "failed to set up manager") + // Set up the driftdetect reconciler + driftReconciler := &controller.DriftReconciler{ + Client: k8sManager.GetClient(), + Scheme: k8sManager.GetScheme(), + + Deployer: reconciler.Deployer, + Monitor: reconciler.Monitor, + DriftDetect: reconciler.DriftDetect, + + DriftChan: driftChan, + } + err = driftReconciler.SetupWithManager(k8sManager) + Expect(err).ToNot(HaveOccurred(), "failed to set up manager") + go func() { defer GinkgoRecover() err = k8sManager.Start(ctx) @@ -124,7 +144,7 @@ var _ = AfterSuite(func() { // newReconciler creates a new BundleDeploymentReconciler that will watch for changes // in the test Fleet namespace, using configuration from the provided manager. // Resources are provided by the lookup parameter. 
-func newReconciler(ctx context.Context, mgr manager.Manager, lookup *lookup) *controller.BundleDeploymentReconciler { +func newReconciler(ctx context.Context, mgr manager.Manager, lookup *lookup, driftChan chan event.GenericEvent) *controller.BundleDeploymentReconciler { upstreamClient := mgr.GetClient() // re-use client, since this is a single cluster test localClient := upstreamClient @@ -174,12 +194,11 @@ func newReconciler(ctx context.Context, mgr manager.Manager, lookup *lookup) *co trigger := trigger.New(ctx, localDynamic, mgr.GetRESTMapper()) driftdetect := driftdetect.New( trigger, - upstreamClient, - mgr.GetAPIReader(), dsClient, defaultNamespace, defaultNamespace, agentScope, + driftChan, ) // Build the clean up diff --git a/internal/cmd/agent/controller/bundledeployment_controller.go b/internal/cmd/agent/controller/bundledeployment_controller.go index 4d51622736..605c52aba2 100644 --- a/internal/cmd/agent/controller/bundledeployment_controller.go +++ b/internal/cmd/agent/controller/bundledeployment_controller.go @@ -85,15 +85,13 @@ func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { //+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/status,verbs=get;update;patch //+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/finalizers,verbs=update -// Reconcile is part of the main kubernetes reconciliation loop which aims to -// move the current state of the cluster closer to the desired state. -// The Reconcile function compares the state specified by -// the BundleDeployment object against the actual cluster state, and then -// performs operations to make the cluster state reflect the state specified by -// the user. +// Reconcile compares the state specified by the BundleDeployment object +// against the actual state, and decides if the bundle should be deployed. +// The deployed resources are then monitored for drift. +// It also updates the status of the BundleDeployment object with the results. // // For more details, check Reconcile and its Result here: -// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.0/pkg/reconcile func (r *BundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithName("bundledeployment") ctx = log.IntoContext(ctx, logger) @@ -149,7 +147,7 @@ func (r *BundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req } if monitor.ShouldUpdateStatus(bd) { - // update the bundledeployment status and check if we deploy an agent, or if we need to trigger drift correction + // update the bundledeployment status and check if we deploy an agent status, err = r.Monitor.UpdateStatus(ctx, bd, resources) if err != nil { logger.Error(err, "Cannot monitor deployed bundle") @@ -162,17 +160,6 @@ func (r *BundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req bd.Status = setCondition(status, nil, monitor.Cond(fleetv1.BundleDeploymentConditionMonitored)) } - // Run drift correction - if len(status.ModifiedStatus) > 0 && bd.Spec.CorrectDrift != nil && bd.Spec.CorrectDrift.Enabled { - if release, err := r.Deployer.RemoveExternalChanges(ctx, bd); err != nil { - merr = append(merr, fmt.Errorf("failed reconciling drift: %w", err)) - // Propagate drift correction error to bundle deployment status. 
- monitor.Cond(fleetv1.BundleDeploymentConditionReady).SetError(&status, "", err) - } else { - bd.Status.Release = release - } - } - if len(bd.Status.ModifiedStatus) > 0 && monitor.ShouldRedeployAgent(bd) { bd.Status.AppliedDeploymentID = "" if err := r.Cleanup.OldAgent(ctx, status.ModifiedStatus); err != nil { @@ -181,7 +168,7 @@ func (r *BundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req } } - // update our mini controller, which watches deployed resources for drift + // update our driftdetect mini controller, which watches deployed resources for drift err = r.DriftDetect.Refresh(ctx, req.String(), bd, resources) if err != nil { logger.V(1).Error(err, "Failed to refresh drift detection", "step", "drift") diff --git a/internal/cmd/agent/controller/drift_controller.go b/internal/cmd/agent/controller/drift_controller.go new file mode 100644 index 0000000000..18f39f4165 --- /dev/null +++ b/internal/cmd/agent/controller/drift_controller.go @@ -0,0 +1,129 @@ +package controller + +import ( + "context" + "fmt" + + "github.com/rancher/fleet/internal/cmd/agent/deployer" + "github.com/rancher/fleet/internal/cmd/agent/deployer/driftdetect" + "github.com/rancher/fleet/internal/cmd/agent/deployer/monitor" + fleetv1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" + + "github.com/rancher/wrangler/v3/pkg/condition" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + errutil "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/util/retry" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +type DriftReconciler struct { + client.Client + Scheme *runtime.Scheme + + Deployer *deployer.Deployer + Monitor *monitor.Monitor + DriftDetect *driftdetect.DriftDetect + + DriftChan chan event.GenericEvent +} + +// SetupWithManager sets up the controller with the Manager. +func (r *DriftReconciler) SetupWithManager(mgr ctrl.Manager) error { + src := source.Channel(r.DriftChan, &handler.EnqueueRequestForObject{}) + return ctrl.NewControllerManagedBy(mgr). + Named("drift-reconciler"). + WatchesRawSource(src). + Complete(r) + +} + +// Reconcile is triggered via a channel from the driftdetect mini controller, +// which watches deployed resources for drift. It does so by creating a plan +// and comparing it to the current state. +// It will update the status of the BundleDeployment and correct drift if enabled. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.0/pkg/reconcile +func (r *DriftReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx).WithName("drift") + ctx = log.IntoContext(ctx, logger) + + // get latest BundleDeployment from cluster + bd := &fleetv1.BundleDeployment{} + err := r.Get(ctx, req.NamespacedName, bd) + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } else if err != nil { + return ctrl.Result{}, err + } + + if bd.Spec.Paused { + logger.V(1).Info("Bundle paused, clearing drift detection") + err := r.DriftDetect.Clear(req.String()) + + return ctrl.Result{}, err + } + + merr := []error{} + + // retrieve the resources from the helm history. 
+ // if we can't retrieve the resources, we don't need to try any of the other operations and requeue now + resources, err := r.Deployer.Resources(bd.Name, bd.Status.Release) + if err != nil { + logger.V(1).Info("Failed to retrieve bundledeployment's resources") + return ctrl.Result{}, err + } + + // return early if the bundledeployment is still being installed + if !monitor.ShouldUpdateStatus(bd) { + return ctrl.Result{}, nil + } + + // update the bundledeployment status from the helm resource list + bd.Status, err = r.Monitor.UpdateStatus(ctx, bd, resources) + if err != nil { + logger.Error(err, "Cannot monitor deployed bundle") + } + + // run drift correction + if len(bd.Status.ModifiedStatus) > 0 && bd.Spec.CorrectDrift != nil && bd.Spec.CorrectDrift.Enabled { + if release, err := r.Deployer.RemoveExternalChanges(ctx, bd); err != nil { + merr = append(merr, fmt.Errorf("failed reconciling drift: %w", err)) + // Propagate drift correction error to bundle deployment status. + condition.Cond(fleetv1.BundleDeploymentConditionReady).SetError(&bd.Status, "", err) + } else { + bd.Status.Release = release + } + } + + // final status update + logger.V(1).Info("Reconcile finished, updating the bundledeployment status") + err = r.updateStatus(ctx, req.NamespacedName, bd.Status) + if apierrors.IsNotFound(err) { + merr = append(merr, fmt.Errorf("bundledeployment has been deleted: %w", err)) + } else if err != nil { + merr = append(merr, fmt.Errorf("failed final update to bundledeployment status: %w", err)) + } + + return ctrl.Result{}, errutil.NewAggregate(merr) +} + +func (r *DriftReconciler) updateStatus(ctx context.Context, req types.NamespacedName, status fleetv1.BundleDeploymentStatus) error { + return retry.RetryOnConflict(DefaultRetry, func() error { + newBD := &fleetv1.BundleDeployment{} + err := r.Get(ctx, req, newBD) + if err != nil { + return err + } + newBD.Status = status + return r.Status().Update(ctx, newBD) + }) +} diff --git a/internal/cmd/agent/deployer/driftdetect/driftdetect.go b/internal/cmd/agent/deployer/driftdetect/driftdetect.go index 36c05dec3a..7b336b752c 100644 --- a/internal/cmd/agent/deployer/driftdetect/driftdetect.go +++ b/internal/cmd/agent/deployer/driftdetect/driftdetect.go @@ -3,17 +3,13 @@ package driftdetect import ( "context" - "github.com/go-logr/logr" - "github.com/rancher/fleet/internal/cmd/agent/deployer/desiredset" "github.com/rancher/fleet/internal/cmd/agent/trigger" "github.com/rancher/fleet/internal/helmdeployer" fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/client-go/util/retry" - "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -21,32 +17,29 @@ type DriftDetect struct { // Trigger watches deployed resources on the local cluster. 
trigger *trigger.Trigger - upstreamClient client.Client - upstreamReader client.Reader - desiredset *desiredset.Client defaultNamespace string labelPrefix string labelSuffix string + + driftChan chan event.GenericEvent } func New( trigger *trigger.Trigger, - upstreamClient client.Client, - upstreamReader client.Reader, desiredset *desiredset.Client, defaultNamespace string, labelPrefix string, labelSuffix string, + driftChan chan event.GenericEvent, ) *DriftDetect { return &DriftDetect{ trigger: trigger, - upstreamClient: upstreamClient, - upstreamReader: upstreamReader, desiredset: desiredset, defaultNamespace: defaultNamespace, labelPrefix: labelPrefix, labelSuffix: labelSuffix, + driftChan: driftChan, } } @@ -56,7 +49,7 @@ func (d *DriftDetect) Clear(bdKey string) error { // Refresh triggers a sync of all resources of the provided bd which may have drifted from their desired state. func (d *DriftDetect) Refresh(ctx context.Context, bdKey string, bd *fleet.BundleDeployment, resources *helmdeployer.Resources) error { - logger := log.FromContext(ctx).WithName("drift-detect") + logger := log.FromContext(ctx).WithName("drift-detect").WithValues("initialResourceVersion", bd.ResourceVersion) logger.V(1).Info("Refreshing drift detection") resources, err := d.allResources(ctx, bd, resources) @@ -68,57 +61,14 @@ func (d *DriftDetect) Refresh(ctx context.Context, bdKey string, bd *fleet.Bundl return nil } - logger.V(1).Info("Adding OnChange for bundledeployment's resource list") - logger = logger.WithValues("key", bdKey, "initialResourceVersion", bd.ResourceVersion) - - handleID := int(bd.Generation) handler := func(key string) { - logger := logger.WithValues("handleID", handleID, "triggered by", key) - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - // Can't enqueue directly, update bundledeployment instead - return d.requeueBD(logger, handleID, bd.Namespace, bd.Name) - }) - if err != nil { - logger.Error(err, "Failed to trigger bundledeployment", "error", err) - return - } + logger.V(1).Info("Notifying driftdetect reconciler of a resource change", "triggeredBy", key) + d.driftChan <- event.GenericEvent{Object: bd} } - return d.trigger.OnChange(bdKey, resources.DefaultNamespace, handler, resources.Objects...) -} -func (d *DriftDetect) requeueBD(logger logr.Logger, handleID int, namespace string, name string) error { - bd := &fleet.BundleDeployment{} - - err := d.upstreamReader.Get(context.Background(), client.ObjectKey{Name: name, Namespace: namespace}, bd) - if apierrors.IsNotFound(err) { - logger.Info("Bundledeployment is not found, can't trigger refresh") - return nil - } - if err != nil { - logger.Error(err, "Failed to get bundledeployment, can't trigger refresh") - return nil - } - - logger = logger.WithValues("resourceVersion", bd.ResourceVersion) - logger.V(1).Info("Going to update bundledeployment to trigger re-sync") - - // This mechanism of triggering requeues for changes is not ideal. - // It's a workaround since we can't enqueue directly from the trigger - // mini controller. Triggering via a status update is expensive. - // It's hard to compute a stable hash to make this idempotent, because - // the hash would need to be computed over the whole change. We can't - // just use the resource version of the bundle deployment. We would - // need to look at the deployed resources and compute a hash over them. - // However this status update happens for every changed resource, maybe - // multiple times per resource. It will also trigger on a resync. 
- bd.Status.SyncGeneration = &[]int64{int64(handleID)}[0] - - err = d.upstreamClient.Status().Update(context.Background(), bd) - if err != nil { - logger.V(1).Info("Retry to update bundledeployment, couldn't update status to trigger re-sync", "conflict", apierrors.IsConflict(err), "error", err) - } - return err + // Adding bundledeployment's resource list to the trigger-controller's watch list + return d.trigger.OnChange(bdKey, resources.DefaultNamespace, handler, resources.Objects...) } // allResources returns the resources that are deployed by the bundle deployment, diff --git a/internal/cmd/agent/deployer/normalizers/norm.go b/internal/cmd/agent/deployer/normalizers/norm.go index 54f67825b2..b6a9050825 100644 --- a/internal/cmd/agent/deployer/normalizers/norm.go +++ b/internal/cmd/agent/deployer/normalizers/norm.go @@ -12,8 +12,8 @@ type Norm struct { } func (n Norm) Normalize(un *unstructured.Unstructured) error { - for _, normalizers := range n.normalizers { - if err := normalizers.Normalize(un); err != nil { + for _, normalizer := range n.normalizers { + if err := normalizer.Normalize(un); err != nil { return err } } diff --git a/internal/cmd/agent/operator.go b/internal/cmd/agent/operator.go index c5019d93a5..2eccc335e6 100644 --- a/internal/cmd/agent/operator.go +++ b/internal/cmd/agent/operator.go @@ -28,6 +28,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -85,6 +86,8 @@ func start(ctx context.Context, localConfig *rest.Config, systemNamespace, agent localCtx, cancel := context.WithCancel(ctx) defer cancel() + driftChan := make(chan event.GenericEvent) + reconciler, err := newReconciler( ctx, localCtx, @@ -94,6 +97,7 @@ func start(ctx context.Context, localConfig *rest.Config, systemNamespace, agent fleetNamespace, agentScope, agentConfig, + driftChan, ) if err != nil { setupLog.Error(err, "unable to set up bundledeployment reconciler") @@ -107,6 +111,22 @@ func start(ctx context.Context, localConfig *rest.Config, systemNamespace, agent } //+kubebuilder:scaffold:builder + // RawSource watches for all events from the driftdetect mini controller + driftReconciler := &controller.DriftReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + + Deployer: reconciler.Deployer, + Monitor: reconciler.Monitor, + DriftDetect: reconciler.DriftDetect, + + DriftChan: driftChan, + } + if err = driftReconciler.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "BundleDeployment") + return err + } + if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up health check") return err @@ -167,6 +187,7 @@ func newReconciler( fleetNamespace string, agentScope string, agentConfig config.Config, + driftChan chan event.GenericEvent, ) (*controller.BundleDeploymentReconciler, error) { upstreamClient := mgr.GetClient() @@ -228,12 +249,11 @@ func newReconciler( trigger := trigger.New(ctx, localDynamic, localCluster.GetRESTMapper()) driftdetect := driftdetect.New( trigger, - upstreamClient, - mgr.GetAPIReader(), ds, defaultNamespace, defaultNamespace, agentScope, + driftChan, ) // Build the clean up, which deletes helm releases diff --git a/pkg/durations/durations.go b/pkg/durations/durations.go index 
5b8f0e9886..518bcee5ce 100644 --- a/pkg/durations/durations.go +++ b/pkg/durations/durations.go @@ -26,8 +26,8 @@ const ( RestConfigTimeout = time.Second * 15 ServiceTokenSleep = time.Second * 2 TokenClusterEnqueueDelay = time.Second * 2 - // TriggerSleep is the delay before the mini controller starts watching - // deployed resources for changes + // TriggerSleep is the delay before the driftdetect mini controller + // starts watching deployed resources for changes TriggerSleep = time.Second * 5 DefaultCpuPprofPeriod = time.Minute )
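
For reference, a minimal, self-contained sketch of the channel-source wiring this patch introduces: the driftdetect side pushes event.GenericEvent values onto a channel, and a dedicated reconciler consumes them via source.Channel and WatchesRawSource instead of the old status-update workaround. This is an illustration only, not code from the patch: it uses a ConfigMap as a stand-in for the BundleDeployment, a placeholder Reconcile body, and invented names (driftSketchReconciler, "drift-sketch"); it assumes controller-runtime >= v0.18, where source.Channel takes the channel and an event handler directly, as in the patched SetupWithManager.

// Sketch: channel-driven reconciler wiring (stand-in types, illustrative names).
package main

import (
	"context"
	"os"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

// driftSketchReconciler runs whenever a GenericEvent arrives on driftChan.
type driftSketchReconciler struct {
	client.Client
	driftChan chan event.GenericEvent
}

func (r *driftSketchReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	// The real DriftReconciler re-reads the bundledeployment here, builds the
	// helm plan, updates the status and optionally corrects drift.
	log.FromContext(ctx).Info("drift event received", "object", req.NamespacedName)
	return reconcile.Result{}, nil
}

func (r *driftSketchReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// The channel source turns pushed events directly into reconcile
	// requests; no bundledeployment status update is needed to requeue.
	return ctrl.NewControllerManagedBy(mgr).
		Named("drift-sketch").
		WatchesRawSource(source.Channel(r.driftChan, &handler.EnqueueRequestForObject{})).
		Complete(r)
}

func main() {
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		os.Exit(1)
	}

	driftChan := make(chan event.GenericEvent)
	r := &driftSketchReconciler{Client: mgr.GetClient(), driftChan: driftChan}
	if err := r.SetupWithManager(mgr); err != nil {
		os.Exit(1)
	}

	// Producer side: the trigger/driftdetect watcher enqueues the owning
	// object whenever one of its deployed resources changes.
	go func() {
		cm := &corev1.ConfigMap{}
		cm.Name, cm.Namespace = "example", "default"
		driftChan <- event.GenericEvent{Object: cm}
	}()

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		os.Exit(1)
	}
}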
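
The final status write in the new DriftReconciler has to tolerate conflicts with the install reconciler, which updates the same bundledeployment. The helper below is a sketch of that fetch-latest-then-write pattern using client-go's retry.DefaultRetry (the patch itself passes a DefaultRetry backoff presumably defined elsewhere in the controller package) and an apps/v1 Deployment as a stand-in object; the function name and object type are illustrative, not from the patch.

package drift

import (
	"context"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// updateStatusWithRetry re-reads the object on every attempt so a conflicting
// write from another reconciler only costs a retry, not a lost update.
func updateStatusWithRetry(ctx context.Context, c client.Client, key types.NamespacedName, status appsv1.DeploymentStatus) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		latest := &appsv1.Deployment{}
		if err := c.Get(ctx, key, latest); err != nil {
			return err
		}
		latest.Status = status
		return c.Status().Update(ctx, latest)
	})
}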