From 666c6fc6eac3ce8d3094d266dc5c5496206e268a Mon Sep 17 00:00:00 2001 From: Xavi Garcia Date: Wed, 23 Oct 2024 12:40:36 +0200 Subject: [PATCH] Adds predicate when webhook commit changes This adds a new predicate when the webhook commit is changed in the `GitRepo`. It also filters out `Job` events for creation/deletion, because they add extra reconcile loops that only increase the possibility of race conditions and that are not required. The reconciler already knows when a `Job` is created/deleted because it is the owner and does not need to react upon those 2 events from the cluster. Refers to: https://github.com/rancher/fleet/issues/2969 Signed-off-by: Xavi Garcia --- .../gitjob/controller/controller_test.go | 87 ++++++++++++++++++- .../gitops/reconciler/gitjob_controller.go | 40 +++++++++ 2 files changed, 124 insertions(+), 3 deletions(-) diff --git a/integrationtests/gitjob/controller/controller_test.go b/integrationtests/gitjob/controller/controller_test.go index 9b42cfdf29..43c12fa7e4 100644 --- a/integrationtests/gitjob/controller/controller_test.go +++ b/integrationtests/gitjob/controller/controller_test.go @@ -577,9 +577,11 @@ var _ = Describe("GitJob controller", func() { When("a new GitRepo is created with DisablePolling set to true", func() { var ( - gitRepo v1alpha1.GitRepo - gitRepoName string - job batchv1.Job + gitRepo v1alpha1.GitRepo + gitRepoName string + job batchv1.Job + webhookCommit string + forceUpdateGeneration int ) JustBeforeEach(func() { @@ -591,6 +593,40 @@ var _ = Describe("GitJob controller", func() { jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+stableCommit, 5)) return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) }).Should(Not(HaveOccurred())) + + // change the webhookCommit if it's set + if webhookCommit != "" { + // simulate job was successful + Eventually(func() error { + jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+stableCommit, 5)) + err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + // We could be checking this when the job is still not created + Expect(client.IgnoreNotFound(err)).ToNot(HaveOccurred()) + job.Status.Succeeded = 1 + job.Status.Conditions = []batchv1.JobCondition{ + { + Type: "Complete", + Status: "True", + }, + } + return k8sClient.Status().Update(ctx, &job) + }).Should(Not(HaveOccurred())) + // wait until the job has finished + Eventually(func() bool { + jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+stableCommit, 5)) + err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + return errors.IsNotFound(err) + }).Should(BeTrue()) + + // set now the webhook commit + expectedCommit = webhookCommit + Expect(setGitRepoWebhookCommit(gitRepo, webhookCommit)).To(Succeed()) + // increase forceUpdateGeneration if need to exercise possible race conditions + // in the reconciler + for range forceUpdateGeneration { + Expect(simulateIncreaseForceSyncGeneration(gitRepo)).To(Succeed()) + } + } }) When("a job completes successfully", func() { @@ -616,6 +652,39 @@ var _ = Describe("GitJob controller", func() { }, "30s", "1s").Should(Equal(stableCommit)) }) }) + + When("WebhookCommit changes and user clicks ForceUpdateGeneration", func() { + BeforeEach(func() { + gitRepoName = "disable-polling-commit-change-force-update" + webhookCommit = "af6116a6c5c3196043b4a456316ae257dad9b5db" + expectedCommit = stableCommit + // user clicks ForceUpdate 2 times + // This exercises possible race conditions in the reconciler + forceUpdateGeneration = 2 + }) + + It("creates a new Job", func() { + Eventually(func() error { + jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+webhookCommit, 5)) + return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + }).Should(Not(HaveOccurred())) + }) + }) + + When("WebhookCommit changes", func() { + BeforeEach(func() { + gitRepoName = "disable-polling-commit-change" + webhookCommit = "af6116a6c5c3196043b4a456316ae257dad9b5db" + expectedCommit = stableCommit + }) + + It("creates a new Job", func() { + Eventually(func() error { + jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+webhookCommit, 5)) + return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + }).Should(Not(HaveOccurred())) + }) + }) }) When("creating a gitRepo that references a nonexistent helm secret", func() { @@ -909,6 +978,18 @@ func simulateIncreaseForceSyncGeneration(gitRepo v1alpha1.GitRepo) error { }) } +func setGitRepoWebhookCommit(gitRepo v1alpha1.GitRepo, commit string) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + var gitRepoFromCluster v1alpha1.GitRepo + err := k8sClient.Get(ctx, types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace}, &gitRepoFromCluster) + if err != nil { + return err + } + gitRepoFromCluster.Status.WebhookCommit = commit + return k8sClient.Status().Update(ctx, &gitRepoFromCluster) + }) +} + func createGitRepo(gitRepoName string) v1alpha1.GitRepo { return v1alpha1.GitRepo{ ObjectMeta: metav1.ObjectMeta{ diff --git a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go index 8816008e08..ce015c7505 100644 --- a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go +++ b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go @@ -101,9 +101,11 @@ func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error { predicate.GenerationChangedPredicate{}, predicate.AnnotationChangedPredicate{}, predicate.LabelChangedPredicate{}, + webhookCommitChangedPredicate(), ), ), ). + Owns(&batchv1.Job{}, builder.WithPredicates(jobUpdatesOnly())). Watches( // Fan out from bundle to gitrepo &v1alpha1.Bundle{}, @@ -1203,3 +1205,41 @@ func result(repoPolled bool, gitrepo *v1alpha1.GitRepo) reconcile.Result { } return reconcile.Result{} } + +func webhookCommitChangedPredicate() predicate.Predicate { + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + oldGitRepo, ok := e.ObjectOld.(*v1alpha1.GitRepo) + if !ok { + return true + } + newGitRepo, ok := e.ObjectNew.(*v1alpha1.GitRepo) + if !ok { + return true + } + return oldGitRepo.Status.WebhookCommit != newGitRepo.Status.WebhookCommit + }, + } +} + +func jobUpdatesOnly() predicate.Funcs { + return predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return false + }, + UpdateFunc: func(e event.UpdateEvent) bool { + n, isJob := e.ObjectNew.(*batchv1.Job) + if !isJob { + return false + } + o := e.ObjectOld.(*batchv1.Job) + if n == nil || o == nil { + return false + } + return !reflect.DeepEqual(n.Status, o.Status) + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + } +}