Skip to content

Commit

Permalink
🐛 Correctly handle concurrent updates to ClusterResourceSetBinding (k…
Browse files Browse the repository at this point in the history
…ubernetes-sigs#10656)

* fix: Correctly handle concurrent updates to ClusterResourceSetBinding

The existing code does not do optimistic locking on the CRSBinding via
`resourceVersion` and hence concurent updates (patches) overwrite each
other, leading to races and inconsistent state of the CRSBinding.

This commit fixes that by forcing optimistic locking via the
controller-runtime client patch options. The downside to this is that
it leads to more reconciles and log output of failed updates, but
this is a much better situation than having inconsistent and inaccurate
state stored in CRSBinding.

* fixup! refactor: Add requested comment re patching spec only
  • Loading branch information
jimmidyson authored Jun 18, 2024
1 parent 59a4531 commit 1cbe864
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 29 deletions.
16 changes: 7 additions & 9 deletions exp/addons/internal/controllers/clusterresourceset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (r *ClusterResourceSetReconciler) getClustersByClusterResourceSetSelector(c
// In Reconcile strategy, resources are re-applied to a particular cluster when their definition changes. The hash in ClusterResourceSetBinding is used to check
// if a resource has changed or not.
// TODO: If a resource already exists in the cluster but not applied by ClusterResourceSet, the resource will be updated ?
func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Context, cluster *clusterv1.Cluster, clusterResourceSet *addonsv1.ClusterResourceSet) error {
func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Context, cluster *clusterv1.Cluster, clusterResourceSet *addonsv1.ClusterResourceSet) (rerr error) {
log := ctrl.LoggerFrom(ctx, "Cluster", klog.KObj(cluster))
ctx = ctrl.LoggerInto(ctx, log)

Expand All @@ -295,16 +295,14 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte
return err
}

// Initialize the patch helper.
patchHelper, err := patch.NewHelper(clusterResourceSetBinding, r.Client)
if err != nil {
return err
}
patch := client.MergeFromWithOptions(clusterResourceSetBinding.DeepCopy(), client.MergeFromWithOptimisticLock{})

defer func() {
// Always attempt to Patch the ClusterResourceSetBinding object after each reconciliation.
if err := patchHelper.Patch(ctx, clusterResourceSetBinding); err != nil {
log.Error(err, "Failed to patch config")
// Note only the ClusterResourceSetBinding spec will be patched as it does not have a status field, and so
// using the patch helper is unnecessary.
if err := r.Client.Patch(ctx, clusterResourceSetBinding, patch); err != nil {
rerr = kerrors.NewAggregate([]error{rerr, errors.Wrapf(err, "failed to patch ClusterResourceSetBinding %s", klog.KObj(clusterResourceSetBinding))})
}
}()

Expand All @@ -315,7 +313,7 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte
Name: clusterResourceSet.Name,
UID: clusterResourceSet.UID,
}))
errList := []error{}
var errList []error
resourceSetBinding := clusterResourceSetBinding.GetOrCreateBinding(clusterResourceSet)

// Iterate all resources and apply them to the cluster and update the resource status in the ClusterResourceSetBinding object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -990,11 +990,56 @@ metadata:
g.Expect(env.Patch(ctx, clusterResourceSetInstance, client.MergeFrom(clusterResourceSetInstance.DeepCopy()))).To(Succeed())

// Wait until ClusterResourceSetBinding is created for the Cluster
g.Eventually(func() bool {
g.Eventually(func(g Gomega) {
binding := &addonsv1.ClusterResourceSetBinding{}
err := env.Get(ctx, clusterResourceSetBindingKey, binding)
return err == nil
}, timeout).Should(BeTrue())
g.Expect(env.Get(ctx, clusterResourceSetBindingKey, binding)).Should(Succeed())
}, timeout).Should(Succeed())
})

t.Run("Should handle applying multiple ClusterResourceSets concurrently to the same cluster", func(t *testing.T) {
g := NewWithT(t)
ns := setup(t, g)
defer teardown(t, g, ns)

t.Log("Creating ClusterResourceSet instances that have same labels as selector")
for range 10 {
clusterResourceSetInstance := &addonsv1.ClusterResourceSet{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("clusterresourceset-%s", util.RandomString(6)),
Namespace: ns.Name,
},
Spec: addonsv1.ClusterResourceSetSpec{
ClusterSelector: metav1.LabelSelector{
MatchLabels: labels,
},
Resources: []addonsv1.ResourceRef{{Name: configmapName, Kind: "ConfigMap"}, {Name: secretName, Kind: "Secret"}},
},
}
// Create the ClusterResourceSet.
g.Expect(env.Create(ctx, clusterResourceSetInstance)).To(Succeed())
}

t.Log("Updating the cluster with labels to trigger cluster resource sets to be applied")
testCluster.SetLabels(labels)
g.Expect(env.Update(ctx, testCluster)).To(Succeed())

t.Log("Verifying ClusterResourceSetBinding shows that all CRS have been applied")
g.Eventually(func(g Gomega) {
clusterResourceSetBindingKey := client.ObjectKey{Namespace: testCluster.Namespace, Name: testCluster.Name}
binding := &addonsv1.ClusterResourceSetBinding{}
g.Expect(env.Get(ctx, clusterResourceSetBindingKey, binding)).Should(Succeed())
g.Expect(binding.Spec.Bindings).To(HaveLen(10))
for _, b := range binding.Spec.Bindings {
g.Expect(b.Resources).To(HaveLen(2))
for _, r := range b.Resources {
g.Expect(r.Applied).To(BeTrue())
}
}
g.Expect(binding.OwnerReferences).To(HaveLen(10))
}, 4*timeout).Should(Succeed())
t.Log("Deleting the created ClusterResourceSet instances")
g.Expect(env.DeleteAllOf(ctx, &addonsv1.ClusterResourceSet{}, client.InNamespace(ns.Name))).To(Succeed())
g.Expect(env.DeleteAllOf(ctx, &addonsv1.ClusterResourceSetBinding{}, client.InNamespace(ns.Name))).To(Succeed())
})
}

Expand Down
29 changes: 15 additions & 14 deletions exp/addons/internal/controllers/clusterresourceset_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
addonsv1 "sigs.k8s.io/cluster-api/exp/addons/api/v1beta1"
Expand Down Expand Up @@ -108,27 +109,27 @@ func createUnstructured(ctx context.Context, c client.Client, obj *unstructured.

// getOrCreateClusterResourceSetBinding retrieves ClusterResourceSetBinding resource owned by the cluster or create a new one if not found.
func (r *ClusterResourceSetReconciler) getOrCreateClusterResourceSetBinding(ctx context.Context, cluster *clusterv1.Cluster, clusterResourceSet *addonsv1.ClusterResourceSet) (*addonsv1.ClusterResourceSetBinding, error) {
clusterResourceSetBinding := &addonsv1.ClusterResourceSetBinding{}
clusterResourceSetBindingKey := client.ObjectKey{
Namespace: cluster.Namespace,
Name: cluster.Name,
clusterResourceSetBinding := &addonsv1.ClusterResourceSetBinding{
ObjectMeta: metav1.ObjectMeta{
Name: cluster.Name,
Namespace: cluster.Namespace,
},
}
clusterResourceSetBindingKey := client.ObjectKeyFromObject(clusterResourceSetBinding)

if err := r.Client.Get(ctx, clusterResourceSetBindingKey, clusterResourceSetBinding); err != nil {
if !apierrors.IsNotFound(err) {
return nil, err
}
clusterResourceSetBinding.Name = cluster.Name
clusterResourceSetBinding.Namespace = cluster.Namespace
clusterResourceSetBinding.OwnerReferences = []metav1.OwnerReference{
{
APIVersion: addonsv1.GroupVersion.String(),
Kind: "ClusterResourceSet",
Name: clusterResourceSet.Name,
UID: clusterResourceSet.UID,
},
err = controllerutil.SetOwnerReference(
clusterResourceSet,
clusterResourceSetBinding,
r.Client.Scheme(),
)
if err != nil {
return nil, errors.Wrapf(err, "failed to set owner reference for clusterResourceSetBinding %s for cluster %s/%s", clusterResourceSetBindingKey, cluster.Namespace, cluster.Name)
}
clusterResourceSetBinding.Spec.Bindings = []*addonsv1.ResourceSetBinding{}

clusterResourceSetBinding.Spec.ClusterName = cluster.Name
if err := r.Client.Create(ctx, clusterResourceSetBinding); err != nil {
if apierrors.IsAlreadyExists(err) {
Expand Down
4 changes: 2 additions & 2 deletions exp/addons/internal/controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ func TestMain(m *testing.M) {
Client: mgr.GetClient(),
Tracker: tracker,
}
if err = reconciler.SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 1}, partialSecretCache); err != nil {
if err = reconciler.SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 10}, partialSecretCache); err != nil {
panic(fmt.Sprintf("Failed to set up cluster resource set reconciler: %v", err))
}
bindingReconciler := ClusterResourceSetBindingReconciler{
Client: mgr.GetClient(),
}
if err = bindingReconciler.SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 1}); err != nil {
if err = bindingReconciler.SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 10}); err != nil {
panic(fmt.Sprintf("Failed to set up cluster resource set binding reconciler: %v", err))
}
}
Expand Down

0 comments on commit 1cbe864

Please sign in to comment.