Skip to content

Commit

Permalink
Merge pull request #2553 from rancher/refactor-reconcilers-and-remove…
Browse files Browse the repository at this point in the history
…-foldin-func

Refactor reconcilers and remove foldin func
  • Loading branch information
manno authored Jun 26, 2024
2 parents f9450f6 + fc16ec1 commit c220a0d
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 232 deletions.
1 change: 1 addition & 0 deletions internal/bundlereader/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sigs.k8s.io/yaml"
)

// Options include the GitRepo overrides, which are passed via command line args
type Options struct {
Compress bool
Labels map[string]string
Expand Down
56 changes: 28 additions & 28 deletions internal/cmd/agent/controller/bundledeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,34 @@ var DefaultRetry = wait.Backoff{
Jitter: 0.1,
}

// SetupWithManager sets up the controller with the Manager.
func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleetv1.BundleDeployment{}).
WithEventFilter(
// we do not trigger for status changes
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.LabelChangedPredicate{},
predicate.Funcs{
// except for changes to status.Refresh
UpdateFunc: func(e event.UpdateEvent) bool {
n := e.ObjectNew.(*fleetv1.BundleDeployment)
o := e.ObjectOld.(*fleetv1.BundleDeployment)
if n == nil || o == nil {
return false
}
return n.Status.SyncGeneration != o.Status.SyncGeneration
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
},
)).
Complete(r)
}

//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/finalizers,verbs=update
Expand Down Expand Up @@ -186,34 +214,6 @@ func (r *BundleDeploymentReconciler) updateStatus(ctx context.Context, req types
})
}

// SetupWithManager sets up the controller with the Manager.
func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleetv1.BundleDeployment{}).
WithEventFilter(
// we do not trigger for status changes
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.LabelChangedPredicate{},
predicate.Funcs{
// except for changes to status.Refresh
UpdateFunc: func(e event.UpdateEvent) bool {
n := e.ObjectNew.(*fleetv1.BundleDeployment)
o := e.ObjectOld.(*fleetv1.BundleDeployment)
if n == nil || o == nil {
return false
}
return n.Status.SyncGeneration != o.Status.SyncGeneration
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
},
)).
Complete(r)
}

// setCondition sets the condition and updates the timestamp, if the condition changed
func setCondition(newStatus fleetv1.BundleDeploymentStatus, err error, cond condition.Cond) fleetv1.BundleDeploymentStatus {
cond.SetError(&newStatus, "", ignoreConflict(err))
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/controller/options/calculate.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package options merges the BundleDeploymentOptions
// Package options merges the BundleDeploymentOptions, so that targetCustomizations take effect.
package options

import (
Expand Down
114 changes: 57 additions & 57 deletions internal/cmd/controller/reconciler/bundle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,63 @@ type BundleReconciler struct {
Workers int
}

// SetupWithManager sets up the controller with the Manager.
func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleet.Bundle{}).
// Note: Maybe improve with WatchesMetadata, does it have access to labels?
Watches(
// Fan out from bundledeployment to bundle
&fleet.BundleDeployment{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []ctrl.Request {
bd := a.(*fleet.BundleDeployment)
labels := bd.GetLabels()
if labels == nil {
return nil
}

ns, name := target.BundleFromDeployment(labels)
if ns != "" && name != "" {
return []ctrl.Request{{
NamespacedName: types.NamespacedName{
Namespace: ns,
Name: name,
},
}}
}

return nil
}),
builder.WithPredicates(bundleDeploymentStatusChangedPredicate()),
).
Watches(
// Fan out from cluster to bundle
&fleet.Cluster{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []ctrl.Request {
cluster := a.(*fleet.Cluster)
bundlesToRefresh, _, err := r.Query.BundlesForCluster(ctx, cluster)
if err != nil {
return nil
}
requests := []ctrl.Request{}
for _, bundle := range bundlesToRefresh {
requests = append(requests, ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: bundle.Namespace,
Name: bundle.Name,
},
})
}

return requests
}),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
).
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}).
Complete(r)
}

//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundles,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundles/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundles/finalizers,verbs=update
Expand Down Expand Up @@ -238,60 +295,3 @@ func upper(op controllerutil.OperationResult) string {
return "Unknown"
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleet.Bundle{}).
// Note: Maybe improve with WatchesMetadata, does it have access to labels?
Watches(
// Fan out from bundledeployment to bundle
&fleet.BundleDeployment{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []ctrl.Request {
bd := a.(*fleet.BundleDeployment)
labels := bd.GetLabels()
if labels == nil {
return nil
}

ns, name := target.BundleFromDeployment(labels)
if ns != "" && name != "" {
return []ctrl.Request{{
NamespacedName: types.NamespacedName{
Namespace: ns,
Name: name,
},
}}
}

return nil
}),
builder.WithPredicates(bundleDeploymentStatusChangedPredicate()),
).
Watches(
// Fan out from cluster to bundle
&fleet.Cluster{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []ctrl.Request {
cluster := a.(*fleet.Cluster)
bundlesToRefresh, _, err := r.Query.BundlesForCluster(ctx, cluster)
if err != nil {
return nil
}
requests := []ctrl.Request{}
for _, bundle := range bundlesToRefresh {
requests = append(requests, ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: bundle.Namespace,
Name: bundle.Name,
},
})
}

return requests
}),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
).
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}).
Complete(r)
}
22 changes: 11 additions & 11 deletions internal/cmd/controller/reconciler/bundledeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ type BundleDeploymentReconciler struct {
Workers int
}

// SetupWithManager sets up the controller with the Manager.
func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleet.BundleDeployment{}, builder.WithPredicates(
bundleDeploymentStatusChangedPredicate(),
)).
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}).
Complete(r)
}

//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/finalizers,verbs=update
Expand Down Expand Up @@ -133,17 +144,6 @@ func bundleDeploymentStatusChangedPredicate() predicate.Funcs {
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleet.BundleDeployment{}, builder.WithPredicates(
bundleDeploymentStatusChangedPredicate(),
)).
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}).
Complete(r)
}

func conditionToMessage(cond genericcondition.GenericCondition) string {
if cond.Reason == "Error" {
return "Error: " + cond.Message
Expand Down
88 changes: 44 additions & 44 deletions internal/cmd/controller/reconciler/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,50 @@ type ClusterReconciler struct {
ShardID string
}

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleet.Cluster{}).
// Watch bundledeployments so we can update the status fields
Watches(
&fleet.BundleDeployment{},
handler.EnqueueRequestsFromMapFunc(r.mapBundleDeploymentToCluster),
builder.WithPredicates(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return true
},
// Triggering on every update would run into an
// endless loop with the agentmanagement
// cluster controller.
// We still need to update often enough to keep the
// status fields up to date.
UpdateFunc: func(e event.UpdateEvent) bool {
n := e.ObjectNew.(*fleet.BundleDeployment)
o := e.ObjectOld.(*fleet.BundleDeployment)
if n == nil || o == nil {
return false
}
if !reflect.DeepEqual(n.Spec, o.Spec) {
return true
}
if n.Status.AppliedDeploymentID != o.Status.AppliedDeploymentID {
return true
}
return false
},
DeleteFunc: func(e event.DeleteEvent) bool {
o := e.Object.(*fleet.BundleDeployment)
if o == nil || o.Status.AppliedDeploymentID == "" {
return false
}
return true
},
}),
).
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
Complete(r)
}

//+kubebuilder:rbac:groups=fleet.cattle.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=clusters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=clusters/finalizers,verbs=update
Expand Down Expand Up @@ -232,50 +276,6 @@ func (r *ClusterReconciler) updateStatus(ctx context.Context, req types.Namespac
})
}

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleet.Cluster{}).
// Watch bundledeployments so we can update the status fields
Watches(
&fleet.BundleDeployment{},
handler.EnqueueRequestsFromMapFunc(r.mapBundleDeploymentToCluster),
builder.WithPredicates(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return true
},
// Triggering on every update would run into an
// endless loop with the agentmanagement
// cluster controller.
// We still need to update often enough to keep the
// status fields up to date.
UpdateFunc: func(e event.UpdateEvent) bool {
n := e.ObjectNew.(*fleet.BundleDeployment)
o := e.ObjectOld.(*fleet.BundleDeployment)
if n == nil || o == nil {
return false
}
if !reflect.DeepEqual(n.Spec, o.Spec) {
return true
}
if n.Status.AppliedDeploymentID != o.Status.AppliedDeploymentID {
return true
}
return false
},
DeleteFunc: func(e event.DeleteEvent) bool {
o := e.Object.(*fleet.BundleDeployment)
if o == nil || o.Status.AppliedDeploymentID == "" {
return false
}
return true
},
}),
).
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
Complete(r)
}

func (r *ClusterReconciler) mapBundleDeploymentToCluster(ctx context.Context, a client.Object) []ctrl.Request {
clusterNS := &corev1.Namespace{}
err := r.Get(ctx, types.NamespacedName{Name: a.GetNamespace()}, clusterNS)
Expand Down
Loading

0 comments on commit c220a0d

Please sign in to comment.