Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor reconcilers and remove foldin func #2553

Merged
merged 2 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/bundlereader/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sigs.k8s.io/yaml"
)

// Options include the GitRepo overrides, which are passed via command line args
type Options struct {
Compress bool
Labels map[string]string
Expand Down
56 changes: 28 additions & 28 deletions internal/cmd/agent/controller/bundledeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,34 @@ var DefaultRetry = wait.Backoff{
Jitter: 0.1,
}

// SetupWithManager sets up the controller with the Manager.
func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleetv1.BundleDeployment{}).
WithEventFilter(
// we do not trigger for status changes
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.LabelChangedPredicate{},
predicate.Funcs{
// except for changes to status.Refresh
UpdateFunc: func(e event.UpdateEvent) bool {
n := e.ObjectNew.(*fleetv1.BundleDeployment)
o := e.ObjectOld.(*fleetv1.BundleDeployment)
if n == nil || o == nil {
return false
}
return n.Status.SyncGeneration != o.Status.SyncGeneration
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
},
)).
Complete(r)
}

//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/finalizers,verbs=update
Expand Down Expand Up @@ -186,34 +214,6 @@ func (r *BundleDeploymentReconciler) updateStatus(ctx context.Context, req types
})
}

// SetupWithManager sets up the controller with the Manager.
func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleetv1.BundleDeployment{}).
WithEventFilter(
// we do not trigger for status changes
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.LabelChangedPredicate{},
predicate.Funcs{
// except for changes to status.Refresh
UpdateFunc: func(e event.UpdateEvent) bool {
n := e.ObjectNew.(*fleetv1.BundleDeployment)
o := e.ObjectOld.(*fleetv1.BundleDeployment)
if n == nil || o == nil {
return false
}
return n.Status.SyncGeneration != o.Status.SyncGeneration
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
},
)).
Complete(r)
}

// setCondition sets the condition and updates the timestamp, if the condition changed
func setCondition(newStatus fleetv1.BundleDeploymentStatus, err error, cond condition.Cond) fleetv1.BundleDeploymentStatus {
cond.SetError(&newStatus, "", ignoreConflict(err))
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/controller/options/calculate.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package options merges the BundleDeploymentOptions
// Package options merges the BundleDeploymentOptions, so that targetCustomizations take effect.
package options

import (
Expand Down
114 changes: 57 additions & 57 deletions internal/cmd/controller/reconciler/bundle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,63 @@ type BundleReconciler struct {
Workers int
}

// SetupWithManager sets up the controller with the Manager.
func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleet.Bundle{}).
// Note: Maybe improve with WatchesMetadata, does it have access to labels?
Watches(
// Fan out from bundledeployment to bundle
&fleet.BundleDeployment{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []ctrl.Request {
bd := a.(*fleet.BundleDeployment)
labels := bd.GetLabels()
if labels == nil {
return nil
}

ns, name := target.BundleFromDeployment(labels)
if ns != "" && name != "" {
return []ctrl.Request{{
NamespacedName: types.NamespacedName{
Namespace: ns,
Name: name,
},
}}
}

return nil
}),
builder.WithPredicates(bundleDeploymentStatusChangedPredicate()),
).
Watches(
// Fan out from cluster to bundle
&fleet.Cluster{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []ctrl.Request {
cluster := a.(*fleet.Cluster)
bundlesToRefresh, _, err := r.Query.BundlesForCluster(ctx, cluster)
if err != nil {
return nil
}
requests := []ctrl.Request{}
for _, bundle := range bundlesToRefresh {
requests = append(requests, ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: bundle.Namespace,
Name: bundle.Name,
},
})
}

return requests
}),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
).
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}).
Complete(r)
}

//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundles,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundles/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundles/finalizers,verbs=update
Expand Down Expand Up @@ -238,60 +295,3 @@ func upper(op controllerutil.OperationResult) string {
return "Unknown"
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleet.Bundle{}).
// Note: Maybe improve with WatchesMetadata, does it have access to labels?
Watches(
// Fan out from bundledeployment to bundle
&fleet.BundleDeployment{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []ctrl.Request {
bd := a.(*fleet.BundleDeployment)
labels := bd.GetLabels()
if labels == nil {
return nil
}

ns, name := target.BundleFromDeployment(labels)
if ns != "" && name != "" {
return []ctrl.Request{{
NamespacedName: types.NamespacedName{
Namespace: ns,
Name: name,
},
}}
}

return nil
}),
builder.WithPredicates(bundleDeploymentStatusChangedPredicate()),
).
Watches(
// Fan out from cluster to bundle
&fleet.Cluster{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []ctrl.Request {
cluster := a.(*fleet.Cluster)
bundlesToRefresh, _, err := r.Query.BundlesForCluster(ctx, cluster)
if err != nil {
return nil
}
requests := []ctrl.Request{}
for _, bundle := range bundlesToRefresh {
requests = append(requests, ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: bundle.Namespace,
Name: bundle.Name,
},
})
}

return requests
}),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
).
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}).
Complete(r)
}
22 changes: 11 additions & 11 deletions internal/cmd/controller/reconciler/bundledeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ type BundleDeploymentReconciler struct {
Workers int
}

// SetupWithManager sets up the controller with the Manager.
func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleet.BundleDeployment{}, builder.WithPredicates(
bundleDeploymentStatusChangedPredicate(),
)).
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}).
Complete(r)
}

//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/finalizers,verbs=update
Expand Down Expand Up @@ -133,17 +144,6 @@ func bundleDeploymentStatusChangedPredicate() predicate.Funcs {
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleet.BundleDeployment{}, builder.WithPredicates(
bundleDeploymentStatusChangedPredicate(),
)).
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}).
Complete(r)
}

func conditionToMessage(cond genericcondition.GenericCondition) string {
if cond.Reason == "Error" {
return "Error: " + cond.Message
Expand Down
88 changes: 44 additions & 44 deletions internal/cmd/controller/reconciler/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,50 @@ type ClusterReconciler struct {
ShardID string
}

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleet.Cluster{}).
// Watch bundledeployments so we can update the status fields
Watches(
&fleet.BundleDeployment{},
handler.EnqueueRequestsFromMapFunc(r.mapBundleDeploymentToCluster),
builder.WithPredicates(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return true
},
// Triggering on every update would run into an
// endless loop with the agentmanagement
// cluster controller.
// We still need to update often enough to keep the
// status fields up to date.
UpdateFunc: func(e event.UpdateEvent) bool {
n := e.ObjectNew.(*fleet.BundleDeployment)
o := e.ObjectOld.(*fleet.BundleDeployment)
if n == nil || o == nil {
return false
}
if !reflect.DeepEqual(n.Spec, o.Spec) {
return true
}
if n.Status.AppliedDeploymentID != o.Status.AppliedDeploymentID {
return true
}
return false
},
DeleteFunc: func(e event.DeleteEvent) bool {
o := e.Object.(*fleet.BundleDeployment)
if o == nil || o.Status.AppliedDeploymentID == "" {
return false
}
return true
},
}),
).
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
Complete(r)
}

//+kubebuilder:rbac:groups=fleet.cattle.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=clusters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=fleet.cattle.io,resources=clusters/finalizers,verbs=update
Expand Down Expand Up @@ -232,50 +276,6 @@ func (r *ClusterReconciler) updateStatus(ctx context.Context, req types.Namespac
})
}

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&fleet.Cluster{}).
// Watch bundledeployments so we can update the status fields
Watches(
&fleet.BundleDeployment{},
handler.EnqueueRequestsFromMapFunc(r.mapBundleDeploymentToCluster),
builder.WithPredicates(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return true
},
// Triggering on every update would run into an
// endless loop with the agentmanagement
// cluster controller.
// We still need to update often enough to keep the
// status fields up to date.
UpdateFunc: func(e event.UpdateEvent) bool {
n := e.ObjectNew.(*fleet.BundleDeployment)
o := e.ObjectOld.(*fleet.BundleDeployment)
if n == nil || o == nil {
return false
}
if !reflect.DeepEqual(n.Spec, o.Spec) {
return true
}
if n.Status.AppliedDeploymentID != o.Status.AppliedDeploymentID {
return true
}
return false
},
DeleteFunc: func(e event.DeleteEvent) bool {
o := e.Object.(*fleet.BundleDeployment)
if o == nil || o.Status.AppliedDeploymentID == "" {
return false
}
return true
},
}),
).
WithEventFilter(sharding.FilterByShardID(r.ShardID)).
Complete(r)
}

func (r *ClusterReconciler) mapBundleDeploymentToCluster(ctx context.Context, a client.Object) []ctrl.Request {
clusterNS := &corev1.Namespace{}
err := r.Get(ctx, types.NamespacedName{Name: a.GetNamespace()}, clusterNS)
Expand Down
Loading
Loading