Remove bd update workaround from drift detection (#2942)
This adds a new reconciler, which reacts to a channel source, to remove
the bd update workaround. The driftdetect mini controller previously
signaled changes to the controller-runtime based reconciler by updating
the bundledeployment's status.
The bundledeployment reconciler is now split into two: one reconciler
installs bundles, the other handles drift.
Both can update the status of the bundledeployment resource with the
current state of the deployment.
manno authored Oct 18, 2024
1 parent 440751b commit 157d265
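For context, the wiring this commit introduces follows the pattern sketched below. This is a minimal illustration assuming controller-runtime v0.19; the helper name wireDriftChannel and its signature are hypothetical, while the source.Channel and WatchesRawSource calls mirror the diff. The driftdetect handler sends a GenericEvent on a shared channel, and the new drift reconciler consumes that channel as a raw source, so each event enqueues a reconcile request directly instead of going through a bundledeployment status update.

package controller

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

// wireDriftChannel (hypothetical helper, not part of the commit) registers a
// drift reconciler against a channel source and returns the channel that
// producers send drift events on.
func wireDriftChannel(mgr ctrl.Manager, r *DriftReconciler) (chan event.GenericEvent, error) {
	driftChan := make(chan event.GenericEvent)

	// Every event sent on driftChan enqueues a reconcile request for the
	// object it carries, via EnqueueRequestForObject.
	err := ctrl.NewControllerManagedBy(mgr).
		Named("drift-reconciler").
		WatchesRawSource(source.Channel(driftChan, &handler.EnqueueRequestForObject{})).
		Complete(r)

	return driftChan, err
}

// The producer side (the driftdetect trigger handler in this commit) then
// signals drift for a BundleDeployment bd simply with:
//
//	driftChan <- event.GenericEvent{Object: bd}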
Showing 7 changed files with 195 additions and 90 deletions.
27 changes: 23 additions & 4 deletions integrationtests/agent/suite_test.go
@@ -3,6 +3,7 @@ package agent_test
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
@@ -31,6 +32,7 @@ import (

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -74,9 +76,11 @@ var _ = BeforeSuite(func() {
SetDefaultEventuallyTimeout(timeout)

ctx, cancel = context.WithCancel(context.TODO())
existing := os.Getenv("CI_USE_EXISTING_CLUSTER") == "true"
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "charts", "fleet-crd", "templates", "crds.yaml")},
ErrorIfCRDPathMissing: true,
UseExistingCluster: &existing,
}

var err error
@@ -103,12 +107,28 @@ var _ = BeforeSuite(func() {
})
Expect(err).ToNot(HaveOccurred())

driftChan := make(chan event.GenericEvent)

// Set up the bundledeployment reconciler
Expect(k8sClient.Create(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: clusterNS}})).ToNot(HaveOccurred())
reconciler := newReconciler(ctx, k8sManager, newLookup(resources))
reconciler := newReconciler(ctx, k8sManager, newLookup(resources), driftChan)
err = reconciler.SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred(), "failed to set up manager")

// Set up the driftdetect reconciler
driftReconciler := &controller.DriftReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),

Deployer: reconciler.Deployer,
Monitor: reconciler.Monitor,
DriftDetect: reconciler.DriftDetect,

DriftChan: driftChan,
}
err = driftReconciler.SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred(), "failed to set up manager")

go func() {
defer GinkgoRecover()
err = k8sManager.Start(ctx)
@@ -124,7 +144,7 @@ var _ = AfterSuite(func() {
// newReconciler creates a new BundleDeploymentReconciler that will watch for changes
// in the test Fleet namespace, using configuration from the provided manager.
// Resources are provided by the lookup parameter.
func newReconciler(ctx context.Context, mgr manager.Manager, lookup *lookup) *controller.BundleDeploymentReconciler {
func newReconciler(ctx context.Context, mgr manager.Manager, lookup *lookup, driftChan chan event.GenericEvent) *controller.BundleDeploymentReconciler {
upstreamClient := mgr.GetClient()
// re-use client, since this is a single cluster test
localClient := upstreamClient
@@ -174,12 +194,11 @@ func newReconciler(ctx context.Context, mgr manager.Manager, lookup *lookup) *co
trigger := trigger.New(ctx, localDynamic, mgr.GetRESTMapper())
driftdetect := driftdetect.New(
trigger,
upstreamClient,
mgr.GetAPIReader(),
dsClient,
defaultNamespace,
defaultNamespace,
agentScope,
driftChan,
)

// Build the clean up
27 changes: 7 additions & 20 deletions internal/cmd/agent/controller/bundledeployment_controller.go
@@ -85,15 +85,13 @@ func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// The Reconcile function compares the state specified by
// the BundleDeployment object against the actual cluster state, and then
// performs operations to make the cluster state reflect the state specified by
// the user.
// Reconcile compares the state specified by the BundleDeployment object
// against the actual state, and decides if the bundle should be deployed.
// The deployed resources are then monitored for drift.
// It also updates the status of the BundleDeployment object with the results.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.0/pkg/reconcile
func (r *BundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithName("bundledeployment")
ctx = log.IntoContext(ctx, logger)
@@ -149,7 +147,7 @@ func (r *BundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
}

if monitor.ShouldUpdateStatus(bd) {
// update the bundledeployment status and check if we deploy an agent, or if we need to trigger drift correction
// update the bundledeployment status and check if we deploy an agent
status, err = r.Monitor.UpdateStatus(ctx, bd, resources)
if err != nil {
logger.Error(err, "Cannot monitor deployed bundle")
@@ -162,17 +160,6 @@ func (r *BundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
bd.Status = setCondition(status, nil, monitor.Cond(fleetv1.BundleDeploymentConditionMonitored))
}

// Run drift correction
if len(status.ModifiedStatus) > 0 && bd.Spec.CorrectDrift != nil && bd.Spec.CorrectDrift.Enabled {
if release, err := r.Deployer.RemoveExternalChanges(ctx, bd); err != nil {
merr = append(merr, fmt.Errorf("failed reconciling drift: %w", err))
// Propagate drift correction error to bundle deployment status.
monitor.Cond(fleetv1.BundleDeploymentConditionReady).SetError(&status, "", err)
} else {
bd.Status.Release = release
}
}

if len(bd.Status.ModifiedStatus) > 0 && monitor.ShouldRedeployAgent(bd) {
bd.Status.AppliedDeploymentID = ""
if err := r.Cleanup.OldAgent(ctx, status.ModifiedStatus); err != nil {
@@ -181,7 +168,7 @@ func (r *BundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
}

// update our mini controller, which watches deployed resources for drift
// update our driftdetect mini controller, which watches deployed resources for drift
err = r.DriftDetect.Refresh(ctx, req.String(), bd, resources)
if err != nil {
logger.V(1).Error(err, "Failed to refresh drift detection", "step", "drift")
129 changes: 129 additions & 0 deletions internal/cmd/agent/controller/drift_controller.go
@@ -0,0 +1,129 @@
package controller

import (
"context"
"fmt"

"github.com/rancher/fleet/internal/cmd/agent/deployer"
"github.com/rancher/fleet/internal/cmd/agent/deployer/driftdetect"
"github.com/rancher/fleet/internal/cmd/agent/deployer/monitor"
fleetv1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"

"github.com/rancher/wrangler/v3/pkg/condition"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
errutil "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"
)

type DriftReconciler struct {
client.Client
Scheme *runtime.Scheme

Deployer *deployer.Deployer
Monitor *monitor.Monitor
DriftDetect *driftdetect.DriftDetect

DriftChan chan event.GenericEvent
}

// SetupWithManager sets up the controller with the Manager.
func (r *DriftReconciler) SetupWithManager(mgr ctrl.Manager) error {
src := source.Channel(r.DriftChan, &handler.EnqueueRequestForObject{})
return ctrl.NewControllerManagedBy(mgr).
Named("drift-reconciler").
WatchesRawSource(src).
Complete(r)

}

// Reconcile is triggered via a channel from the driftdetect mini controller,
// which watches deployed resources for drift. It does so by creating a plan
// and comparing it to the current state.
// It will update the status of the BundleDeployment and correct drift if enabled.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.0/pkg/reconcile
func (r *DriftReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithName("drift")
ctx = log.IntoContext(ctx, logger)

// get latest BundleDeployment from cluster
bd := &fleetv1.BundleDeployment{}
err := r.Get(ctx, req.NamespacedName, bd)
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
} else if err != nil {
return ctrl.Result{}, err
}

if bd.Spec.Paused {
logger.V(1).Info("Bundle paused, clearing drift detection")
err := r.DriftDetect.Clear(req.String())

return ctrl.Result{}, err
}

merr := []error{}

// retrieve the resources from the helm history.
// if we can't retrieve the resources, we don't need to try any of the other operations and requeue now
resources, err := r.Deployer.Resources(bd.Name, bd.Status.Release)
if err != nil {
logger.V(1).Info("Failed to retrieve bundledeployment's resources")
return ctrl.Result{}, err
}

// return early if the bundledeployment is still being installed
if !monitor.ShouldUpdateStatus(bd) {
return ctrl.Result{}, nil
}

// update the bundledeployment status from the helm resource list
bd.Status, err = r.Monitor.UpdateStatus(ctx, bd, resources)
if err != nil {
logger.Error(err, "Cannot monitor deployed bundle")
}

// run drift correction
if len(bd.Status.ModifiedStatus) > 0 && bd.Spec.CorrectDrift != nil && bd.Spec.CorrectDrift.Enabled {
if release, err := r.Deployer.RemoveExternalChanges(ctx, bd); err != nil {
merr = append(merr, fmt.Errorf("failed reconciling drift: %w", err))
// Propagate drift correction error to bundle deployment status.
condition.Cond(fleetv1.BundleDeploymentConditionReady).SetError(&bd.Status, "", err)
} else {
bd.Status.Release = release
}
}

// final status update
logger.V(1).Info("Reconcile finished, updating the bundledeployment status")
err = r.updateStatus(ctx, req.NamespacedName, bd.Status)
if apierrors.IsNotFound(err) {
merr = append(merr, fmt.Errorf("bundledeployment has been deleted: %w", err))
} else if err != nil {
merr = append(merr, fmt.Errorf("failed final update to bundledeployment status: %w", err))
}

return ctrl.Result{}, errutil.NewAggregate(merr)
}

func (r *DriftReconciler) updateStatus(ctx context.Context, req types.NamespacedName, status fleetv1.BundleDeploymentStatus) error {
return retry.RetryOnConflict(DefaultRetry, func() error {
newBD := &fleetv1.BundleDeployment{}
err := r.Get(ctx, req, newBD)
if err != nil {
return err
}
newBD.Status = status
return r.Status().Update(ctx, newBD)
})
}
70 changes: 10 additions & 60 deletions internal/cmd/agent/deployer/driftdetect/driftdetect.go
@@ -3,50 +3,43 @@ package driftdetect
import (
"context"

"github.com/go-logr/logr"

"github.com/rancher/fleet/internal/cmd/agent/deployer/desiredset"
"github.com/rancher/fleet/internal/cmd/agent/trigger"
"github.com/rancher/fleet/internal/helmdeployer"
fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
)

type DriftDetect struct {
// Trigger watches deployed resources on the local cluster.
trigger *trigger.Trigger

upstreamClient client.Client
upstreamReader client.Reader

desiredset *desiredset.Client
defaultNamespace string
labelPrefix string
labelSuffix string

driftChan chan event.GenericEvent
}

func New(
trigger *trigger.Trigger,
upstreamClient client.Client,
upstreamReader client.Reader,
desiredset *desiredset.Client,
defaultNamespace string,
labelPrefix string,
labelSuffix string,
driftChan chan event.GenericEvent,
) *DriftDetect {
return &DriftDetect{
trigger: trigger,
upstreamClient: upstreamClient,
upstreamReader: upstreamReader,
desiredset: desiredset,
defaultNamespace: defaultNamespace,
labelPrefix: labelPrefix,
labelSuffix: labelSuffix,
driftChan: driftChan,
}
}

@@ -56,7 +49,7 @@ func (d *DriftDetect) Clear(bdKey string) error {

// Refresh triggers a sync of all resources of the provided bd which may have drifted from their desired state.
func (d *DriftDetect) Refresh(ctx context.Context, bdKey string, bd *fleet.BundleDeployment, resources *helmdeployer.Resources) error {
logger := log.FromContext(ctx).WithName("drift-detect")
logger := log.FromContext(ctx).WithName("drift-detect").WithValues("initialResourceVersion", bd.ResourceVersion)
logger.V(1).Info("Refreshing drift detection")

resources, err := d.allResources(ctx, bd, resources)
@@ -68,57 +61,14 @@ func (d *DriftDetect) Refresh(ctx context.Context, bdKey string, bd *fleet.Bundl
return nil
}

logger.V(1).Info("Adding OnChange for bundledeployment's resource list")
logger = logger.WithValues("key", bdKey, "initialResourceVersion", bd.ResourceVersion)

handleID := int(bd.Generation)
handler := func(key string) {
logger := logger.WithValues("handleID", handleID, "triggered by", key)
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Can't enqueue directly, update bundledeployment instead
return d.requeueBD(logger, handleID, bd.Namespace, bd.Name)
})
if err != nil {
logger.Error(err, "Failed to trigger bundledeployment", "error", err)
return
}
logger.V(1).Info("Notifying driftdetect reconciler of a resource change", "triggeredBy", key)
d.driftChan <- event.GenericEvent{Object: bd}

}
return d.trigger.OnChange(bdKey, resources.DefaultNamespace, handler, resources.Objects...)
}

func (d *DriftDetect) requeueBD(logger logr.Logger, handleID int, namespace string, name string) error {
bd := &fleet.BundleDeployment{}

err := d.upstreamReader.Get(context.Background(), client.ObjectKey{Name: name, Namespace: namespace}, bd)
if apierrors.IsNotFound(err) {
logger.Info("Bundledeployment is not found, can't trigger refresh")
return nil
}
if err != nil {
logger.Error(err, "Failed to get bundledeployment, can't trigger refresh")
return nil
}

logger = logger.WithValues("resourceVersion", bd.ResourceVersion)
logger.V(1).Info("Going to update bundledeployment to trigger re-sync")

// This mechanism of triggering requeues for changes is not ideal.
// It's a workaround since we can't enqueue directly from the trigger
// mini controller. Triggering via a status update is expensive.
// It's hard to compute a stable hash to make this idempotent, because
// the hash would need to be computed over the whole change. We can't
// just use the resource version of the bundle deployment. We would
// need to look at the deployed resources and compute a hash over them.
// However this status update happens for every changed resource, maybe
// multiple times per resource. It will also trigger on a resync.
bd.Status.SyncGeneration = &[]int64{int64(handleID)}[0]

err = d.upstreamClient.Status().Update(context.Background(), bd)
if err != nil {
logger.V(1).Info("Retry to update bundledeployment, couldn't update status to trigger re-sync", "conflict", apierrors.IsConflict(err), "error", err)
}
return err
// Adding bundledeployment's resource list to the trigger-controller's watch list
return d.trigger.OnChange(bdKey, resources.DefaultNamespace, handler, resources.Objects...)
}

// allResources returns the resources that are deployed by the bundle deployment,