From 98adea96f03c805161c33a80531eba4f068d0c93 Mon Sep 17 00:00:00 2001 From: Xavi Garcia Date: Mon, 15 Jul 2024 10:04:10 +0200 Subject: [PATCH] Use RequeueAfter for Gitpolling job. Uses the Manager's timers instead of creating a different go routine to deal with the polling job. This is an approach to deal with multiple calls updating the status of a `gitrepo` in the same reconcile loop. There were issues setting the status and getting it in the same reconcile as the cache was not properly set yet. This tries to fix those issues by only updating the status once per reconcile. Refers to: https://github.com/rancher/fleet/issues/2631 Signed-off-by: Xavi Garcia --- charts/fleet-crd/templates/crds.yaml | 5 + .../gitrepo_polling_disabled_test.go | 9 +- e2e/single-cluster/gitrepo_test.go | 8 +- .../gitjob/controller/controller_test.go | 76 ++-- .../gitjob/controller/suite_test.go | 39 +- internal/cmd/controller/gitops/operator.go | 17 +- .../gitops/reconciler/gitjob_controller.go | 159 ++++--- .../gitops/reconciler/gitjob_test.go | 430 ++++++++++++++++-- .../fleet.cattle.io/v1alpha1/gitrepo_types.go | 2 + .../v1alpha1/zz_generated.deepcopy.go | 1 + pkg/git/mocks/scheduledjob_mock.go | 23 - pkg/git/mocks/scheduler_mock.go | 188 -------- pkg/git/poll/gitrepopolljob.go | 89 ---- pkg/git/poll/gitrepopolljob_test.go | 98 ---- pkg/git/poll/handler.go | 131 ------ pkg/git/poll/handler_test.go | 304 ------------- pkg/git/poll/suite_test.go | 22 - 17 files changed, 578 insertions(+), 1023 deletions(-) delete mode 100644 pkg/git/mocks/scheduledjob_mock.go delete mode 100644 pkg/git/mocks/scheduler_mock.go delete mode 100644 pkg/git/poll/gitrepopolljob.go delete mode 100644 pkg/git/poll/gitrepopolljob_test.go delete mode 100644 pkg/git/poll/handler.go delete mode 100644 pkg/git/poll/handler_test.go delete mode 100644 pkg/git/poll/suite_test.go diff --git a/charts/fleet-crd/templates/crds.yaml b/charts/fleet-crd/templates/crds.yaml index 5b9062781f..1f3776b13a 100644 --- a/charts/fleet-crd/templates/crds.yaml +++ b/charts/fleet-crd/templates/crds.yaml @@ -6490,6 +6490,11 @@ spec: description: GitJobStatus is the status of the last Git job run, e.g. "Current" if there was no error. type: string + lastPollingTriggered: + description: LastPollingTime is the last time the polling check + was triggered + format: date-time + type: string lastSyncedImageScanTime: description: LastSyncedImageScanTime is the time of the last image scan. diff --git a/e2e/single-cluster/gitrepo_polling_disabled_test.go b/e2e/single-cluster/gitrepo_polling_disabled_test.go index a2e67dc323..5cd9c6ebeb 100644 --- a/e2e/single-cluster/gitrepo_polling_disabled_test.go +++ b/e2e/single-cluster/gitrepo_polling_disabled_test.go @@ -85,7 +85,11 @@ var _ = Describe("GitRepoPollingDisabled", Label("infra-setup"), func() { }) JustBeforeEach(func() { - err := testenv.ApplyTemplate(k, testenv.AssetPath("gitrepo/gitrepo-polling-disabled.yaml"), struct { + var err error + clone, err = gh.Create(clonedir, testenv.AssetPath("gitrepo/sleeper-chart"), "disable_polling") + Expect(err).ToNot(HaveOccurred()) + + err = testenv.ApplyTemplate(k, testenv.AssetPath("gitrepo/gitrepo-polling-disabled.yaml"), struct { Name string Repo string Branch string @@ -97,9 +101,6 @@ var _ = Describe("GitRepoPollingDisabled", Label("infra-setup"), func() { targetNamespace, }) Expect(err).ToNot(HaveOccurred()) - - clone, err = gh.Create(clonedir, testenv.AssetPath("gitrepo/sleeper-chart"), "disable_polling") - Expect(err).ToNot(HaveOccurred()) }) It("deploys the resources initially and updates them while force updating", func() { diff --git a/e2e/single-cluster/gitrepo_test.go b/e2e/single-cluster/gitrepo_test.go index f843f09fea..26a42a4379 100644 --- a/e2e/single-cluster/gitrepo_test.go +++ b/e2e/single-cluster/gitrepo_test.go @@ -263,6 +263,10 @@ var _ = Describe("Monitoring Git repos via HTTP for change", Label("infra-setup" return err }).ShouldNot(HaveOccurred(), out) + // Clone previously created repo + clone, err = gh.Create(clonedir, testenv.AssetPath("gitrepo/sleeper-chart"), "examples") + Expect(err).ToNot(HaveOccurred()) + err = testenv.ApplyTemplate(k, testenv.AssetPath("gitrepo/gitrepo.yaml"), struct { Name string Repo string @@ -277,10 +281,6 @@ var _ = Describe("Monitoring Git repos via HTTP for change", Label("infra-setup" targetNamespace, // to avoid conflicts with other tests }) Expect(err).ToNot(HaveOccurred()) - - // Clone previously created repo - clone, err = gh.Create(clonedir, testenv.AssetPath("gitrepo/sleeper-chart"), "examples") - Expect(err).ToNot(HaveOccurred()) }) It("updates the deployment", func() { diff --git a/integrationtests/gitjob/controller/controller_test.go b/integrationtests/gitjob/controller/controller_test.go index 7c73fe2305..7aa630a58f 100644 --- a/integrationtests/gitjob/controller/controller_test.go +++ b/integrationtests/gitjob/controller/controller_test.go @@ -13,6 +13,7 @@ import ( "github.com/rancher/fleet/integrationtests/utils" v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" + "github.com/rancher/wrangler/v3/pkg/genericcondition" "github.com/rancher/wrangler/v3/pkg/name" batchv1 "k8s.io/api/batch/v1" @@ -32,6 +33,23 @@ const ( stableCommit = "26bdd9326b0238bb2fb743f863d9380c3c5d43e0" ) +func getCondition(gitrepo *v1alpha1.GitRepo, condType string) (genericcondition.GenericCondition, bool) { + for _, cond := range gitrepo.Status.Conditions { + if cond.Type == condType { + return cond, true + } + } + return genericcondition.GenericCondition{}, false +} + +func checkCondition(gitrepo *v1alpha1.GitRepo, condType string, status corev1.ConditionStatus) bool { + cond, found := getCondition(gitrepo, condType) + if !found { + return false + } + return cond.Type == condType && cond.Status == status +} + var _ = Describe("GitJob controller", func() { When("a new GitRepo is created", func() { @@ -43,6 +61,7 @@ var _ = Describe("GitJob controller", func() { ) JustBeforeEach(func() { + expectedCommit = commit gitRepo = createGitRepo(gitRepoName) Expect(k8sClient.Create(ctx, &gitRepo)).ToNot(HaveOccurred()) Eventually(func() string { @@ -54,7 +73,6 @@ var _ = Describe("GitJob controller", func() { } return gitRepoFromCluster.Status.Display.ReadyBundleDeployments }).Should(ContainSubstring("0/0")) - Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred()) By("Creating a job") Eventually(func() error { @@ -146,8 +164,8 @@ var _ = Describe("GitJob controller", func() { Eventually(func() bool { Expect(k8sClient.Get(ctx, types.NamespacedName{Name: gitRepoName, Namespace: gitRepoNamespace}, &gitRepo)).ToNot(HaveOccurred()) - // XXX: do we need an additional `LastExecutedCommit` or similar status field? - return gitRepo.Status.GitJobStatus == "Failed" + // Job Status should be failed and commit should be as expected + return gitRepo.Status.GitJobStatus == "Failed" && gitRepo.Status.Commit == commit }).Should(BeTrue()) By("verifying that the job is deleted if Spec.Generation changed") @@ -168,9 +186,9 @@ var _ = Describe("GitJob controller", func() { ) JustBeforeEach(func() { + expectedCommit = commit gitRepo = createGitRepo(gitRepoName) Expect(k8sClient.Create(ctx, &gitRepo)).ToNot(HaveOccurred()) - Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred()) By("creating a Job") Eventually(func() error { @@ -185,7 +203,7 @@ var _ = Describe("GitJob controller", func() { }) It("creates a new Job", func() { const newCommit = "9ca3a0adbbba32" - Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, newCommit)).ToNot(HaveOccurred()) + expectedCommit = newCommit Eventually(func() error { jobName := name.SafeConcatName(gitRepoName, name.Hex(repo+newCommit, 5)) return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) @@ -206,7 +224,6 @@ var _ = Describe("GitJob controller", func() { JustBeforeEach(func() { gitRepo = createGitRepo(gitRepoName) Expect(k8sClient.Create(ctx, &gitRepo)).ToNot(HaveOccurred()) - Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred()) Eventually(func() error { jobName = name.SafeConcatName(gitRepoName, name.Hex(repo+commit, 5)) return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) @@ -216,6 +233,7 @@ var _ = Describe("GitJob controller", func() { Expect(simulateIncreaseForceSyncGeneration(gitRepo)).ToNot(HaveOccurred()) }) BeforeEach(func() { + expectedCommit = commit gitRepoName = "force-deletion" }) AfterEach(func() { @@ -254,7 +272,6 @@ var _ = Describe("GitJob controller", func() { JustBeforeEach(func() { gitRepo = createGitRepo(gitRepoName) Expect(k8sClient.Create(ctx, &gitRepo)).ToNot(HaveOccurred()) - Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred()) Eventually(func() error { jobName = name.SafeConcatName(gitRepoName, name.Hex(repo+commit, 5)) return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) @@ -274,6 +291,7 @@ var _ = Describe("GitJob controller", func() { }) BeforeEach(func() { + expectedCommit = commit gitRepoName = "simulate-arg-update" }) @@ -307,6 +325,7 @@ var _ = Describe("GitJob controller", func() { When("a job completes successfully", func() { BeforeEach(func() { + expectedCommit = stableCommit gitRepoName = "disable-polling" }) @@ -338,6 +357,7 @@ var _ = Describe("GitJob controller", func() { ) JustBeforeEach(func() { + expectedCommit = commit gitRepoName = "test-no-for-paths-secret" gitRepo = createGitRepo(gitRepoName) gitRepo.Spec.HelmSecretNameForPaths = helmSecretNameForPaths @@ -345,7 +365,6 @@ var _ = Describe("GitJob controller", func() { // Create should return an error err := k8sClient.Create(ctx, &gitRepo) Expect(err).ToNot(HaveOccurred()) - Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred()) }) AfterEach(func() { @@ -473,28 +492,25 @@ var _ = Describe("GitRepo", func() { When("creating a gitrepo", func() { JustBeforeEach(func() { + expectedCommit = commit err := k8sClient.Create(ctx, gitrepo) Expect(err).NotTo(HaveOccurred()) }) It("updates the gitrepo status", func() { org := gitrepo.ResourceVersion - Eventually(func() bool { + Eventually(func(g Gomega) { _ = k8sClient.Get(ctx, types.NamespacedName{Name: gitrepoName, Namespace: namespace}, gitrepo) - return gitrepo.ResourceVersion > org && - gitrepo.Status.Display.ReadyBundleDeployments == "0/0" && - !gitrepo.Status.Display.Error && - len(gitrepo.Status.Conditions) == 4 && - gitrepo.Status.Conditions[0].Type == "Reconciling" && - string(gitrepo.Status.Conditions[0].Status) == "False" && - gitrepo.Status.Conditions[1].Type == "Stalled" && - string(gitrepo.Status.Conditions[1].Status) == "False" && - gitrepo.Status.Conditions[2].Type == "Ready" && - string(gitrepo.Status.Conditions[2].Status) == "True" && - gitrepo.Status.Conditions[3].Type == "Accepted" && - string(gitrepo.Status.Conditions[3].Status) == "True" && - gitrepo.Status.DeepCopy().ObservedGeneration == int64(1) - }).Should(BeTrue()) + g.Expect(gitrepo.ResourceVersion > org).To(BeTrue()) + g.Expect(gitrepo.Status.Display.ReadyBundleDeployments).To(Equal("0/0")) + g.Expect(gitrepo.Status.Display.Error).To(BeFalse()) + g.Expect(len(gitrepo.Status.Conditions)).To(Equal(5)) + g.Expect(checkCondition(gitrepo, "GitPolling", corev1.ConditionTrue)).To(BeTrue()) + g.Expect(checkCondition(gitrepo, "Reconciling", corev1.ConditionTrue)).To(BeTrue()) + g.Expect(checkCondition(gitrepo, "Stalled", corev1.ConditionFalse)).To(BeTrue()) + g.Expect(checkCondition(gitrepo, "Ready", corev1.ConditionTrue)).To(BeTrue()) + g.Expect(checkCondition(gitrepo, "Accepted", corev1.ConditionTrue)).To(BeTrue()) + }).Should(Succeed()) }) }) }) @@ -619,20 +635,6 @@ func simulateIncreaseGitRepoGeneration(gitRepo v1alpha1.GitRepo) error { }) } -func simulateGitPollerUpdatingCommitInStatus(gitRepo v1alpha1.GitRepo, commit string) error { - return retry.RetryOnConflict(retry.DefaultRetry, func() error { - var gitRepoFromCluster v1alpha1.GitRepo - err := k8sClient.Get(ctx, types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace}, &gitRepoFromCluster) - if err != nil { - return err - } - gitRepoFromCluster.Status = v1alpha1.GitRepoStatus{ - Commit: commit, - } - return k8sClient.Status().Update(ctx, &gitRepoFromCluster) - }) -} - func createGitRepo(gitRepoName string) v1alpha1.GitRepo { return v1alpha1.GitRepo{ ObjectMeta: metav1.ObjectMeta{ diff --git a/integrationtests/gitjob/controller/suite_test.go b/integrationtests/gitjob/controller/suite_test.go index 328a48b72b..310f5e91b6 100644 --- a/integrationtests/gitjob/controller/suite_test.go +++ b/integrationtests/gitjob/controller/suite_test.go @@ -17,8 +17,8 @@ import ( ctrlreconciler "github.com/rancher/fleet/internal/cmd/controller/reconciler" "github.com/rancher/fleet/internal/cmd/controller/target" "github.com/rancher/fleet/internal/manifest" - "github.com/rancher/fleet/internal/mocks" v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" + "github.com/rancher/fleet/pkg/git/mocks" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -33,13 +33,14 @@ const ( ) var ( - cfg *rest.Config - testEnv *envtest.Environment - ctx context.Context - cancel context.CancelFunc - k8sClient client.Client - logsBuffer bytes.Buffer - namespace string + cfg *rest.Config + testEnv *envtest.Environment + ctx context.Context + cancel context.CancelFunc + k8sClient client.Client + logsBuffer bytes.Buffer + namespace string + expectedCommit string ) func TestGitJobController(t *testing.T) { @@ -73,25 +74,29 @@ var _ = BeforeSuite(func() { Expect(err).ToNot(HaveOccurred()) ctlr := gomock.NewController(GinkgoT()) - gitPollerMock := mocks.NewMockGitPoller(ctlr) // redirect logs to a buffer that we can read in the tests GinkgoWriter.TeeTo(&logsBuffer) ctrl.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) - // do nothing if gitPoller is called. gitPoller calls are tested in unit tests - gitPollerMock.EXPECT().AddOrModifyGitRepoPollJob(gomock.Any(), gomock.Any()).AnyTimes() - gitPollerMock.EXPECT().CleanUpGitRepoPollJobs(gomock.Any()).AnyTimes() + // return whatever commit the test is expecting + fetcherMock := mocks.NewMockGitFetcher(ctlr) + fetcherMock.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn( + func(ctx context.Context, gitrepo *v1alpha1.GitRepo, client client.Client) (string, error) { + return expectedCommit, nil + }, + ) sched := quartz.NewStdScheduler() Expect(sched).ToNot(BeNil()) err = (&reconciler.GitJobReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Image: "image", - GitPoller: gitPollerMock, - Scheduler: sched, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Image: "image", + Scheduler: sched, + GitFetcher: fetcherMock, + Clock: reconciler.RealClock{}, }).SetupWithManager(mgr) Expect(err).ToNot(HaveOccurred()) diff --git a/internal/cmd/controller/gitops/operator.go b/internal/cmd/controller/gitops/operator.go index b22d04f155..1a11d9021a 100644 --- a/internal/cmd/controller/gitops/operator.go +++ b/internal/cmd/controller/gitops/operator.go @@ -12,7 +12,7 @@ import ( "github.com/rancher/fleet/internal/cmd/controller/gitops/reconciler" "github.com/rancher/fleet/internal/metrics" fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - "github.com/rancher/fleet/pkg/git/poll" + "github.com/rancher/fleet/pkg/git" "github.com/rancher/fleet/pkg/version" "github.com/rancher/fleet/pkg/webhook" "github.com/reugn/go-quartz/quartz" @@ -122,13 +122,14 @@ func (g *GitOperator) Run(cmd *cobra.Command, args []string) error { } reconciler := &reconciler.GitJobReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Image: g.Image, - GitPoller: poll.NewHandler(ctx, mgr.GetClient()), - Scheduler: sched, - Workers: workers, - ShardID: g.ShardID, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Image: g.Image, + Scheduler: sched, + Workers: workers, + ShardID: g.ShardID, + GitFetcher: &git.Fetch{}, + Clock: reconciler.RealClock{}, } group, ctx := errgroup.WithContext(ctx) diff --git a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go index d288ea0b96..1a3902cce6 100644 --- a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go +++ b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go @@ -15,8 +15,8 @@ import ( "github.com/rancher/fleet/internal/metrics" "github.com/rancher/fleet/internal/ociwrapper" v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - "github.com/rancher/fleet/pkg/git" "github.com/rancher/fleet/pkg/sharding" + "github.com/rancher/wrangler/v3/pkg/condition" "github.com/reugn/go-quartz/quartz" "github.com/rancher/wrangler/v3/pkg/name" @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) const ( @@ -48,24 +49,38 @@ const ( gitClonerVolumeName = "git-cloner" emptyDirVolumeName = "git-cloner-empty-dir" fleetHomeDir = "/fleet-home" + + defaultPollingSyncInterval = 15 * time.Second + gitPollingCondition = "GitPolling" ) var two = int32(2) -type GitPoller interface { - AddOrModifyGitRepoPollJob(ctx context.Context, gitRepo v1alpha1.GitRepo) - CleanUpGitRepoPollJobs(ctx context.Context) +type GitFetcher interface { + LatestCommit(ctx context.Context, gitrepo *v1alpha1.GitRepo, client client.Client) (string, error) +} + +// TimeGetter interface is used to mock the time.Now() call in unit tests +type TimeGetter interface { + Now() time.Time + Since(t time.Time) time.Duration } +type RealClock struct{} + +func (RealClock) Now() time.Time { return time.Now() } +func (RealClock) Since(t time.Time) time.Duration { return time.Since(t) } + // CronJobReconciler reconciles a GitRepo resource to create a git cloning k8s job type GitJobReconciler struct { client.Client - Scheme *runtime.Scheme - Image string - GitPoller GitPoller - Scheduler quartz.Scheduler - Workers int - ShardID string + Scheme *runtime.Scheme + Image string + Scheduler quartz.Scheduler + Workers int + ShardID string + GitFetcher GitFetcher + Clock TimeGetter } func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error { @@ -77,7 +92,6 @@ func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error { predicate.GenerationChangedPredicate{}, predicate.AnnotationChangedPredicate{}, predicate.LabelChangedPredicate{}, - commitChangedPredicate(), ), ), ). @@ -122,7 +136,6 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, err } else if errors.IsNotFound(err) { logger.V(1).Info("Gitrepo deleted, cleaning up poll jobs") - r.GitPoller.CleanUpGitRepoPollJobs(ctx) return ctrl.Result{}, nil } @@ -148,6 +161,9 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if client.IgnoreNotFound(err) != nil { return ctrl.Result{}, err } + + // requeue as adding the finalizer changes the spec + return ctrl.Result{Requeue: true}, nil } logger = logger.WithValues("generation", gitrepo.Generation, "commit", gitrepo.Status.Commit) @@ -159,7 +175,10 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, nil } - r.GitPoller.AddOrModifyGitRepoPollJob(ctx, *gitrepo) + gitPollerWasExecuted, _ := r.checkPollingTask(ctx, gitrepo) + // From this point onwards we have to take into account if the poller + // task was executed. + // If so, we need to return a Result with EnqueueAfter set. var job batchv1.Job err = r.Get(ctx, types.NamespacedName{ @@ -167,38 +186,37 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr Name: jobName(gitrepo), }, &job) if err != nil && !errors.IsNotFound(err) { - return ctrl.Result{}, fmt.Errorf("error retrieving git job: %w", err) + return reconcileResult(gitPollerWasExecuted, gitrepo), fmt.Errorf("error retrieving git job: %w", err) } // Gitjob handling if errors.IsNotFound(err) { if gitrepo.Spec.DisablePolling { - if err := r.updateCommit(ctx, gitrepo); err != nil { - if errors.IsConflict(err) { - logger.V(1).Info("conflict updating commit, retrying", "message", err) - return ctrl.Result{Requeue: true}, nil // just retry, but don't show an error - } - return ctrl.Result{}, fmt.Errorf("error updating commit: %v", err) + commit, err := r.GitFetcher.LatestCommit(ctx, gitrepo, r.Client) + condition.Cond(gitPollingCondition).SetError(&gitrepo.Status, "", err) + if err == nil && commit != "" { + gitrepo.Status.Commit = commit } } + if gitrepo.Status.Commit != "" { if err := r.validateExternalSecretExist(ctx, gitrepo); err != nil { - return ctrl.Result{}, grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) + return reconcileResult(gitPollerWasExecuted, gitrepo), grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) } logger.V(1).Info("Creating Git job resources") if err := r.createJobRBAC(ctx, gitrepo); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to create RBAC resources for git job: %w", err) + return reconcileResult(gitPollerWasExecuted, gitrepo), fmt.Errorf("failed to create RBAC resources for git job: %w", err) } if err := r.createTargetsConfigMap(ctx, gitrepo); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to create targets config map for git job: %w", err) + return reconcileResult(gitPollerWasExecuted, gitrepo), fmt.Errorf("failed to create targets config map for git job: %w", err) } if err := r.createJob(ctx, gitrepo); err != nil { - return ctrl.Result{}, fmt.Errorf("error creating git job: %w", err) + return reconcileResult(gitPollerWasExecuted, gitrepo), fmt.Errorf("error creating git job: %w", err) } } } else if gitrepo.Status.Commit != "" { if err = r.deleteJobIfNeeded(ctx, gitrepo, &job); err != nil { - return ctrl.Result{}, fmt.Errorf("error deleting git job: %w", err) + return reconcileResult(gitPollerWasExecuted, gitrepo), fmt.Errorf("error deleting git job: %w", err) } } @@ -206,21 +224,21 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // Refresh the status if err = grutil.SetStatusFromGitjob(ctx, r.Client, gitrepo, &job); err != nil { - return ctrl.Result{}, grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) + return reconcileResult(gitPollerWasExecuted, gitrepo), grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) } err = grutil.SetStatusFromBundleDeployments(ctx, r.Client, gitrepo) if err != nil { - return ctrl.Result{}, grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) + return reconcileResult(gitPollerWasExecuted, gitrepo), grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) } err = grutil.SetStatusFromBundles(ctx, r.Client, gitrepo) if err != nil { - return ctrl.Result{}, grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) + return reconcileResult(gitPollerWasExecuted, gitrepo), grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) } if err = grutil.UpdateDisplayState(gitrepo); err != nil { - return ctrl.Result{}, grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) + return reconcileResult(gitPollerWasExecuted, gitrepo), grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) } grutil.SetStatusFromResourceKey(ctx, r.Client, gitrepo) @@ -235,22 +253,10 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if err != nil { logger.V(1).Error(err, "Reconcile failed final update to git repo status", "status", gitrepo.Status) - return ctrl.Result{}, err + return reconcileResult(gitPollerWasExecuted, gitrepo), err } - return ctrl.Result{}, nil -} - -func (r *GitJobReconciler) updateCommit(ctx context.Context, gitRepo *v1alpha1.GitRepo) error { - fetcher := git.NewFetch() - commit, err := fetcher.LatestCommit(ctx, gitRepo, r.Client) - if err != nil { - return err - } - return retry.RetryOnConflict(retry.DefaultRetry, func() error { - gitRepo.Status.Commit = commit - return r.Status().Update(ctx, gitRepo) - }) + return reconcileResult(gitPollerWasExecuted, gitrepo), nil } func (r *GitJobReconciler) cleanupGitRepo(ctx context.Context, logger logr.Logger, gitrepo *v1alpha1.GitRepo) error { @@ -307,23 +313,6 @@ func (r *GitJobReconciler) addGitRepoFinalizer(ctx context.Context, nsName types return nil } -func commitChangedPredicate() predicate.Predicate { - return predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - oldGitJob, ok := e.ObjectOld.(*v1alpha1.GitRepo) - if !ok { - return true - } - newGitJob, ok := e.ObjectNew.(*v1alpha1.GitRepo) - if !ok { - return true - } - - return oldGitJob.Status.Commit != newGitJob.Status.Commit - }, - } -} - func (r *GitJobReconciler) createJobRBAC(ctx context.Context, gitrepo *v1alpha1.GitRepo) error { // No update needed, values are the same. So we ignore AlreadyExists. saName := name.SafeConcatName("git", gitrepo.Name) @@ -989,3 +978,53 @@ func bundleStatusChangedPredicate() predicate.Funcs { }, } } + +func (r *GitJobReconciler) checkPollingTask(ctx context.Context, gitrepo *v1alpha1.GitRepo) (bool, error) { + if gitrepo.Spec.DisablePolling { + return false, nil + } + if r.shouldRunPollingTask(gitrepo) { + gitrepo.Status.LastPollingTime.Time = r.Clock.Now() + commit, err := r.GitFetcher.LatestCommit(ctx, gitrepo, r.Client) + condition.Cond(gitPollingCondition).SetError(&gitrepo.Status, "", err) + if err != nil { + return true, err + } + gitrepo.Status.Commit = commit + + return true, nil + } + + return false, nil +} + +func (r *GitJobReconciler) shouldRunPollingTask(gitrepo *v1alpha1.GitRepo) bool { + if gitrepo.Spec.DisablePolling { + return false + } + + t := gitrepo.Status.LastPollingTime + + if t.IsZero() || (r.Clock.Since(t.Time) >= getPollingIntervalDuration(gitrepo)) { + return true + } + if gitrepo.Status.ObservedGeneration != gitrepo.Generation { + return true + } + return false +} + +func getPollingIntervalDuration(gitrepo *v1alpha1.GitRepo) time.Duration { + if gitrepo.Spec.PollingInterval == nil || gitrepo.Spec.PollingInterval.Duration == 0 { + return defaultPollingSyncInterval + } + + return gitrepo.Spec.PollingInterval.Duration +} + +func reconcileResult(gitPollerWasExecuted bool, gitrepo *v1alpha1.GitRepo) reconcile.Result { + if gitPollerWasExecuted { + return reconcile.Result{RequeueAfter: getPollingIntervalDuration(gitrepo)} + } + return reconcile.Result{} +} diff --git a/internal/cmd/controller/gitops/reconciler/gitjob_test.go b/internal/cmd/controller/gitops/reconciler/gitjob_test.go index 3423929b82..1d367a1b0b 100644 --- a/internal/cmd/controller/gitops/reconciler/gitjob_test.go +++ b/internal/cmd/controller/gitops/reconciler/gitjob_test.go @@ -5,29 +5,102 @@ package reconciler import ( "context" + "fmt" "os" "testing" + "time" "github.com/google/go-cmp/cmp" "go.uber.org/mock/gomock" + "github.com/rancher/fleet/internal/cmd/controller/finalize" "github.com/rancher/fleet/internal/mocks" fleetv1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" + "github.com/rancher/wrangler/v3/pkg/genericcondition" + gitmocks "github.com/rancher/fleet/pkg/git/mocks" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) -func TestReconcile_AddOrModifyGitRepoPollJobIsCalled_WhenGitRepoIsCreatedOrModified(t *testing.T) { +type ClockMock struct { + t time.Time +} + +func (m ClockMock) Now() time.Time { return m.t } +func (m ClockMock) Since(t time.Time) time.Duration { return m.t.Sub(t) } + +func getGitPollingCondition(gitrepo *fleetv1.GitRepo) (genericcondition.GenericCondition, bool) { + for _, cond := range gitrepo.Status.Conditions { + if cond.Type == gitPollingCondition { + return cond, true + } + } + return genericcondition.GenericCondition{}, false +} + +func TestReconcile_ReturnsAndRequeuesAfterAddingFinalizer(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + scheme := runtime.NewScheme() + utilruntime.Must(batchv1.AddToScheme(scheme)) + gitRepo := fleetv1.GitRepo{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gitrepo", + Namespace: "default", + }, + } + namespacedName := types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace} + client := mocks.NewMockClient(mockCtrl) + client.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1).DoAndReturn( + func(ctx context.Context, req types.NamespacedName, gitrepo *fleetv1.GitRepo, opts ...interface{}) error { + gitrepo.Name = gitRepo.Name + gitrepo.Namespace = gitRepo.Namespace + gitrepo.Spec.Repo = "repo" + return nil + }, + ) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + fetcher := gitmocks.NewMockGitFetcher(mockCtrl) + client.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(ctx context.Context, repo *fleetv1.GitRepo, opts ...interface{}) { + // check that we added the finalizer + if !controllerutil.ContainsFinalizer(repo, finalize.GitRepoFinalizer) { + t.Errorf("expecting gitrepo to contain finalizer") + } + }, + ).Times(1) + + r := GitJobReconciler{ + Client: client, + Scheme: scheme, + Image: "", + GitFetcher: fetcher, + Clock: RealClock{}, + } + + ctx := context.TODO() + + // second call is the one calling LatestCommit + res, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: namespacedName}) + if err != nil { + t.Errorf("unexpected error %v", err) + } + if !res.Requeue { + t.Errorf("expecting Requeue set to true, it was false") + } +} + +func TestReconcile_LatestCommitErrorIsSetInConditions(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() scheme := runtime.NewScheme() @@ -41,56 +114,251 @@ func TestReconcile_AddOrModifyGitRepoPollJobIsCalled_WhenGitRepoIsCreatedOrModif namespacedName := types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace} client := mocks.NewMockClient(mockCtrl) statusClient := mocks.NewMockSubResourceWriter(mockCtrl) - statusClient.EXPECT().Update(gomock.Any(), gomock.Any()) - client.EXPECT().Update(gomock.Any(), gomock.Any()).Times(1).Return(nil) client.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1).DoAndReturn( func(ctx context.Context, req types.NamespacedName, gitrepo *fleetv1.GitRepo, opts ...interface{}) error { gitrepo.Name = gitRepo.Name gitrepo.Namespace = gitRepo.Namespace gitrepo.Spec.Repo = "repo" + controllerutil.AddFinalizer(gitrepo, finalize.GitRepoFinalizer) return nil }, ) client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) - client.EXPECT().Status().Return(statusClient) - poller := mocks.NewMockGitPoller(mockCtrl) - poller.EXPECT().AddOrModifyGitRepoPollJob(gomock.Any(), gomock.Any()).Times(1) - poller.EXPECT().CleanUpGitRepoPollJobs(gomock.Any()).Times(0) + client.EXPECT().Status().Return(statusClient).Times(1) + fetcher := gitmocks.NewMockGitFetcher(mockCtrl) + fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return("", fmt.Errorf("TEST ERROR")) + statusClient.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(ctx context.Context, repo *fleetv1.GitRepo, opts ...interface{}) { + cond, found := getGitPollingCondition(repo) + if !found { + t.Errorf("expecting Condition %s to be found", gitPollingCondition) + } + if cond.Message != "TEST ERROR" { + t.Errorf("expecting condition message [TEST ERROR], got [%s]", cond.Message) + } + if cond.Type != gitPollingCondition { + t.Errorf("expecting condition type [%s], got [%s]", gitPollingCondition, cond.Type) + } + if cond.Status != "False" { + t.Errorf("expecting condition Status [False], got [%s]", cond.Type) + } + }, + ).Times(1) r := GitJobReconciler{ - Client: client, - Scheme: scheme, - Image: "", - GitPoller: poller, + Client: client, + Scheme: scheme, + Image: "", + GitFetcher: fetcher, + Clock: RealClock{}, } ctx := context.TODO() + + // second call is the one calling LatestCommit _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: namespacedName}) if err != nil { t.Errorf("unexpected error %v", err) } } -func TestReconcile_PurgeWatchesIsCalled_WhenGitRepoIsCreatedOrModified(t *testing.T) { +func TestReconcile_LatestCommitIsOkay(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() scheme := runtime.NewScheme() utilruntime.Must(batchv1.AddToScheme(scheme)) + gitRepo := fleetv1.GitRepo{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gitrepo", + Namespace: "default", + }, + } + namespacedName := types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace} + client := mocks.NewMockClient(mockCtrl) + statusClient := mocks.NewMockSubResourceWriter(mockCtrl) + client.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1).DoAndReturn( + func(ctx context.Context, req types.NamespacedName, gitrepo *fleetv1.GitRepo, opts ...interface{}) error { + gitrepo.Name = gitRepo.Name + gitrepo.Namespace = gitRepo.Namespace + gitrepo.Spec.Repo = "repo" + controllerutil.AddFinalizer(gitrepo, finalize.GitRepoFinalizer) + return nil + }, + ) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + client.EXPECT().Status().Return(statusClient).Times(1) + + fetcher := gitmocks.NewMockGitFetcher(mockCtrl) + commit := "1883fd54bc5dfd225acf02aecbb6cb8020458e33" + fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(commit, nil) + statusClient.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(ctx context.Context, repo *fleetv1.GitRepo, opts ...interface{}) { + cond, found := getGitPollingCondition(repo) + if !found { + t.Errorf("expecting Condition %s to be found", gitPollingCondition) + } + if cond.Message != "" { + t.Errorf("expecting condition message empty, got [%s]", cond.Message) + } + if cond.Type != gitPollingCondition { + t.Errorf("expecting condition type [%s], got [%s]", gitPollingCondition, cond.Type) + } + if cond.Status != "True" { + t.Errorf("expecting condition Status [True], got [%s]", cond.Type) + } + if repo.Status.Commit != commit { + t.Errorf("expecting commit %s, got %s", commit, repo.Status.Commit) + } + }, + ).Times(1) + + r := GitJobReconciler{ + Client: client, + Scheme: scheme, + Image: "", + GitFetcher: fetcher, + Clock: RealClock{}, + } + ctx := context.TODO() - namespacedName := types.NamespacedName{Name: "gitRepo", Namespace: "default"} + + // second call is the one calling LatestCommit + _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: namespacedName}) + if err != nil { + t.Errorf("unexpected error %v", err) + } +} + +func TestReconcile_LatestCommitNotCalledYet(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + scheme := runtime.NewScheme() + utilruntime.Must(batchv1.AddToScheme(scheme)) + gitRepo := fleetv1.GitRepo{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gitrepo", + Namespace: "default", + }, + } + namespacedName := types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace} client := mocks.NewMockClient(mockCtrl) - client.EXPECT().Get(ctx, namespacedName, gomock.Any()).Times(1).Return(errors.NewNotFound(schema.GroupResource{}, "NotFound")) - poller := mocks.NewMockGitPoller(mockCtrl) - poller.EXPECT().AddOrModifyGitRepoPollJob(ctx, gomock.Any()).Times(0) - poller.EXPECT().CleanUpGitRepoPollJobs(ctx).Times(1) + statusClient := mocks.NewMockSubResourceWriter(mockCtrl) + client.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1).DoAndReturn( + func(ctx context.Context, req types.NamespacedName, gitrepo *fleetv1.GitRepo, opts ...interface{}) error { + gitrepo.Name = gitRepo.Name + gitrepo.Namespace = gitRepo.Namespace + gitrepo.Spec.Repo = "repo" + controllerutil.AddFinalizer(gitrepo, finalize.GitRepoFinalizer) + + // set last polling time to now... + // default gitrepo polling time is 15 seconds, so it won't call LatestCommit this time + gitrepo.Status.LastPollingTime.Time = time.Now() + return nil + }, + ) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + client.EXPECT().Status().Return(statusClient).Times(1) + statusClient.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(ctx context.Context, repo *fleetv1.GitRepo, opts ...interface{}) { + if repo.Status.Commit != "" { + t.Errorf("expecting gitrepo empty commit, got [%s]", repo.Status.Commit) + } + cond, found := getGitPollingCondition(repo) + if found { + t.Errorf("not expecting Condition %s to be found. Got [%s]", gitPollingCondition, cond) + } + }, + ).Times(1) + fetcher := gitmocks.NewMockGitFetcher(mockCtrl) r := GitJobReconciler{ - Client: client, - Scheme: scheme, - Image: "", - GitPoller: poller, + Client: client, + Scheme: scheme, + Image: "", + GitFetcher: fetcher, + Clock: RealClock{}, } + + ctx := context.TODO() + + // second call is the one calling LatestCommit + _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: namespacedName}) + if err != nil { + t.Errorf("unexpected error %v", err) + } +} + +func TestReconcile_LatestCommitShouldBeCalled(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + scheme := runtime.NewScheme() + utilruntime.Must(batchv1.AddToScheme(scheme)) + gitRepo := fleetv1.GitRepo{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gitrepo", + Namespace: "default", + }, + } + namespacedName := types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace} + client := mocks.NewMockClient(mockCtrl) + statusClient := mocks.NewMockSubResourceWriter(mockCtrl) + client.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + fetcher := gitmocks.NewMockGitFetcher(mockCtrl) + commit := "1883fd54bc5dfd225acf02aecbb6cb8020458e33" + fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(commit, nil) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1).DoAndReturn( + func(ctx context.Context, req types.NamespacedName, gitrepo *fleetv1.GitRepo, opts ...interface{}) error { + gitrepo.Name = gitRepo.Name + gitrepo.Namespace = gitRepo.Namespace + gitrepo.Spec.Repo = "repo" + controllerutil.AddFinalizer(gitrepo, finalize.GitRepoFinalizer) + + // set last polling time to now less 15 seconds (which is the default) + // that should trigger the polling job now + now := time.Now() + gitrepo.Status.LastPollingTime.Time = now.Add(time.Duration(-15) * time.Second) + // commit is something different to what we expect after this reconcile + gitrepo.Status.Commit = "dd45c7ad68e10307765104fea4a1f5997643020f" + return nil + }, + ) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + client.EXPECT().Status().Return(statusClient).Times(1) + statusClient.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(ctx context.Context, repo *fleetv1.GitRepo, opts ...interface{}) { + cond, found := getGitPollingCondition(repo) + if !found { + t.Errorf("expecting Condition %s to be found", gitPollingCondition) + } + if cond.Message != "" { + t.Errorf("expecting condition message empty, got [%s]", cond.Message) + } + if cond.Type != gitPollingCondition { + t.Errorf("expecting condition type [%s], got [%s]", gitPollingCondition, cond.Type) + } + if cond.Status != "True" { + t.Errorf("expecting condition Status [True], got [%s]", cond.Type) + } + if repo.Status.Commit != commit { + t.Errorf("expecting commit %s, got %s", commit, repo.Status.Commit) + } + }, + ).Times(1) + + r := GitJobReconciler{ + Client: client, + Scheme: scheme, + Image: "", + GitFetcher: fetcher, + Clock: RealClock{}, + } + + ctx := context.TODO() + + // second call is the one calling LatestCommit _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: namespacedName}) if err != nil { t.Errorf("unexpected error %v", err) @@ -132,13 +400,12 @@ func TestReconcile_Error_WhenGitrepoRestrictionsAreNotMet(t *testing.T) { } }, ) - poller := mocks.NewMockGitPoller(mockCtrl) r := GitJobReconciler{ - Client: mockClient, - Scheme: scheme, - Image: "", - GitPoller: poller, + Client: mockClient, + Scheme: scheme, + Image: "", + Clock: RealClock{}, } ctx := context.TODO() @@ -167,9 +434,6 @@ func TestNewJob(t *testing.T) { // nolint:funlen scheme := runtime.NewScheme() utilruntime.Must(batchv1.AddToScheme(scheme)) ctx := context.TODO() - poller := mocks.NewMockGitPoller(mockCtrl) - poller.EXPECT().AddOrModifyGitRepoPollJob(ctx, gomock.Any()).AnyTimes() - poller.EXPECT().CleanUpGitRepoPollJobs(ctx).AnyTimes() tests := map[string]struct { gitrepo *fleetv1.GitRepo @@ -555,10 +819,10 @@ func TestNewJob(t *testing.T) { // nolint:funlen for name, test := range tests { t.Run(name, func(t *testing.T) { r := GitJobReconciler{ - Client: test.client, - Scheme: scheme, - Image: "test", - GitPoller: poller, + Client: test.client, + Scheme: scheme, + Image: "test", + Clock: RealClock{}, } job, err := r.newGitJob(ctx, test.gitrepo) if err != nil { @@ -663,9 +927,9 @@ func TestGenerateJob_EnvVars(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { r := GitJobReconciler{ - Client: fake.NewFakeClient(), - Image: "test", - GitPoller: poller, + Client: fake.NewFakeClient(), + Image: "test", + Clock: RealClock{}, } for k, v := range test.osEnv { err := os.Setenv(k, v) @@ -693,6 +957,96 @@ func TestGenerateJob_EnvVars(t *testing.T) { } } +func TestCheckforPollingTask(t *testing.T) { + tests := map[string]struct { + gitrepo *fleetv1.GitRepo + timeNow time.Time + expectedResult bool + }{ + "LastPollingTime is not set": { + gitrepo: &fleetv1.GitRepo{}, + timeNow: time.Now(), // time here is irrelevant + expectedResult: true, + }, + "LastPollingTime is set but should still not trigger (1s away)": { + gitrepo: &fleetv1.GitRepo{ + Status: fleetv1.GitRepoStatus{ + LastPollingTime: metav1.Time{Time: time.Date(2024, time.July, 16, 15, 59, 59, 0, time.UTC)}, + }, + Spec: fleetv1.GitRepoSpec{ + PollingInterval: &metav1.Duration{Duration: 10 * time.Second}, + }, + }, + timeNow: time.Date(2024, time.July, 16, 16, 0, 0, 0, time.UTC), + expectedResult: false, + }, + "LastPollingTime is set and should trigger (10s away)": { + gitrepo: &fleetv1.GitRepo{ + Status: fleetv1.GitRepoStatus{ + LastPollingTime: metav1.Time{Time: time.Date(2024, time.July, 16, 15, 59, 50, 0, time.UTC)}, + }, + Spec: fleetv1.GitRepoSpec{ + PollingInterval: &metav1.Duration{Duration: 10 * time.Second}, + }, + }, + timeNow: time.Date(2024, time.July, 16, 16, 0, 0, 0, time.UTC), + expectedResult: true, + }, + "LastPollingTime is set but should still not trigger (1s away with default value)": { + gitrepo: &fleetv1.GitRepo{ + Status: fleetv1.GitRepoStatus{ + LastPollingTime: metav1.Time{Time: time.Date(2024, time.July, 16, 15, 59, 59, 0, time.UTC)}, + }, + }, + timeNow: time.Date(2024, time.July, 16, 16, 0, 0, 0, time.UTC), + expectedResult: false, + }, + "LastPollingTime is set and should trigger (15s away with default value)": { + gitrepo: &fleetv1.GitRepo{ + Status: fleetv1.GitRepoStatus{ + LastPollingTime: metav1.Time{Time: time.Date(2024, time.July, 16, 15, 59, 45, 0, time.UTC)}, + }, + }, + timeNow: time.Date(2024, time.July, 16, 16, 0, 0, 0, time.UTC), + expectedResult: true, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + fetcher := gitmocks.NewMockGitFetcher(mockCtrl) + commit := "1883fd54bc5dfd225acf02aecbb6cb8020458e33" + if test.expectedResult { + fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(commit, nil) + } + r := GitJobReconciler{ + Client: fake.NewFakeClient(), + Image: "test", + Clock: ClockMock{t: test.timeNow}, + GitFetcher: fetcher, + } + res, err := r.checkPollingTask(context.TODO(), test.gitrepo) + if res != test.expectedResult { + t.Errorf("unexpected result. Expecting %t, got %t", test.expectedResult, res) + } + if err != nil { + t.Errorf("not expecting to get an error, got [%v]", err) + } + if res { + // if the task was called, commit will be applied + if test.gitrepo.Status.Commit != commit { + t.Errorf("expecting commit: %s, got: %s", commit, test.gitrepo.Status.Commit) + } + // also LastPollingTime should be set to now + if test.gitrepo.Status.LastPollingTime.Time != test.timeNow { + t.Errorf("expecting LastPollingTime to be: %s, got: %s", test.timeNow, test.gitrepo.Status.LastPollingTime.Time) + } + } + }) + } +} + func httpSecretMock() client.Client { scheme := runtime.NewScheme() utilruntime.Must(corev1.AddToScheme(scheme)) diff --git a/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go b/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go index d314345859..bbd3142e32 100644 --- a/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go +++ b/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go @@ -193,6 +193,8 @@ type GitRepoStatus struct { ResourceErrors []string `json:"resourceErrors,omitempty"` // LastSyncedImageScanTime is the time of the last image scan. LastSyncedImageScanTime metav1.Time `json:"lastSyncedImageScanTime,omitempty"` + // LastPollingTime is the last time the polling check was triggered + LastPollingTime metav1.Time `json:"lastPollingTriggered,omitempty"` } // GitRepoResourceCounts contains the number of resources in each state. diff --git a/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go index 94071846ed..fe60d80726 100644 --- a/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go @@ -1552,6 +1552,7 @@ func (in *GitRepoStatus) DeepCopyInto(out *GitRepoStatus) { copy(*out, *in) } in.LastSyncedImageScanTime.DeepCopyInto(&out.LastSyncedImageScanTime) + in.LastPollingTime.DeepCopyInto(&out.LastPollingTime) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GitRepoStatus. diff --git a/pkg/git/mocks/scheduledjob_mock.go b/pkg/git/mocks/scheduledjob_mock.go deleted file mode 100644 index d43cda0f79..0000000000 --- a/pkg/git/mocks/scheduledjob_mock.go +++ /dev/null @@ -1,23 +0,0 @@ -package mocks - -import ( - "time" - - quartz "github.com/reugn/go-quartz/quartz" -) - -type MockScheduledJob struct { - Detail *quartz.JobDetail - TriggerDuration time.Duration -} - -func (m *MockScheduledJob) JobDetail() *quartz.JobDetail { - return m.Detail -} - -func (m *MockScheduledJob) Trigger() quartz.Trigger { - return quartz.NewSimpleTrigger(m.TriggerDuration) -} -func (m *MockScheduledJob) NextRunTime() int64 { - return 0 -} diff --git a/pkg/git/mocks/scheduler_mock.go b/pkg/git/mocks/scheduler_mock.go deleted file mode 100644 index 02b797af1d..0000000000 --- a/pkg/git/mocks/scheduler_mock.go +++ /dev/null @@ -1,188 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/reugn/go-quartz/quartz (interfaces: Scheduler) - -// Package mocks is a generated GoMock package. -package mocks - -import ( - context "context" - gomock "go.uber.org/mock/gomock" - quartz "github.com/reugn/go-quartz/quartz" - reflect "reflect" -) - -// MockScheduler is a mock of Scheduler interface. -type MockScheduler struct { - ctrl *gomock.Controller - recorder *MockSchedulerMockRecorder -} - -// MockSchedulerMockRecorder is the mock recorder for MockScheduler. -type MockSchedulerMockRecorder struct { - mock *MockScheduler -} - -// NewMockScheduler creates a new mock instance. -func NewMockScheduler(ctrl *gomock.Controller) *MockScheduler { - mock := &MockScheduler{ctrl: ctrl} - mock.recorder = &MockSchedulerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockScheduler) EXPECT() *MockSchedulerMockRecorder { - return m.recorder -} - -// Clear mocks base method. -func (m *MockScheduler) Clear() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Clear") - ret0, _ := ret[0].(error) - return ret0 -} - -// Clear indicates an expected call of Clear. -func (mr *MockSchedulerMockRecorder) Clear() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Clear", reflect.TypeOf((*MockScheduler)(nil).Clear)) -} - -// DeleteJob mocks base method. -func (m *MockScheduler) DeleteJob(arg0 *quartz.JobKey) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteJob", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// DeleteJob indicates an expected call of DeleteJob. -func (mr *MockSchedulerMockRecorder) DeleteJob(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteJob", reflect.TypeOf((*MockScheduler)(nil).DeleteJob), arg0) -} - -// GetJobKeys mocks base method. -func (m *MockScheduler) GetJobKeys(arg0 ...quartz.Matcher[quartz.ScheduledJob]) []*quartz.JobKey { - m.ctrl.T.Helper() - varargs := []interface{}{} - for _, a := range arg0 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "GetJobKeys", varargs...) - ret0, _ := ret[0].([]*quartz.JobKey) - return ret0 -} - -// GetJobKeys indicates an expected call of GetJobKeys. -func (mr *MockSchedulerMockRecorder) GetJobKeys(arg0 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetJobKeys", reflect.TypeOf((*MockScheduler)(nil).GetJobKeys), arg0...) -} - -// GetScheduledJob mocks base method. -func (m *MockScheduler) GetScheduledJob(arg0 *quartz.JobKey) (quartz.ScheduledJob, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetScheduledJob", arg0) - ret0, _ := ret[0].(quartz.ScheduledJob) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetScheduledJob indicates an expected call of GetScheduledJob. -func (mr *MockSchedulerMockRecorder) GetScheduledJob(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetScheduledJob", reflect.TypeOf((*MockScheduler)(nil).GetScheduledJob), arg0) -} - -// IsStarted mocks base method. -func (m *MockScheduler) IsStarted() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsStarted") - ret0, _ := ret[0].(bool) - return ret0 -} - -// IsStarted indicates an expected call of IsStarted. -func (mr *MockSchedulerMockRecorder) IsStarted() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsStarted", reflect.TypeOf((*MockScheduler)(nil).IsStarted)) -} - -// PauseJob mocks base method. -func (m *MockScheduler) PauseJob(arg0 *quartz.JobKey) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PauseJob", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// PauseJob indicates an expected call of PauseJob. -func (mr *MockSchedulerMockRecorder) PauseJob(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PauseJob", reflect.TypeOf((*MockScheduler)(nil).PauseJob), arg0) -} - -// ResumeJob mocks base method. -func (m *MockScheduler) ResumeJob(arg0 *quartz.JobKey) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ResumeJob", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// ResumeJob indicates an expected call of ResumeJob. -func (mr *MockSchedulerMockRecorder) ResumeJob(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResumeJob", reflect.TypeOf((*MockScheduler)(nil).ResumeJob), arg0) -} - -// ScheduleJob mocks base method. -func (m *MockScheduler) ScheduleJob(arg0 *quartz.JobDetail, arg1 quartz.Trigger) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ScheduleJob", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// ScheduleJob indicates an expected call of ScheduleJob. -func (mr *MockSchedulerMockRecorder) ScheduleJob(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleJob", reflect.TypeOf((*MockScheduler)(nil).ScheduleJob), arg0, arg1) -} - -// Start mocks base method. -func (m *MockScheduler) Start(arg0 context.Context) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Start", arg0) -} - -// Start indicates an expected call of Start. -func (mr *MockSchedulerMockRecorder) Start(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockScheduler)(nil).Start), arg0) -} - -// Stop mocks base method. -func (m *MockScheduler) Stop() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Stop") -} - -// Stop indicates an expected call of Stop. -func (mr *MockSchedulerMockRecorder) Stop() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockScheduler)(nil).Stop)) -} - -// Wait mocks base method. -func (m *MockScheduler) Wait(arg0 context.Context) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Wait", arg0) -} - -// Wait indicates an expected call of Wait. -func (mr *MockSchedulerMockRecorder) Wait(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Wait", reflect.TypeOf((*MockScheduler)(nil).Wait), arg0) -} diff --git a/pkg/git/poll/gitrepopolljob.go b/pkg/git/poll/gitrepopolljob.go deleted file mode 100644 index 0bc31f2439..0000000000 --- a/pkg/git/poll/gitrepopolljob.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright (c) 2021-2024 SUSE LLC -package poll - -import ( - "context" - "fmt" - - "github.com/rancher/fleet/internal/cmd/controller/grutil" - v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - "github.com/reugn/go-quartz/quartz" - "golang.org/x/sync/semaphore" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" - - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -var _ quartz.Job = &GitRepoPollJob{} - -type GitFetcher interface { - LatestCommit(ctx context.Context, gitrepo *v1alpha1.GitRepo, client client.Client) (string, error) -} - -type GitRepoPollJob struct { - sem *semaphore.Weighted - client client.Client - GitRepo v1alpha1.GitRepo - fetcher GitFetcher -} - -func GitRepoPollKey(gitRepo v1alpha1.GitRepo) *quartz.JobKey { - return quartz.NewJobKeyWithGroup(gitRepo.Name, gitRepo.Namespace) -} - -func NewGitRepoPollJob(c client.Client, f GitFetcher, gitRepo v1alpha1.GitRepo) *GitRepoPollJob { - return &GitRepoPollJob{ - sem: semaphore.NewWeighted(1), - client: c, - GitRepo: gitRepo, - fetcher: f, - } -} - -func (j *GitRepoPollJob) Execute(ctx context.Context) error { - if !j.sem.TryAcquire(1) { - // already running - return nil - } - defer j.sem.Release(1) - - j.fetchLatestCommitAndUpdateStatus(ctx) - - return nil -} - -func (j *GitRepoPollJob) Description() string { - return j.String() -} - -func (j *GitRepoPollJob) String() string { - return fmt.Sprintf("gitrepo-%s-%s", j.GitRepo.Namespace, j.GitRepo.Name) -} - -func (j *GitRepoPollJob) fetchLatestCommitAndUpdateStatus(ctx context.Context) { - logger := ctrl.Log.WithName("git-latest-commit-poll-watch") - commit, err := j.fetcher.LatestCommit(ctx, &j.GitRepo, j.client) - if err != nil { - logger.Error(err, "error fetching commit", "gitrepo", j.GitRepo) - nsName := types.NamespacedName{Name: j.GitRepo.Name, Namespace: j.GitRepo.Namespace} - _ = grutil.UpdateErrorStatus(ctx, j.client, nsName, j.GitRepo.Status, err) - return - } - if j.GitRepo.Status.Commit != commit { - logger.Info("new commit found", "gitrepo", j.GitRepo, "commit", commit) - if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - var gitRepoFromCluster v1alpha1.GitRepo - err := j.client.Get(ctx, types.NamespacedName{Name: j.GitRepo.Name, Namespace: j.GitRepo.Namespace}, &gitRepoFromCluster) - if err != nil { - return err - } - gitRepoFromCluster.Status.Commit = commit - - return j.client.Status().Update(ctx, &gitRepoFromCluster) - }); err != nil { - logger.Error(err, "error updating status when a new commit was found by polling", "gitrepo", j.GitRepo) - } - } -} diff --git a/pkg/git/poll/gitrepopolljob_test.go b/pkg/git/poll/gitrepopolljob_test.go deleted file mode 100644 index 59bdbe5e74..0000000000 --- a/pkg/git/poll/gitrepopolljob_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package poll - -import ( - "context" - "fmt" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "go.uber.org/mock/gomock" - "golang.org/x/sync/semaphore" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - - // . "github.com/onsi/gomega" - v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - "github.com/rancher/fleet/pkg/git/mocks" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" -) - -var _ = Describe("GitRepoPollJob tests", func() { - var ( - expectedCalls func(fetcher *mocks.MockGitFetcher) - gitRepo v1alpha1.GitRepo - job GitRepoPollJob - fetcher *mocks.MockGitFetcher - client client.WithWatch - ctx context.Context - commit string - ) - - JustBeforeEach(func() { - ctrl := gomock.NewController(GinkgoT()) - - gitRepo = v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitrepo", - }, - } - fetcher = mocks.NewMockGitFetcher(ctrl) - scheme := runtime.NewScheme() - err := v1alpha1.AddToScheme(scheme) - Expect(err).ToNot(HaveOccurred()) - client = fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(&gitRepo).WithStatusSubresource(&gitRepo).Build() - ctx = context.TODO() - - job = GitRepoPollJob{ - sem: semaphore.NewWeighted(1), - client: client, - GitRepo: gitRepo, - fetcher: fetcher, - } - - expectedCalls(fetcher) - - err = job.Execute(ctx) - Expect(err).ToNot(HaveOccurred()) - }) - - When("Running the job with a commit different to the one set in the gitRepo", func() { - BeforeEach(func() { - commit = "fakeCommit" - expectedCalls = func(fetcher *mocks.MockGitFetcher) { - fetcher.EXPECT().LatestCommit(ctx, gomock.Any(), client).Return(commit, nil).Times(1) - } - gitRepo.Status.Commit = "9b0380f535d4c428d5b18f2efb5fddfe52b9dbf1" - }) - It("Should update the gitRepo commit", func() { - updatedGitRepo := v1alpha1.GitRepo{} - err := client.Get(ctx, types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace}, &updatedGitRepo) - Expect(err).ToNot(HaveOccurred()) - Expect(updatedGitRepo.Status.Commit).To(Equal(commit)) - }) - }) - When("Running the job and LatestCommit returns an error", func() { - BeforeEach(func() { - commit = "fakeCommit" - expectedCalls = func(fetcher *mocks.MockGitFetcher) { - fetcher.EXPECT().LatestCommit(ctx, gomock.Any(), client).Return("", fmt.Errorf("Some error")).Times(1) - } - gitRepo.Status.Commit = "9b0380f535d4c428d5b18f2efb5fddfe52b9dbf1" - }) - It("Should not update the original gitRepo commit", func() { - updatedGitRepo := v1alpha1.GitRepo{} - err := client.Get(ctx, types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace}, &updatedGitRepo) - Expect(err).ToNot(HaveOccurred()) - Expect(updatedGitRepo.Status.Commit).To(Equal(gitRepo.Status.Commit)) - errorFound := false - for _, c := range updatedGitRepo.Status.Conditions { - if c.Message == "Some error" { - errorFound = true - } - } - Expect(errorFound).To(BeTrue()) - }) - }) -}) diff --git a/pkg/git/poll/handler.go b/pkg/git/poll/handler.go deleted file mode 100644 index f94a2e79e5..0000000000 --- a/pkg/git/poll/handler.go +++ /dev/null @@ -1,131 +0,0 @@ -package poll - -import ( - "context" - stderrors "errors" - "time" - - v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - "github.com/rancher/fleet/pkg/durations" - "github.com/rancher/fleet/pkg/git" - "github.com/reugn/go-quartz/quartz" - - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -const ( - maxSchedulerOperationRetries = 3 - defaultSyncInterval = 15 * time.Second -) - -type Watcher interface { - StartBackgroundSync(ctx context.Context) - Finish() - Restart(ctx context.Context) - UpdateGitRepo(gitRepo v1alpha1.GitRepo) - GetSyncInterval() float64 -} - -// Handler handles all the watches for the git repositories. These watches are pulling the latest commit every syncPeriod. -type Handler struct { - client client.Client - log logr.Logger - scheduler quartz.Scheduler - fetcher GitFetcher -} - -func NewHandler(ctx context.Context, client client.Client) *Handler { - scheduler := quartz.NewStdScheduler() - scheduler.Start(ctx) - return &Handler{ - client: client, - log: ctrl.Log.WithName("git-latest-commit-poll-handler"), - scheduler: scheduler, - fetcher: &git.Fetch{}, - } -} - -// AddOrModifyGitRepoPollJob adds a new scheduled job for the gitrepo if no job was already present. -// It updates the existing job for this gitrepo if present. -func (h *Handler) AddOrModifyGitRepoPollJob(ctx context.Context, gitRepo v1alpha1.GitRepo) { - gitRepoPollKey := GitRepoPollKey(gitRepo) - scheduledJob, err := h.scheduler.GetScheduledJob(gitRepoPollKey) - if err != nil { - // job was not found - if gitRepo.Spec.DisablePolling { - // nothing to do if disablePolling is set - return - } - h.scheduleJob(ctx, gitRepoPollKey, gitRepo, true) - } else { - if gitRepo.Spec.DisablePolling { - // if polling is disabled, just delete the job from the scheduler - err = h.scheduler.DeleteJob(gitRepoPollKey) - if err != nil { - h.log.Error(err, "error deleting the job", "job", gitRepoPollKey) - } - return - } - job := scheduledJob.JobDetail().Job() - gitRepoPollJob, ok := job.(*GitRepoPollJob) - if !ok { - h.log.Error(stderrors.New("invalid job"), - "error getting Gitrepo poll job, the scheduled job is not a GitRepoPollJob", "job", job.Description()) - return - } - previousInterval := gitRepoPollJob.GitRepo.Spec.PollingInterval - previousGeneration := gitRepoPollJob.GitRepo.Generation - gitRepoPollJob.GitRepo = gitRepo - if (previousGeneration != gitRepo.Generation) || - !durations.Equal(gitRepo.Spec.PollingInterval, previousInterval) { - // Spec or polling interval changed - // Reschedule so the job is immediately executed - // (otherwise it'll wait until next timeout) - _ = h.scheduler.DeleteJob(gitRepoPollKey) - h.scheduleJob(ctx, gitRepoPollKey, gitRepo, true) - } - } -} - -// CleanUpGitRepoPollJobs removes all poll jobs whose gitrepo is not present in the cluster. -func (h *Handler) CleanUpGitRepoPollJobs(ctx context.Context) { - var gitRepo v1alpha1.GitRepo - for _, key := range h.scheduler.GetJobKeys() { - namespacedName := types.NamespacedName{ - Namespace: key.Group(), - Name: key.Name(), - } - if err := h.client.Get(ctx, namespacedName, &gitRepo); errors.IsNotFound(err) { - err = h.scheduler.DeleteJob(key) - if err != nil { - h.log.Error(err, "error deleting job", "job", key) - } - } - } -} - -func calculateSyncInterval(gitRepo v1alpha1.GitRepo) time.Duration { - if gitRepo.Spec.PollingInterval != nil { - return gitRepo.Spec.PollingInterval.Duration - } - - return defaultSyncInterval -} - -func (h *Handler) scheduleJob(ctx context.Context, jobKey *quartz.JobKey, gitRepo v1alpha1.GitRepo, runBefore bool) { - job := NewGitRepoPollJob(h.client, h.fetcher, gitRepo) - if runBefore { - // ignoring error because that is only used by the quartz library to implement its retries mechanism - // The GitRepoPollJob always returns nil - _ = job.Execute(ctx) - } - err := h.scheduler.ScheduleJob(quartz.NewJobDetail(job, jobKey), - quartz.NewSimpleTrigger(calculateSyncInterval(gitRepo))) - if err != nil { - h.log.Error(err, "error scheduling job", "job", jobKey) - } -} diff --git a/pkg/git/poll/handler_test.go b/pkg/git/poll/handler_test.go deleted file mode 100644 index d1889fff9d..0000000000 --- a/pkg/git/poll/handler_test.go +++ /dev/null @@ -1,304 +0,0 @@ -package poll - -import ( - "context" - "time" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "github.com/reugn/go-quartz/quartz" - gomock "go.uber.org/mock/gomock" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - - v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - "github.com/rancher/fleet/pkg/git/mocks" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" -) - -var _ = Describe("Gitrepo pooling tests", func() { - var ( - scheduler *mocks.MockScheduler - fetcher *mocks.MockGitFetcher - gitRepo v1alpha1.GitRepo - client client.Client - ) - BeforeEach(func() { - ctrl := gomock.NewController(GinkgoT()) - scheduler = mocks.NewMockScheduler(ctrl) - fetcher = mocks.NewMockGitFetcher(ctrl) - gitRepo = v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob", - Namespace: "test", - }, - } - scheme := runtime.NewScheme() - err := v1alpha1.AddToScheme(scheme) - Expect(err).ToNot(HaveOccurred()) - client = fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(&gitRepo).WithStatusSubresource(&gitRepo).Build() - }) - DescribeTable("Gitrepo pooling tests", - func(pollingInterval time.Duration, disablePolling bool, changeSpec bool, - schedulerCalls func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration), - fetcherCalls func()) { - if pollingInterval != 0 { - gitRepo.Spec.PollingInterval = &metav1.Duration{Duration: pollingInterval} - } else { - gitRepo.Spec.PollingInterval = nil - } - gitRepo.Spec.DisablePolling = disablePolling - if changeSpec { - gitRepo.Generation = 2 - } - - handler := Handler{ - scheduler: scheduler, - fetcher: fetcher, - client: client, - } - - schedulerCalls(gitRepo, pollingInterval) - fetcherCalls() - - handler.AddOrModifyGitRepoPollJob(context.TODO(), gitRepo) - }, - - Entry("GitRepo is not present and disablePolling = true", - 1*time.Second, true, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - key := GitRepoPollKey(gitRepo) - scheduler.EXPECT().GetScheduledJob(key).Return(nil, quartz.ErrJobNotFound).Times(1) - }, func() {}), - - Entry("Gitrepo is not present, not setting pollingInterval and disablePolling=false", - 0*time.Second, false, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - key := GitRepoPollKey(gitRepo) - scheduler.EXPECT().GetScheduledJob(key).Return(nil, quartz.ErrJobNotFound).Times(1) - - job := NewGitRepoPollJob(client, fetcher, gitRepo) - jobDetail := quartz.NewJobDetail(job, key) - // we're not specifying an internal, so the default (15 secs) is set - trigger := quartz.NewSimpleTrigger(15 * time.Second) - scheduler.EXPECT().ScheduleJob(jobDetail, trigger).Return(nil).Times(1) - }, func() { - fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Return("commit", nil).Times(1) - }, - ), - - Entry("Gitrepo is not present, setting pollingInterval to a specific value and disablePolling=false", - 1999*time.Second, false, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - key := GitRepoPollKey(gitRepo) - scheduler.EXPECT().GetScheduledJob(key).Return(nil, quartz.ErrJobNotFound).Times(1) - - job := NewGitRepoPollJob(client, fetcher, gitRepo) - jobDetail := quartz.NewJobDetail(job, key) - // trigger should be pollingInterval seconds - trigger := quartz.NewSimpleTrigger(pollingInterval) - scheduler.EXPECT().ScheduleJob(jobDetail, trigger).Return(nil).Times(1) - }, func() { - fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Return("commit", nil).Times(1) - }, - ), - - Entry("gitrepo present, same polling interval, disablePolling=true", - 10*time.Second, true, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - key := GitRepoPollKey(gitRepo) - job := NewGitRepoPollJob(client, fetcher, gitRepo) - jobDetail := quartz.NewJobDetail(job, key) - schedJob := &mocks.MockScheduledJob{ - Detail: jobDetail, - TriggerDuration: 10 * time.Second, - } - // job exists - scheduler.EXPECT().GetScheduledJob(key).Return(schedJob, nil).Times(1) - // but we delete it because disablePolling is true - scheduler.EXPECT().DeleteJob(key).Return(nil).Times(1) - }, func() {}, - ), - - Entry("gitrepo present, different polling interval, disablePolling=true", - 10*time.Second, true, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - key := GitRepoPollKey(gitRepo) - job := NewGitRepoPollJob(client, fetcher, gitRepo) - jobDetail := quartz.NewJobDetail(job, key) - schedJob := &mocks.MockScheduledJob{ - Detail: jobDetail, - TriggerDuration: 10 * time.Second, - } - // job exists with a polling interval of 10 seconds - scheduler.EXPECT().GetScheduledJob(key).Return(schedJob, nil).Times(1) - // we delete it because disablePolling is true - scheduler.EXPECT().DeleteJob(key).Return(nil).Times(1) - }, func() {}, - ), - - Entry("gitrepo present, same polling interval, disablePolling=false", - 10*time.Second, false, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - key := GitRepoPollKey(gitRepo) - job := NewGitRepoPollJob(client, fetcher, gitRepo) - jobDetail := quartz.NewJobDetail(job, key) - schedJob := &mocks.MockScheduledJob{ - Detail: jobDetail, - TriggerDuration: 10 * time.Second, - } - // gets the job and does nothing else - scheduler.EXPECT().GetScheduledJob(key).Return(schedJob, nil).Times(1) - }, func() {}, - ), - - Entry("gitrepo present, different polling interval, disablePolling=false", - 1999*time.Second, false, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - gitRepoCopy := gitRepo - gitRepoCopy.Spec.PollingInterval = &metav1.Duration{Duration: 10 * time.Second} - key := GitRepoPollKey(gitRepoCopy) - job := NewGitRepoPollJob(client, fetcher, gitRepoCopy) - jobDetail := quartz.NewJobDetail(job, key) - schedJob := &mocks.MockScheduledJob{ - Detail: jobDetail, - TriggerDuration: 10 * time.Second, - } - // gets the job and does nothing else - scheduler.EXPECT().GetScheduledJob(key).Return(schedJob, nil).Times(1) - scheduler.EXPECT().DeleteJob(key).Return(nil).Times(1) - trigger := quartz.NewSimpleTrigger(1999 * time.Second) - scheduler.EXPECT().ScheduleJob(jobDetail, trigger).Return(nil).Times(1) - }, func() { - fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Return("commit", nil).Times(1) - }, - ), - - Entry("gitrepo present, same polling interval, disablePolling=false, generation changed", - 10*time.Second, false, true, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - gitRepoCopy := gitRepo - gitRepoCopy.Generation = 1 - key := GitRepoPollKey(gitRepoCopy) - job := NewGitRepoPollJob(client, fetcher, gitRepoCopy) - jobDetail := quartz.NewJobDetail(job, key) - schedJob := &mocks.MockScheduledJob{ - Detail: jobDetail, - TriggerDuration: 10 * time.Second, - } - // gets the job and rechedules - scheduler.EXPECT().GetScheduledJob(key).Return(schedJob, nil).Times(1) - scheduler.EXPECT().DeleteJob(key).Return(nil).Times(1) - trigger := quartz.NewSimpleTrigger(10 * time.Second) - scheduler.EXPECT().ScheduleJob(jobDetail, trigger).Return(nil).Times(1) - }, func() { - fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Return("commit", nil).Times(1) - }, - ), - ) -}) - -var _ = Describe("Gitrepo cleanup tests", func() { - var ( - scheduler *mocks.MockScheduler - fetcher *mocks.MockGitFetcher - gitRepo v1alpha1.GitRepo - client client.Client - handler Handler - expectedSchedulerCalls func(sched *mocks.MockScheduler) - ) - JustBeforeEach(func() { - ctrl := gomock.NewController(GinkgoT()) - scheduler = mocks.NewMockScheduler(ctrl) - fetcher = mocks.NewMockGitFetcher(ctrl) - handler = Handler{ - scheduler: scheduler, - fetcher: fetcher, - client: client, - } - }) - When("All gitRepos are found", func() { - BeforeEach(func() { - scheme := runtime.NewScheme() - err := v1alpha1.AddToScheme(scheme) - Expect(err).ToNot(HaveOccurred()) - gitRepo1 := v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob1", - Namespace: "test", - }, - } - - gitRepo2 := v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob2", - Namespace: "test", - }, - } - - gitRepo3 := v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob3", - Namespace: "test", - }, - } - client = fake.NewClientBuilder(). - WithScheme(scheme). - WithRuntimeObjects(&gitRepo1, &gitRepo2, &gitRepo3). - WithStatusSubresource(&gitRepo, &gitRepo2, &gitRepo3). - Build() - - expectedSchedulerCalls = func(sched *mocks.MockScheduler) { - key1 := GitRepoPollKey(gitRepo1) - key2 := GitRepoPollKey(gitRepo2) - key3 := GitRepoPollKey(gitRepo3) - sched.EXPECT().GetJobKeys().Return([]*quartz.JobKey{key1, key2, key3}).Times(1) - } - - }) - It("Does nothing", func() { - expectedSchedulerCalls(scheduler) - ctx := context.TODO() - handler.CleanUpGitRepoPollJobs(ctx) - }) - }) - When("A gitRepo is not found", func() { - BeforeEach(func() { - scheme := runtime.NewScheme() - err := v1alpha1.AddToScheme(scheme) - Expect(err).ToNot(HaveOccurred()) - gitRepo1 := v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob1", - Namespace: "test", - }, - } - - gitRepo2 := v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob2", - Namespace: "test", - }, - } - - gitRepo3 := v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob3", - Namespace: "test", - }, - } - client = fake.NewClientBuilder(). - WithScheme(scheme). - WithRuntimeObjects(&gitRepo1, &gitRepo2). - WithStatusSubresource(&gitRepo, &gitRepo2). - Build() - - expectedSchedulerCalls = func(sched *mocks.MockScheduler) { - key1 := GitRepoPollKey(gitRepo1) - key2 := GitRepoPollKey(gitRepo2) - // gitRepo3 was not added to the client - key3 := GitRepoPollKey(gitRepo3) - sched.EXPECT().GetJobKeys().Return([]*quartz.JobKey{key1, key2, key3}).Times(1) - sched.EXPECT().DeleteJob(key3).Return(nil).Times(1) - } - }) - It("Deletes the job that was not found from the schedule", func() { - expectedSchedulerCalls(scheduler) - ctx := context.TODO() - handler.CleanUpGitRepoPollJobs(ctx) - }) - }) -}) diff --git a/pkg/git/poll/suite_test.go b/pkg/git/poll/suite_test.go deleted file mode 100644 index 246c07f050..0000000000 --- a/pkg/git/poll/suite_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package poll_test - -import ( - "testing" - "time" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -const ( - timeout = 30 * time.Second -) - -func TestFleet(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Gitrepo pooling Suite") -} - -var _ = BeforeSuite(func() { - SetDefaultEventuallyTimeout(timeout) -})