diff --git a/internal/bundlereader/read.go b/internal/bundlereader/read.go index 4aaa2550d5..f5f1f09d54 100644 --- a/internal/bundlereader/read.go +++ b/internal/bundlereader/read.go @@ -23,6 +23,7 @@ import ( "sigs.k8s.io/yaml" ) +// Options include the GitRepo overrides, which are passed via command line args type Options struct { Compress bool Labels map[string]string diff --git a/internal/cmd/agent/controller/bundledeployment_controller.go b/internal/cmd/agent/controller/bundledeployment_controller.go index 70c6448231..9821eef305 100644 --- a/internal/cmd/agent/controller/bundledeployment_controller.go +++ b/internal/cmd/agent/controller/bundledeployment_controller.go @@ -53,6 +53,34 @@ var DefaultRetry = wait.Backoff{ Jitter: 0.1, } +// SetupWithManager sets up the controller with the Manager. +func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&fleetv1.BundleDeployment{}). + WithEventFilter( + // we do not trigger for status changes + predicate.Or( + predicate.GenerationChangedPredicate{}, + predicate.AnnotationChangedPredicate{}, + predicate.LabelChangedPredicate{}, + predicate.Funcs{ + // except for changes to status.Refresh + UpdateFunc: func(e event.UpdateEvent) bool { + n := e.ObjectNew.(*fleetv1.BundleDeployment) + o := e.ObjectOld.(*fleetv1.BundleDeployment) + if n == nil || o == nil { + return false + } + return n.Status.SyncGeneration != o.Status.SyncGeneration + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return true + }, + }, + )). + Complete(r) +} + //+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/status,verbs=get;update;patch //+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/finalizers,verbs=update @@ -186,34 +214,6 @@ func (r *BundleDeploymentReconciler) updateStatus(ctx context.Context, req types }) } -// SetupWithManager sets up the controller with the Manager. -func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&fleetv1.BundleDeployment{}). - WithEventFilter( - // we do not trigger for status changes - predicate.Or( - predicate.GenerationChangedPredicate{}, - predicate.AnnotationChangedPredicate{}, - predicate.LabelChangedPredicate{}, - predicate.Funcs{ - // except for changes to status.Refresh - UpdateFunc: func(e event.UpdateEvent) bool { - n := e.ObjectNew.(*fleetv1.BundleDeployment) - o := e.ObjectOld.(*fleetv1.BundleDeployment) - if n == nil || o == nil { - return false - } - return n.Status.SyncGeneration != o.Status.SyncGeneration - }, - DeleteFunc: func(e event.DeleteEvent) bool { - return true - }, - }, - )). - Complete(r) -} - // setCondition sets the condition and updates the timestamp, if the condition changed func setCondition(newStatus fleetv1.BundleDeploymentStatus, err error, cond condition.Cond) fleetv1.BundleDeploymentStatus { cond.SetError(&newStatus, "", ignoreConflict(err)) diff --git a/internal/cmd/controller/options/calculate.go b/internal/cmd/controller/options/calculate.go index eae898a5c2..87927db53d 100644 --- a/internal/cmd/controller/options/calculate.go +++ b/internal/cmd/controller/options/calculate.go @@ -1,4 +1,4 @@ -// Package options merges the BundleDeploymentOptions +// Package options merges the BundleDeploymentOptions, so that targetCustomizations take effect. package options import ( diff --git a/internal/cmd/controller/reconciler/bundle_controller.go b/internal/cmd/controller/reconciler/bundle_controller.go index c9bd83764c..9882ef8bd4 100644 --- a/internal/cmd/controller/reconciler/bundle_controller.go +++ b/internal/cmd/controller/reconciler/bundle_controller.go @@ -55,6 +55,63 @@ type BundleReconciler struct { Workers int } +// SetupWithManager sets up the controller with the Manager. +func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&fleet.Bundle{}). + // Note: Maybe improve with WatchesMetadata, does it have access to labels? + Watches( + // Fan out from bundledeployment to bundle + &fleet.BundleDeployment{}, + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []ctrl.Request { + bd := a.(*fleet.BundleDeployment) + labels := bd.GetLabels() + if labels == nil { + return nil + } + + ns, name := target.BundleFromDeployment(labels) + if ns != "" && name != "" { + return []ctrl.Request{{ + NamespacedName: types.NamespacedName{ + Namespace: ns, + Name: name, + }, + }} + } + + return nil + }), + builder.WithPredicates(bundleDeploymentStatusChangedPredicate()), + ). + Watches( + // Fan out from cluster to bundle + &fleet.Cluster{}, + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []ctrl.Request { + cluster := a.(*fleet.Cluster) + bundlesToRefresh, _, err := r.Query.BundlesForCluster(ctx, cluster) + if err != nil { + return nil + } + requests := []ctrl.Request{} + for _, bundle := range bundlesToRefresh { + requests = append(requests, ctrl.Request{ + NamespacedName: types.NamespacedName{ + Namespace: bundle.Namespace, + Name: bundle.Name, + }, + }) + } + + return requests + }), + builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), + ). + WithEventFilter(sharding.FilterByShardID(r.ShardID)). + WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}). + Complete(r) +} + //+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundles,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundles/status,verbs=get;update;patch //+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundles/finalizers,verbs=update @@ -238,60 +295,3 @@ func upper(op controllerutil.OperationResult) string { return "Unknown" } } - -// SetupWithManager sets up the controller with the Manager. -func (r *BundleReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&fleet.Bundle{}). - // Note: Maybe improve with WatchesMetadata, does it have access to labels? - Watches( - // Fan out from bundledeployment to bundle - &fleet.BundleDeployment{}, - handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []ctrl.Request { - bd := a.(*fleet.BundleDeployment) - labels := bd.GetLabels() - if labels == nil { - return nil - } - - ns, name := target.BundleFromDeployment(labels) - if ns != "" && name != "" { - return []ctrl.Request{{ - NamespacedName: types.NamespacedName{ - Namespace: ns, - Name: name, - }, - }} - } - - return nil - }), - builder.WithPredicates(bundleDeploymentStatusChangedPredicate()), - ). - Watches( - // Fan out from cluster to bundle - &fleet.Cluster{}, - handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []ctrl.Request { - cluster := a.(*fleet.Cluster) - bundlesToRefresh, _, err := r.Query.BundlesForCluster(ctx, cluster) - if err != nil { - return nil - } - requests := []ctrl.Request{} - for _, bundle := range bundlesToRefresh { - requests = append(requests, ctrl.Request{ - NamespacedName: types.NamespacedName{ - Namespace: bundle.Namespace, - Name: bundle.Name, - }, - }) - } - - return requests - }), - builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), - ). - WithEventFilter(sharding.FilterByShardID(r.ShardID)). - WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}). - Complete(r) -} diff --git a/internal/cmd/controller/reconciler/bundledeployment_controller.go b/internal/cmd/controller/reconciler/bundledeployment_controller.go index af4adbc6f2..d5bd14cbb0 100644 --- a/internal/cmd/controller/reconciler/bundledeployment_controller.go +++ b/internal/cmd/controller/reconciler/bundledeployment_controller.go @@ -35,6 +35,17 @@ type BundleDeploymentReconciler struct { Workers int } +// SetupWithManager sets up the controller with the Manager. +func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&fleet.BundleDeployment{}, builder.WithPredicates( + bundleDeploymentStatusChangedPredicate(), + )). + WithEventFilter(sharding.FilterByShardID(r.ShardID)). + WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}). + Complete(r) +} + //+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/status,verbs=get;update;patch //+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/finalizers,verbs=update @@ -133,17 +144,6 @@ func bundleDeploymentStatusChangedPredicate() predicate.Funcs { } } -// SetupWithManager sets up the controller with the Manager. -func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&fleet.BundleDeployment{}, builder.WithPredicates( - bundleDeploymentStatusChangedPredicate(), - )). - WithEventFilter(sharding.FilterByShardID(r.ShardID)). - WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}). - Complete(r) -} - func conditionToMessage(cond genericcondition.GenericCondition) string { if cond.Reason == "Error" { return "Error: " + cond.Message diff --git a/internal/cmd/controller/reconciler/cluster_controller.go b/internal/cmd/controller/reconciler/cluster_controller.go index ca55bc34da..f2c864dba5 100644 --- a/internal/cmd/controller/reconciler/cluster_controller.go +++ b/internal/cmd/controller/reconciler/cluster_controller.go @@ -56,6 +56,50 @@ type ClusterReconciler struct { ShardID string } +// SetupWithManager sets up the controller with the Manager. +func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&fleet.Cluster{}). + // Watch bundledeployments so we can update the status fields + Watches( + &fleet.BundleDeployment{}, + handler.EnqueueRequestsFromMapFunc(r.mapBundleDeploymentToCluster), + builder.WithPredicates(predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + // Triggering on every update would run into an + // endless loop with the agentmanagement + // cluster controller. + // We still need to update often enough to keep the + // status fields up to date. + UpdateFunc: func(e event.UpdateEvent) bool { + n := e.ObjectNew.(*fleet.BundleDeployment) + o := e.ObjectOld.(*fleet.BundleDeployment) + if n == nil || o == nil { + return false + } + if !reflect.DeepEqual(n.Spec, o.Spec) { + return true + } + if n.Status.AppliedDeploymentID != o.Status.AppliedDeploymentID { + return true + } + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + o := e.Object.(*fleet.BundleDeployment) + if o == nil || o.Status.AppliedDeploymentID == "" { + return false + } + return true + }, + }), + ). + WithEventFilter(sharding.FilterByShardID(r.ShardID)). + Complete(r) +} + //+kubebuilder:rbac:groups=fleet.cattle.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=fleet.cattle.io,resources=clusters/status,verbs=get;update;patch //+kubebuilder:rbac:groups=fleet.cattle.io,resources=clusters/finalizers,verbs=update @@ -232,50 +276,6 @@ func (r *ClusterReconciler) updateStatus(ctx context.Context, req types.Namespac }) } -// SetupWithManager sets up the controller with the Manager. -func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&fleet.Cluster{}). - // Watch bundledeployments so we can update the status fields - Watches( - &fleet.BundleDeployment{}, - handler.EnqueueRequestsFromMapFunc(r.mapBundleDeploymentToCluster), - builder.WithPredicates(predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { - return true - }, - // Triggering on every update would run into an - // endless loop with the agentmanagement - // cluster controller. - // We still need to update often enough to keep the - // status fields up to date. - UpdateFunc: func(e event.UpdateEvent) bool { - n := e.ObjectNew.(*fleet.BundleDeployment) - o := e.ObjectOld.(*fleet.BundleDeployment) - if n == nil || o == nil { - return false - } - if !reflect.DeepEqual(n.Spec, o.Spec) { - return true - } - if n.Status.AppliedDeploymentID != o.Status.AppliedDeploymentID { - return true - } - return false - }, - DeleteFunc: func(e event.DeleteEvent) bool { - o := e.Object.(*fleet.BundleDeployment) - if o == nil || o.Status.AppliedDeploymentID == "" { - return false - } - return true - }, - }), - ). - WithEventFilter(sharding.FilterByShardID(r.ShardID)). - Complete(r) -} - func (r *ClusterReconciler) mapBundleDeploymentToCluster(ctx context.Context, a client.Object) []ctrl.Request { clusterNS := &corev1.Namespace{} err := r.Get(ctx, types.NamespacedName{Name: a.GetNamespace()}, clusterNS) diff --git a/internal/cmd/controller/reconciler/clustergroup_controller.go b/internal/cmd/controller/reconciler/clustergroup_controller.go index 93f81d691b..b443e4bc7b 100644 --- a/internal/cmd/controller/reconciler/clustergroup_controller.go +++ b/internal/cmd/controller/reconciler/clustergroup_controller.go @@ -42,6 +42,34 @@ type ClusterGroupReconciler struct { const MaxReportedNonReadyClusters = 10 +// SetupWithManager sets up the controller with the Manager. +func (r *ClusterGroupReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&fleet.ClusterGroup{}, builder.WithPredicates( + // only trigger on status changes, create + predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + n := e.ObjectNew.(*fleet.ClusterGroup) + o := e.ObjectOld.(*fleet.ClusterGroup) + if n == nil || o == nil { + return false + } + return !reflect.DeepEqual(n.Status, o.Status) + }, + }, + )). + Watches( + // Fan out from cluster to clustergroup + &fleet.Cluster{}, + handler.EnqueueRequestsFromMapFunc(r.mapClusterToClusterGroup), + ). + WithEventFilter(sharding.FilterByShardID(r.ShardID)). + Complete(r) +} + //+kubebuilder:rbac:groups=fleet.cattle.io,resources=clustergroups,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=fleet.cattle.io,resources=clustergroups/status,verbs=get;update;patch //+kubebuilder:rbac:groups=fleet.cattle.io,resources=clustergroups/finalizers,verbs=update @@ -157,34 +185,6 @@ func (r *ClusterGroupReconciler) updateStatus(ctx context.Context, req types.Nam }) } -// SetupWithManager sets up the controller with the Manager. -func (r *ClusterGroupReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&fleet.ClusterGroup{}, builder.WithPredicates( - // only trigger on status changes, create - predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { - return true - }, - UpdateFunc: func(e event.UpdateEvent) bool { - n := e.ObjectNew.(*fleet.ClusterGroup) - o := e.ObjectOld.(*fleet.ClusterGroup) - if n == nil || o == nil { - return false - } - return !reflect.DeepEqual(n.Status, o.Status) - }, - }, - )). - Watches( - // Fan out from cluster to clustergroup - &fleet.Cluster{}, - handler.EnqueueRequestsFromMapFunc(r.mapClusterToClusterGroup), - ). - WithEventFilter(sharding.FilterByShardID(r.ShardID)). - Complete(r) -} - func (r *ClusterGroupReconciler) mapClusterToClusterGroup(ctx context.Context, a client.Object) []ctrl.Request { ns := a.GetNamespace() logger := log.FromContext(ctx).WithName("clustergroup-cluster-handler").WithValues("namespace", ns) diff --git a/internal/cmd/controller/reconciler/config_controller.go b/internal/cmd/controller/reconciler/config_controller.go index bf03830b8a..84bb779adc 100644 --- a/internal/cmd/controller/reconciler/config_controller.go +++ b/internal/cmd/controller/reconciler/config_controller.go @@ -44,6 +44,27 @@ func Load(ctx context.Context, c client.Reader, namespace string) error { return nil } +// SetupWithManager sets up the controller with the Manager. +func (r *ConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + // TODO Maybe we can limit this Watch to the system namespace? + For(&corev1.ConfigMap{}). + WithEventFilter( + // we do not trigger for status changes + predicate.And( + sharding.FilterByShardID(r.ShardID), + predicate.NewPredicateFuncs(func(object client.Object) bool { + return object.GetNamespace() == r.SystemNamespace && + object.GetName() == config.ManagerConfigName + }), + predicate.Or( + predicate.GenerationChangedPredicate{}, + predicate.AnnotationChangedPredicate{}, + predicate.LabelChangedPredicate{}, + ))). + Complete(r) +} + // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. func (r *ConfigReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) { @@ -66,24 +87,3 @@ func (r *ConfigReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl. return ctrl.Result{}, nil } - -// SetupWithManager sets up the controller with the Manager. -func (r *ConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - // TODO Maybe we can limit this Watch to the system namespace? - For(&corev1.ConfigMap{}). - WithEventFilter( - // we do not trigger for status changes - predicate.And( - sharding.FilterByShardID(r.ShardID), - predicate.NewPredicateFuncs(func(object client.Object) bool { - return object.GetNamespace() == r.SystemNamespace && - object.GetName() == config.ManagerConfigName - }), - predicate.Or( - predicate.GenerationChangedPredicate{}, - predicate.AnnotationChangedPredicate{}, - predicate.LabelChangedPredicate{}, - ))). - Complete(r) -} diff --git a/internal/cmd/controller/reconciler/imagescan_controller.go b/internal/cmd/controller/reconciler/imagescan_controller.go index a010e534c3..cbd48336bf 100644 --- a/internal/cmd/controller/reconciler/imagescan_controller.go +++ b/internal/cmd/controller/reconciler/imagescan_controller.go @@ -28,6 +28,23 @@ type ImageScanReconciler struct { ShardID string } +// SetupWithManager sets up the controller with the Manager. +func (r *ImageScanReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&fleet.ImageScan{}). + WithEventFilter( + // we do not trigger for status changes + predicate.And( + sharding.FilterByShardID(r.ShardID), + predicate.Or( + predicate.GenerationChangedPredicate{}, + predicate.AnnotationChangedPredicate{}, + predicate.LabelChangedPredicate{}, + ), + )). + Complete(r) +} + //+kubebuilder:rbac:groups=fleet.cattle.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=fleet.cattle.io,resources=clusters/status,verbs=get;update;patch //+kubebuilder:rbac:groups=fleet.cattle.io,resources=clusters/finalizers,verbs=update @@ -95,20 +112,3 @@ func (r *ImageScanReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, nil } - -// SetupWithManager sets up the controller with the Manager. -func (r *ImageScanReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&fleet.ImageScan{}). - WithEventFilter( - // we do not trigger for status changes - predicate.And( - sharding.FilterByShardID(r.ShardID), - predicate.Or( - predicate.GenerationChangedPredicate{}, - predicate.AnnotationChangedPredicate{}, - predicate.LabelChangedPredicate{}, - ), - )). - Complete(r) -} diff --git a/internal/cmd/controller/target/builder.go b/internal/cmd/controller/target/builder.go index 6997919c4b..5166aa3c3e 100644 --- a/internal/cmd/controller/target/builder.go +++ b/internal/cmd/controller/target/builder.go @@ -35,7 +35,8 @@ func New(client client.Client) *Manager { // This is done by checking all namespaces for clusters matching the bundle's // BundleTarget matchers. // -// The returned target structs contain merged BundleDeploymentOptions. +// The returned target structs contain merged BundleDeploymentOptions, which +// includes the "TargetCustomizations" from fleet.yaml. // Finally all existing bundledeployments are added to the targets. func (m *Manager) Targets(ctx context.Context, bundle *fleet.Bundle, manifestID string) ([]*Target, error) { logger := log.FromContext(ctx).WithName("targets") @@ -106,7 +107,26 @@ func (m *Manager) Targets(ctx context.Context, bundle *fleet.Bundle, manifestID return targets[i].Cluster.Name < targets[j].Cluster.Name }) - return targets, m.foldInDeployments(ctx, bundle, targets) + // add the existing bundledeployments to the targets. + bundleDeployments := &fleet.BundleDeploymentList{} + err = m.client.List(ctx, bundleDeployments, client.MatchingLabels{ + fleet.BundleLabel: bundle.Name, + fleet.BundleNamespaceLabel: bundle.Namespace, + }) + if err != nil { + return nil, err + } + + byNamespace := map[string]*fleet.BundleDeployment{} + for _, bd := range bundleDeployments.Items { + byNamespace[bd.Namespace] = bd.DeepCopy() + } + + for _, target := range targets { + target.Deployment = byNamespace[target.Cluster.Status.Namespace] + } + + return targets, err } // getNamespacesForBundle returns the namespaces that bundledeployments could @@ -145,29 +165,6 @@ func (m *Manager) getNamespacesForBundle(ctx context.Context, bundle *fleet.Bund return nses.List(), nil } -// foldInDeployments adds the existing bundledeployments to the targets. -func (m *Manager) foldInDeployments(ctx context.Context, bundle *fleet.Bundle, targets []*Target) error { - bundleDeployments := &fleet.BundleDeploymentList{} - err := m.client.List(ctx, bundleDeployments, client.MatchingLabels{ - fleet.BundleLabel: bundle.Name, - fleet.BundleNamespaceLabel: bundle.Namespace, - }) - if err != nil { - return err - } - - byNamespace := map[string]*fleet.BundleDeployment{} - for _, bd := range bundleDeployments.Items { - byNamespace[bd.Namespace] = bd.DeepCopy() - } - - for _, target := range targets { - target.Deployment = byNamespace[target.Cluster.Status.Namespace] - } - - return nil -} - func preprocessHelmValues(logger logr.Logger, opts *fleet.BundleDeploymentOptions, cluster *fleet.Cluster) (err error) { clusterLabels := yaml.CleanAnnotationsForExport(cluster.Labels) clusterAnnotations := yaml.CleanAnnotationsForExport(cluster.Annotations)