diff --git a/charts/fleet-crd/templates/crds.yaml b/charts/fleet-crd/templates/crds.yaml index 5b9062781f..1f3776b13a 100644 --- a/charts/fleet-crd/templates/crds.yaml +++ b/charts/fleet-crd/templates/crds.yaml @@ -6490,6 +6490,11 @@ spec: description: GitJobStatus is the status of the last Git job run, e.g. "Current" if there was no error. type: string + lastPollingTriggered: + description: LastPollingTime is the last time the polling check + was triggered + format: date-time + type: string lastSyncedImageScanTime: description: LastSyncedImageScanTime is the time of the last image scan. diff --git a/e2e/single-cluster/gitrepo_polling_disabled_test.go b/e2e/single-cluster/gitrepo_polling_disabled_test.go index a2e67dc323..5cd9c6ebeb 100644 --- a/e2e/single-cluster/gitrepo_polling_disabled_test.go +++ b/e2e/single-cluster/gitrepo_polling_disabled_test.go @@ -85,7 +85,11 @@ var _ = Describe("GitRepoPollingDisabled", Label("infra-setup"), func() { }) JustBeforeEach(func() { - err := testenv.ApplyTemplate(k, testenv.AssetPath("gitrepo/gitrepo-polling-disabled.yaml"), struct { + var err error + clone, err = gh.Create(clonedir, testenv.AssetPath("gitrepo/sleeper-chart"), "disable_polling") + Expect(err).ToNot(HaveOccurred()) + + err = testenv.ApplyTemplate(k, testenv.AssetPath("gitrepo/gitrepo-polling-disabled.yaml"), struct { Name string Repo string Branch string @@ -97,9 +101,6 @@ var _ = Describe("GitRepoPollingDisabled", Label("infra-setup"), func() { targetNamespace, }) Expect(err).ToNot(HaveOccurred()) - - clone, err = gh.Create(clonedir, testenv.AssetPath("gitrepo/sleeper-chart"), "disable_polling") - Expect(err).ToNot(HaveOccurred()) }) It("deploys the resources initially and updates them while force updating", func() { diff --git a/e2e/single-cluster/gitrepo_test.go b/e2e/single-cluster/gitrepo_test.go index f843f09fea..26a42a4379 100644 --- a/e2e/single-cluster/gitrepo_test.go +++ b/e2e/single-cluster/gitrepo_test.go @@ -263,6 +263,10 @@ var _ = Describe("Monitoring Git repos via HTTP for change", Label("infra-setup" return err }).ShouldNot(HaveOccurred(), out) + // Clone previously created repo + clone, err = gh.Create(clonedir, testenv.AssetPath("gitrepo/sleeper-chart"), "examples") + Expect(err).ToNot(HaveOccurred()) + err = testenv.ApplyTemplate(k, testenv.AssetPath("gitrepo/gitrepo.yaml"), struct { Name string Repo string @@ -277,10 +281,6 @@ var _ = Describe("Monitoring Git repos via HTTP for change", Label("infra-setup" targetNamespace, // to avoid conflicts with other tests }) Expect(err).ToNot(HaveOccurred()) - - // Clone previously created repo - clone, err = gh.Create(clonedir, testenv.AssetPath("gitrepo/sleeper-chart"), "examples") - Expect(err).ToNot(HaveOccurred()) }) It("updates the deployment", func() { diff --git a/integrationtests/gitjob/controller/controller_test.go b/integrationtests/gitjob/controller/controller_test.go index 7c73fe2305..7aa630a58f 100644 --- a/integrationtests/gitjob/controller/controller_test.go +++ b/integrationtests/gitjob/controller/controller_test.go @@ -13,6 +13,7 @@ import ( "github.com/rancher/fleet/integrationtests/utils" v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" + "github.com/rancher/wrangler/v3/pkg/genericcondition" "github.com/rancher/wrangler/v3/pkg/name" batchv1 "k8s.io/api/batch/v1" @@ -32,6 +33,23 @@ const ( stableCommit = "26bdd9326b0238bb2fb743f863d9380c3c5d43e0" ) +func getCondition(gitrepo *v1alpha1.GitRepo, condType string) (genericcondition.GenericCondition, bool) { + for _, cond := range gitrepo.Status.Conditions { + if cond.Type == condType { + return cond, true + } + } + return genericcondition.GenericCondition{}, false +} + +func checkCondition(gitrepo *v1alpha1.GitRepo, condType string, status corev1.ConditionStatus) bool { + cond, found := getCondition(gitrepo, condType) + if !found { + return false + } + return cond.Type == condType && cond.Status == status +} + var _ = Describe("GitJob controller", func() { When("a new GitRepo is created", func() { @@ -43,6 +61,7 @@ var _ = Describe("GitJob controller", func() { ) JustBeforeEach(func() { + expectedCommit = commit gitRepo = createGitRepo(gitRepoName) Expect(k8sClient.Create(ctx, &gitRepo)).ToNot(HaveOccurred()) Eventually(func() string { @@ -54,7 +73,6 @@ var _ = Describe("GitJob controller", func() { } return gitRepoFromCluster.Status.Display.ReadyBundleDeployments }).Should(ContainSubstring("0/0")) - Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred()) By("Creating a job") Eventually(func() error { @@ -146,8 +164,8 @@ var _ = Describe("GitJob controller", func() { Eventually(func() bool { Expect(k8sClient.Get(ctx, types.NamespacedName{Name: gitRepoName, Namespace: gitRepoNamespace}, &gitRepo)).ToNot(HaveOccurred()) - // XXX: do we need an additional `LastExecutedCommit` or similar status field? - return gitRepo.Status.GitJobStatus == "Failed" + // Job Status should be failed and commit should be as expected + return gitRepo.Status.GitJobStatus == "Failed" && gitRepo.Status.Commit == commit }).Should(BeTrue()) By("verifying that the job is deleted if Spec.Generation changed") @@ -168,9 +186,9 @@ var _ = Describe("GitJob controller", func() { ) JustBeforeEach(func() { + expectedCommit = commit gitRepo = createGitRepo(gitRepoName) Expect(k8sClient.Create(ctx, &gitRepo)).ToNot(HaveOccurred()) - Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred()) By("creating a Job") Eventually(func() error { @@ -185,7 +203,7 @@ var _ = Describe("GitJob controller", func() { }) It("creates a new Job", func() { const newCommit = "9ca3a0adbbba32" - Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, newCommit)).ToNot(HaveOccurred()) + expectedCommit = newCommit Eventually(func() error { jobName := name.SafeConcatName(gitRepoName, name.Hex(repo+newCommit, 5)) return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) @@ -206,7 +224,6 @@ var _ = Describe("GitJob controller", func() { JustBeforeEach(func() { gitRepo = createGitRepo(gitRepoName) Expect(k8sClient.Create(ctx, &gitRepo)).ToNot(HaveOccurred()) - Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred()) Eventually(func() error { jobName = name.SafeConcatName(gitRepoName, name.Hex(repo+commit, 5)) return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) @@ -216,6 +233,7 @@ var _ = Describe("GitJob controller", func() { Expect(simulateIncreaseForceSyncGeneration(gitRepo)).ToNot(HaveOccurred()) }) BeforeEach(func() { + expectedCommit = commit gitRepoName = "force-deletion" }) AfterEach(func() { @@ -254,7 +272,6 @@ var _ = Describe("GitJob controller", func() { JustBeforeEach(func() { gitRepo = createGitRepo(gitRepoName) Expect(k8sClient.Create(ctx, &gitRepo)).ToNot(HaveOccurred()) - Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred()) Eventually(func() error { jobName = name.SafeConcatName(gitRepoName, name.Hex(repo+commit, 5)) return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) @@ -274,6 +291,7 @@ var _ = Describe("GitJob controller", func() { }) BeforeEach(func() { + expectedCommit = commit gitRepoName = "simulate-arg-update" }) @@ -307,6 +325,7 @@ var _ = Describe("GitJob controller", func() { When("a job completes successfully", func() { BeforeEach(func() { + expectedCommit = stableCommit gitRepoName = "disable-polling" }) @@ -338,6 +357,7 @@ var _ = Describe("GitJob controller", func() { ) JustBeforeEach(func() { + expectedCommit = commit gitRepoName = "test-no-for-paths-secret" gitRepo = createGitRepo(gitRepoName) gitRepo.Spec.HelmSecretNameForPaths = helmSecretNameForPaths @@ -345,7 +365,6 @@ var _ = Describe("GitJob controller", func() { // Create should return an error err := k8sClient.Create(ctx, &gitRepo) Expect(err).ToNot(HaveOccurred()) - Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred()) }) AfterEach(func() { @@ -473,28 +492,25 @@ var _ = Describe("GitRepo", func() { When("creating a gitrepo", func() { JustBeforeEach(func() { + expectedCommit = commit err := k8sClient.Create(ctx, gitrepo) Expect(err).NotTo(HaveOccurred()) }) It("updates the gitrepo status", func() { org := gitrepo.ResourceVersion - Eventually(func() bool { + Eventually(func(g Gomega) { _ = k8sClient.Get(ctx, types.NamespacedName{Name: gitrepoName, Namespace: namespace}, gitrepo) - return gitrepo.ResourceVersion > org && - gitrepo.Status.Display.ReadyBundleDeployments == "0/0" && - !gitrepo.Status.Display.Error && - len(gitrepo.Status.Conditions) == 4 && - gitrepo.Status.Conditions[0].Type == "Reconciling" && - string(gitrepo.Status.Conditions[0].Status) == "False" && - gitrepo.Status.Conditions[1].Type == "Stalled" && - string(gitrepo.Status.Conditions[1].Status) == "False" && - gitrepo.Status.Conditions[2].Type == "Ready" && - string(gitrepo.Status.Conditions[2].Status) == "True" && - gitrepo.Status.Conditions[3].Type == "Accepted" && - string(gitrepo.Status.Conditions[3].Status) == "True" && - gitrepo.Status.DeepCopy().ObservedGeneration == int64(1) - }).Should(BeTrue()) + g.Expect(gitrepo.ResourceVersion > org).To(BeTrue()) + g.Expect(gitrepo.Status.Display.ReadyBundleDeployments).To(Equal("0/0")) + g.Expect(gitrepo.Status.Display.Error).To(BeFalse()) + g.Expect(len(gitrepo.Status.Conditions)).To(Equal(5)) + g.Expect(checkCondition(gitrepo, "GitPolling", corev1.ConditionTrue)).To(BeTrue()) + g.Expect(checkCondition(gitrepo, "Reconciling", corev1.ConditionTrue)).To(BeTrue()) + g.Expect(checkCondition(gitrepo, "Stalled", corev1.ConditionFalse)).To(BeTrue()) + g.Expect(checkCondition(gitrepo, "Ready", corev1.ConditionTrue)).To(BeTrue()) + g.Expect(checkCondition(gitrepo, "Accepted", corev1.ConditionTrue)).To(BeTrue()) + }).Should(Succeed()) }) }) }) @@ -619,20 +635,6 @@ func simulateIncreaseGitRepoGeneration(gitRepo v1alpha1.GitRepo) error { }) } -func simulateGitPollerUpdatingCommitInStatus(gitRepo v1alpha1.GitRepo, commit string) error { - return retry.RetryOnConflict(retry.DefaultRetry, func() error { - var gitRepoFromCluster v1alpha1.GitRepo - err := k8sClient.Get(ctx, types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace}, &gitRepoFromCluster) - if err != nil { - return err - } - gitRepoFromCluster.Status = v1alpha1.GitRepoStatus{ - Commit: commit, - } - return k8sClient.Status().Update(ctx, &gitRepoFromCluster) - }) -} - func createGitRepo(gitRepoName string) v1alpha1.GitRepo { return v1alpha1.GitRepo{ ObjectMeta: metav1.ObjectMeta{ diff --git a/integrationtests/gitjob/controller/suite_test.go b/integrationtests/gitjob/controller/suite_test.go index 328a48b72b..310f5e91b6 100644 --- a/integrationtests/gitjob/controller/suite_test.go +++ b/integrationtests/gitjob/controller/suite_test.go @@ -17,8 +17,8 @@ import ( ctrlreconciler "github.com/rancher/fleet/internal/cmd/controller/reconciler" "github.com/rancher/fleet/internal/cmd/controller/target" "github.com/rancher/fleet/internal/manifest" - "github.com/rancher/fleet/internal/mocks" v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" + "github.com/rancher/fleet/pkg/git/mocks" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -33,13 +33,14 @@ const ( ) var ( - cfg *rest.Config - testEnv *envtest.Environment - ctx context.Context - cancel context.CancelFunc - k8sClient client.Client - logsBuffer bytes.Buffer - namespace string + cfg *rest.Config + testEnv *envtest.Environment + ctx context.Context + cancel context.CancelFunc + k8sClient client.Client + logsBuffer bytes.Buffer + namespace string + expectedCommit string ) func TestGitJobController(t *testing.T) { @@ -73,25 +74,29 @@ var _ = BeforeSuite(func() { Expect(err).ToNot(HaveOccurred()) ctlr := gomock.NewController(GinkgoT()) - gitPollerMock := mocks.NewMockGitPoller(ctlr) // redirect logs to a buffer that we can read in the tests GinkgoWriter.TeeTo(&logsBuffer) ctrl.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) - // do nothing if gitPoller is called. gitPoller calls are tested in unit tests - gitPollerMock.EXPECT().AddOrModifyGitRepoPollJob(gomock.Any(), gomock.Any()).AnyTimes() - gitPollerMock.EXPECT().CleanUpGitRepoPollJobs(gomock.Any()).AnyTimes() + // return whatever commit the test is expecting + fetcherMock := mocks.NewMockGitFetcher(ctlr) + fetcherMock.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn( + func(ctx context.Context, gitrepo *v1alpha1.GitRepo, client client.Client) (string, error) { + return expectedCommit, nil + }, + ) sched := quartz.NewStdScheduler() Expect(sched).ToNot(BeNil()) err = (&reconciler.GitJobReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Image: "image", - GitPoller: gitPollerMock, - Scheduler: sched, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Image: "image", + Scheduler: sched, + GitFetcher: fetcherMock, + Clock: reconciler.RealClock{}, }).SetupWithManager(mgr) Expect(err).ToNot(HaveOccurred()) diff --git a/internal/cmd/controller/gitops/operator.go b/internal/cmd/controller/gitops/operator.go index b22d04f155..1a11d9021a 100644 --- a/internal/cmd/controller/gitops/operator.go +++ b/internal/cmd/controller/gitops/operator.go @@ -12,7 +12,7 @@ import ( "github.com/rancher/fleet/internal/cmd/controller/gitops/reconciler" "github.com/rancher/fleet/internal/metrics" fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - "github.com/rancher/fleet/pkg/git/poll" + "github.com/rancher/fleet/pkg/git" "github.com/rancher/fleet/pkg/version" "github.com/rancher/fleet/pkg/webhook" "github.com/reugn/go-quartz/quartz" @@ -122,13 +122,14 @@ func (g *GitOperator) Run(cmd *cobra.Command, args []string) error { } reconciler := &reconciler.GitJobReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Image: g.Image, - GitPoller: poll.NewHandler(ctx, mgr.GetClient()), - Scheduler: sched, - Workers: workers, - ShardID: g.ShardID, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Image: g.Image, + Scheduler: sched, + Workers: workers, + ShardID: g.ShardID, + GitFetcher: &git.Fetch{}, + Clock: reconciler.RealClock{}, } group, ctx := errgroup.WithContext(ctx) diff --git a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go index d288ea0b96..1a3902cce6 100644 --- a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go +++ b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go @@ -15,8 +15,8 @@ import ( "github.com/rancher/fleet/internal/metrics" "github.com/rancher/fleet/internal/ociwrapper" v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - "github.com/rancher/fleet/pkg/git" "github.com/rancher/fleet/pkg/sharding" + "github.com/rancher/wrangler/v3/pkg/condition" "github.com/reugn/go-quartz/quartz" "github.com/rancher/wrangler/v3/pkg/name" @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) const ( @@ -48,24 +49,38 @@ const ( gitClonerVolumeName = "git-cloner" emptyDirVolumeName = "git-cloner-empty-dir" fleetHomeDir = "/fleet-home" + + defaultPollingSyncInterval = 15 * time.Second + gitPollingCondition = "GitPolling" ) var two = int32(2) -type GitPoller interface { - AddOrModifyGitRepoPollJob(ctx context.Context, gitRepo v1alpha1.GitRepo) - CleanUpGitRepoPollJobs(ctx context.Context) +type GitFetcher interface { + LatestCommit(ctx context.Context, gitrepo *v1alpha1.GitRepo, client client.Client) (string, error) +} + +// TimeGetter interface is used to mock the time.Now() call in unit tests +type TimeGetter interface { + Now() time.Time + Since(t time.Time) time.Duration } +type RealClock struct{} + +func (RealClock) Now() time.Time { return time.Now() } +func (RealClock) Since(t time.Time) time.Duration { return time.Since(t) } + // CronJobReconciler reconciles a GitRepo resource to create a git cloning k8s job type GitJobReconciler struct { client.Client - Scheme *runtime.Scheme - Image string - GitPoller GitPoller - Scheduler quartz.Scheduler - Workers int - ShardID string + Scheme *runtime.Scheme + Image string + Scheduler quartz.Scheduler + Workers int + ShardID string + GitFetcher GitFetcher + Clock TimeGetter } func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error { @@ -77,7 +92,6 @@ func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error { predicate.GenerationChangedPredicate{}, predicate.AnnotationChangedPredicate{}, predicate.LabelChangedPredicate{}, - commitChangedPredicate(), ), ), ). @@ -122,7 +136,6 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, err } else if errors.IsNotFound(err) { logger.V(1).Info("Gitrepo deleted, cleaning up poll jobs") - r.GitPoller.CleanUpGitRepoPollJobs(ctx) return ctrl.Result{}, nil } @@ -148,6 +161,9 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if client.IgnoreNotFound(err) != nil { return ctrl.Result{}, err } + + // requeue as adding the finalizer changes the spec + return ctrl.Result{Requeue: true}, nil } logger = logger.WithValues("generation", gitrepo.Generation, "commit", gitrepo.Status.Commit) @@ -159,7 +175,10 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, nil } - r.GitPoller.AddOrModifyGitRepoPollJob(ctx, *gitrepo) + gitPollerWasExecuted, _ := r.checkPollingTask(ctx, gitrepo) + // From this point onwards we have to take into account if the poller + // task was executed. + // If so, we need to return a Result with EnqueueAfter set. var job batchv1.Job err = r.Get(ctx, types.NamespacedName{ @@ -167,38 +186,37 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr Name: jobName(gitrepo), }, &job) if err != nil && !errors.IsNotFound(err) { - return ctrl.Result{}, fmt.Errorf("error retrieving git job: %w", err) + return reconcileResult(gitPollerWasExecuted, gitrepo), fmt.Errorf("error retrieving git job: %w", err) } // Gitjob handling if errors.IsNotFound(err) { if gitrepo.Spec.DisablePolling { - if err := r.updateCommit(ctx, gitrepo); err != nil { - if errors.IsConflict(err) { - logger.V(1).Info("conflict updating commit, retrying", "message", err) - return ctrl.Result{Requeue: true}, nil // just retry, but don't show an error - } - return ctrl.Result{}, fmt.Errorf("error updating commit: %v", err) + commit, err := r.GitFetcher.LatestCommit(ctx, gitrepo, r.Client) + condition.Cond(gitPollingCondition).SetError(&gitrepo.Status, "", err) + if err == nil && commit != "" { + gitrepo.Status.Commit = commit } } + if gitrepo.Status.Commit != "" { if err := r.validateExternalSecretExist(ctx, gitrepo); err != nil { - return ctrl.Result{}, grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) + return reconcileResult(gitPollerWasExecuted, gitrepo), grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) } logger.V(1).Info("Creating Git job resources") if err := r.createJobRBAC(ctx, gitrepo); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to create RBAC resources for git job: %w", err) + return reconcileResult(gitPollerWasExecuted, gitrepo), fmt.Errorf("failed to create RBAC resources for git job: %w", err) } if err := r.createTargetsConfigMap(ctx, gitrepo); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to create targets config map for git job: %w", err) + return reconcileResult(gitPollerWasExecuted, gitrepo), fmt.Errorf("failed to create targets config map for git job: %w", err) } if err := r.createJob(ctx, gitrepo); err != nil { - return ctrl.Result{}, fmt.Errorf("error creating git job: %w", err) + return reconcileResult(gitPollerWasExecuted, gitrepo), fmt.Errorf("error creating git job: %w", err) } } } else if gitrepo.Status.Commit != "" { if err = r.deleteJobIfNeeded(ctx, gitrepo, &job); err != nil { - return ctrl.Result{}, fmt.Errorf("error deleting git job: %w", err) + return reconcileResult(gitPollerWasExecuted, gitrepo), fmt.Errorf("error deleting git job: %w", err) } } @@ -206,21 +224,21 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // Refresh the status if err = grutil.SetStatusFromGitjob(ctx, r.Client, gitrepo, &job); err != nil { - return ctrl.Result{}, grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) + return reconcileResult(gitPollerWasExecuted, gitrepo), grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) } err = grutil.SetStatusFromBundleDeployments(ctx, r.Client, gitrepo) if err != nil { - return ctrl.Result{}, grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) + return reconcileResult(gitPollerWasExecuted, gitrepo), grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) } err = grutil.SetStatusFromBundles(ctx, r.Client, gitrepo) if err != nil { - return ctrl.Result{}, grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) + return reconcileResult(gitPollerWasExecuted, gitrepo), grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) } if err = grutil.UpdateDisplayState(gitrepo); err != nil { - return ctrl.Result{}, grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) + return reconcileResult(gitPollerWasExecuted, gitrepo), grutil.UpdateErrorStatus(ctx, r.Client, req.NamespacedName, gitrepo.Status, err) } grutil.SetStatusFromResourceKey(ctx, r.Client, gitrepo) @@ -235,22 +253,10 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if err != nil { logger.V(1).Error(err, "Reconcile failed final update to git repo status", "status", gitrepo.Status) - return ctrl.Result{}, err + return reconcileResult(gitPollerWasExecuted, gitrepo), err } - return ctrl.Result{}, nil -} - -func (r *GitJobReconciler) updateCommit(ctx context.Context, gitRepo *v1alpha1.GitRepo) error { - fetcher := git.NewFetch() - commit, err := fetcher.LatestCommit(ctx, gitRepo, r.Client) - if err != nil { - return err - } - return retry.RetryOnConflict(retry.DefaultRetry, func() error { - gitRepo.Status.Commit = commit - return r.Status().Update(ctx, gitRepo) - }) + return reconcileResult(gitPollerWasExecuted, gitrepo), nil } func (r *GitJobReconciler) cleanupGitRepo(ctx context.Context, logger logr.Logger, gitrepo *v1alpha1.GitRepo) error { @@ -307,23 +313,6 @@ func (r *GitJobReconciler) addGitRepoFinalizer(ctx context.Context, nsName types return nil } -func commitChangedPredicate() predicate.Predicate { - return predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - oldGitJob, ok := e.ObjectOld.(*v1alpha1.GitRepo) - if !ok { - return true - } - newGitJob, ok := e.ObjectNew.(*v1alpha1.GitRepo) - if !ok { - return true - } - - return oldGitJob.Status.Commit != newGitJob.Status.Commit - }, - } -} - func (r *GitJobReconciler) createJobRBAC(ctx context.Context, gitrepo *v1alpha1.GitRepo) error { // No update needed, values are the same. So we ignore AlreadyExists. saName := name.SafeConcatName("git", gitrepo.Name) @@ -989,3 +978,53 @@ func bundleStatusChangedPredicate() predicate.Funcs { }, } } + +func (r *GitJobReconciler) checkPollingTask(ctx context.Context, gitrepo *v1alpha1.GitRepo) (bool, error) { + if gitrepo.Spec.DisablePolling { + return false, nil + } + if r.shouldRunPollingTask(gitrepo) { + gitrepo.Status.LastPollingTime.Time = r.Clock.Now() + commit, err := r.GitFetcher.LatestCommit(ctx, gitrepo, r.Client) + condition.Cond(gitPollingCondition).SetError(&gitrepo.Status, "", err) + if err != nil { + return true, err + } + gitrepo.Status.Commit = commit + + return true, nil + } + + return false, nil +} + +func (r *GitJobReconciler) shouldRunPollingTask(gitrepo *v1alpha1.GitRepo) bool { + if gitrepo.Spec.DisablePolling { + return false + } + + t := gitrepo.Status.LastPollingTime + + if t.IsZero() || (r.Clock.Since(t.Time) >= getPollingIntervalDuration(gitrepo)) { + return true + } + if gitrepo.Status.ObservedGeneration != gitrepo.Generation { + return true + } + return false +} + +func getPollingIntervalDuration(gitrepo *v1alpha1.GitRepo) time.Duration { + if gitrepo.Spec.PollingInterval == nil || gitrepo.Spec.PollingInterval.Duration == 0 { + return defaultPollingSyncInterval + } + + return gitrepo.Spec.PollingInterval.Duration +} + +func reconcileResult(gitPollerWasExecuted bool, gitrepo *v1alpha1.GitRepo) reconcile.Result { + if gitPollerWasExecuted { + return reconcile.Result{RequeueAfter: getPollingIntervalDuration(gitrepo)} + } + return reconcile.Result{} +} diff --git a/internal/cmd/controller/gitops/reconciler/gitjob_test.go b/internal/cmd/controller/gitops/reconciler/gitjob_test.go index 3423929b82..1d367a1b0b 100644 --- a/internal/cmd/controller/gitops/reconciler/gitjob_test.go +++ b/internal/cmd/controller/gitops/reconciler/gitjob_test.go @@ -5,29 +5,102 @@ package reconciler import ( "context" + "fmt" "os" "testing" + "time" "github.com/google/go-cmp/cmp" "go.uber.org/mock/gomock" + "github.com/rancher/fleet/internal/cmd/controller/finalize" "github.com/rancher/fleet/internal/mocks" fleetv1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" + "github.com/rancher/wrangler/v3/pkg/genericcondition" + gitmocks "github.com/rancher/fleet/pkg/git/mocks" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) -func TestReconcile_AddOrModifyGitRepoPollJobIsCalled_WhenGitRepoIsCreatedOrModified(t *testing.T) { +type ClockMock struct { + t time.Time +} + +func (m ClockMock) Now() time.Time { return m.t } +func (m ClockMock) Since(t time.Time) time.Duration { return m.t.Sub(t) } + +func getGitPollingCondition(gitrepo *fleetv1.GitRepo) (genericcondition.GenericCondition, bool) { + for _, cond := range gitrepo.Status.Conditions { + if cond.Type == gitPollingCondition { + return cond, true + } + } + return genericcondition.GenericCondition{}, false +} + +func TestReconcile_ReturnsAndRequeuesAfterAddingFinalizer(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + scheme := runtime.NewScheme() + utilruntime.Must(batchv1.AddToScheme(scheme)) + gitRepo := fleetv1.GitRepo{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gitrepo", + Namespace: "default", + }, + } + namespacedName := types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace} + client := mocks.NewMockClient(mockCtrl) + client.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1).DoAndReturn( + func(ctx context.Context, req types.NamespacedName, gitrepo *fleetv1.GitRepo, opts ...interface{}) error { + gitrepo.Name = gitRepo.Name + gitrepo.Namespace = gitRepo.Namespace + gitrepo.Spec.Repo = "repo" + return nil + }, + ) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + fetcher := gitmocks.NewMockGitFetcher(mockCtrl) + client.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(ctx context.Context, repo *fleetv1.GitRepo, opts ...interface{}) { + // check that we added the finalizer + if !controllerutil.ContainsFinalizer(repo, finalize.GitRepoFinalizer) { + t.Errorf("expecting gitrepo to contain finalizer") + } + }, + ).Times(1) + + r := GitJobReconciler{ + Client: client, + Scheme: scheme, + Image: "", + GitFetcher: fetcher, + Clock: RealClock{}, + } + + ctx := context.TODO() + + // second call is the one calling LatestCommit + res, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: namespacedName}) + if err != nil { + t.Errorf("unexpected error %v", err) + } + if !res.Requeue { + t.Errorf("expecting Requeue set to true, it was false") + } +} + +func TestReconcile_LatestCommitErrorIsSetInConditions(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() scheme := runtime.NewScheme() @@ -41,56 +114,251 @@ func TestReconcile_AddOrModifyGitRepoPollJobIsCalled_WhenGitRepoIsCreatedOrModif namespacedName := types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace} client := mocks.NewMockClient(mockCtrl) statusClient := mocks.NewMockSubResourceWriter(mockCtrl) - statusClient.EXPECT().Update(gomock.Any(), gomock.Any()) - client.EXPECT().Update(gomock.Any(), gomock.Any()).Times(1).Return(nil) client.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1).DoAndReturn( func(ctx context.Context, req types.NamespacedName, gitrepo *fleetv1.GitRepo, opts ...interface{}) error { gitrepo.Name = gitRepo.Name gitrepo.Namespace = gitRepo.Namespace gitrepo.Spec.Repo = "repo" + controllerutil.AddFinalizer(gitrepo, finalize.GitRepoFinalizer) return nil }, ) client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) - client.EXPECT().Status().Return(statusClient) - poller := mocks.NewMockGitPoller(mockCtrl) - poller.EXPECT().AddOrModifyGitRepoPollJob(gomock.Any(), gomock.Any()).Times(1) - poller.EXPECT().CleanUpGitRepoPollJobs(gomock.Any()).Times(0) + client.EXPECT().Status().Return(statusClient).Times(1) + fetcher := gitmocks.NewMockGitFetcher(mockCtrl) + fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return("", fmt.Errorf("TEST ERROR")) + statusClient.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(ctx context.Context, repo *fleetv1.GitRepo, opts ...interface{}) { + cond, found := getGitPollingCondition(repo) + if !found { + t.Errorf("expecting Condition %s to be found", gitPollingCondition) + } + if cond.Message != "TEST ERROR" { + t.Errorf("expecting condition message [TEST ERROR], got [%s]", cond.Message) + } + if cond.Type != gitPollingCondition { + t.Errorf("expecting condition type [%s], got [%s]", gitPollingCondition, cond.Type) + } + if cond.Status != "False" { + t.Errorf("expecting condition Status [False], got [%s]", cond.Type) + } + }, + ).Times(1) r := GitJobReconciler{ - Client: client, - Scheme: scheme, - Image: "", - GitPoller: poller, + Client: client, + Scheme: scheme, + Image: "", + GitFetcher: fetcher, + Clock: RealClock{}, } ctx := context.TODO() + + // second call is the one calling LatestCommit _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: namespacedName}) if err != nil { t.Errorf("unexpected error %v", err) } } -func TestReconcile_PurgeWatchesIsCalled_WhenGitRepoIsCreatedOrModified(t *testing.T) { +func TestReconcile_LatestCommitIsOkay(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() scheme := runtime.NewScheme() utilruntime.Must(batchv1.AddToScheme(scheme)) + gitRepo := fleetv1.GitRepo{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gitrepo", + Namespace: "default", + }, + } + namespacedName := types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace} + client := mocks.NewMockClient(mockCtrl) + statusClient := mocks.NewMockSubResourceWriter(mockCtrl) + client.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1).DoAndReturn( + func(ctx context.Context, req types.NamespacedName, gitrepo *fleetv1.GitRepo, opts ...interface{}) error { + gitrepo.Name = gitRepo.Name + gitrepo.Namespace = gitRepo.Namespace + gitrepo.Spec.Repo = "repo" + controllerutil.AddFinalizer(gitrepo, finalize.GitRepoFinalizer) + return nil + }, + ) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + client.EXPECT().Status().Return(statusClient).Times(1) + + fetcher := gitmocks.NewMockGitFetcher(mockCtrl) + commit := "1883fd54bc5dfd225acf02aecbb6cb8020458e33" + fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(commit, nil) + statusClient.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(ctx context.Context, repo *fleetv1.GitRepo, opts ...interface{}) { + cond, found := getGitPollingCondition(repo) + if !found { + t.Errorf("expecting Condition %s to be found", gitPollingCondition) + } + if cond.Message != "" { + t.Errorf("expecting condition message empty, got [%s]", cond.Message) + } + if cond.Type != gitPollingCondition { + t.Errorf("expecting condition type [%s], got [%s]", gitPollingCondition, cond.Type) + } + if cond.Status != "True" { + t.Errorf("expecting condition Status [True], got [%s]", cond.Type) + } + if repo.Status.Commit != commit { + t.Errorf("expecting commit %s, got %s", commit, repo.Status.Commit) + } + }, + ).Times(1) + + r := GitJobReconciler{ + Client: client, + Scheme: scheme, + Image: "", + GitFetcher: fetcher, + Clock: RealClock{}, + } + ctx := context.TODO() - namespacedName := types.NamespacedName{Name: "gitRepo", Namespace: "default"} + + // second call is the one calling LatestCommit + _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: namespacedName}) + if err != nil { + t.Errorf("unexpected error %v", err) + } +} + +func TestReconcile_LatestCommitNotCalledYet(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + scheme := runtime.NewScheme() + utilruntime.Must(batchv1.AddToScheme(scheme)) + gitRepo := fleetv1.GitRepo{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gitrepo", + Namespace: "default", + }, + } + namespacedName := types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace} client := mocks.NewMockClient(mockCtrl) - client.EXPECT().Get(ctx, namespacedName, gomock.Any()).Times(1).Return(errors.NewNotFound(schema.GroupResource{}, "NotFound")) - poller := mocks.NewMockGitPoller(mockCtrl) - poller.EXPECT().AddOrModifyGitRepoPollJob(ctx, gomock.Any()).Times(0) - poller.EXPECT().CleanUpGitRepoPollJobs(ctx).Times(1) + statusClient := mocks.NewMockSubResourceWriter(mockCtrl) + client.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1).DoAndReturn( + func(ctx context.Context, req types.NamespacedName, gitrepo *fleetv1.GitRepo, opts ...interface{}) error { + gitrepo.Name = gitRepo.Name + gitrepo.Namespace = gitRepo.Namespace + gitrepo.Spec.Repo = "repo" + controllerutil.AddFinalizer(gitrepo, finalize.GitRepoFinalizer) + + // set last polling time to now... + // default gitrepo polling time is 15 seconds, so it won't call LatestCommit this time + gitrepo.Status.LastPollingTime.Time = time.Now() + return nil + }, + ) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + client.EXPECT().Status().Return(statusClient).Times(1) + statusClient.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(ctx context.Context, repo *fleetv1.GitRepo, opts ...interface{}) { + if repo.Status.Commit != "" { + t.Errorf("expecting gitrepo empty commit, got [%s]", repo.Status.Commit) + } + cond, found := getGitPollingCondition(repo) + if found { + t.Errorf("not expecting Condition %s to be found. Got [%s]", gitPollingCondition, cond) + } + }, + ).Times(1) + fetcher := gitmocks.NewMockGitFetcher(mockCtrl) r := GitJobReconciler{ - Client: client, - Scheme: scheme, - Image: "", - GitPoller: poller, + Client: client, + Scheme: scheme, + Image: "", + GitFetcher: fetcher, + Clock: RealClock{}, } + + ctx := context.TODO() + + // second call is the one calling LatestCommit + _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: namespacedName}) + if err != nil { + t.Errorf("unexpected error %v", err) + } +} + +func TestReconcile_LatestCommitShouldBeCalled(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + scheme := runtime.NewScheme() + utilruntime.Must(batchv1.AddToScheme(scheme)) + gitRepo := fleetv1.GitRepo{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gitrepo", + Namespace: "default", + }, + } + namespacedName := types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace} + client := mocks.NewMockClient(mockCtrl) + statusClient := mocks.NewMockSubResourceWriter(mockCtrl) + client.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + fetcher := gitmocks.NewMockGitFetcher(mockCtrl) + commit := "1883fd54bc5dfd225acf02aecbb6cb8020458e33" + fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(commit, nil) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1).DoAndReturn( + func(ctx context.Context, req types.NamespacedName, gitrepo *fleetv1.GitRepo, opts ...interface{}) error { + gitrepo.Name = gitRepo.Name + gitrepo.Namespace = gitRepo.Namespace + gitrepo.Spec.Repo = "repo" + controllerutil.AddFinalizer(gitrepo, finalize.GitRepoFinalizer) + + // set last polling time to now less 15 seconds (which is the default) + // that should trigger the polling job now + now := time.Now() + gitrepo.Status.LastPollingTime.Time = now.Add(time.Duration(-15) * time.Second) + // commit is something different to what we expect after this reconcile + gitrepo.Status.Commit = "dd45c7ad68e10307765104fea4a1f5997643020f" + return nil + }, + ) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + client.EXPECT().Status().Return(statusClient).Times(1) + statusClient.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(ctx context.Context, repo *fleetv1.GitRepo, opts ...interface{}) { + cond, found := getGitPollingCondition(repo) + if !found { + t.Errorf("expecting Condition %s to be found", gitPollingCondition) + } + if cond.Message != "" { + t.Errorf("expecting condition message empty, got [%s]", cond.Message) + } + if cond.Type != gitPollingCondition { + t.Errorf("expecting condition type [%s], got [%s]", gitPollingCondition, cond.Type) + } + if cond.Status != "True" { + t.Errorf("expecting condition Status [True], got [%s]", cond.Type) + } + if repo.Status.Commit != commit { + t.Errorf("expecting commit %s, got %s", commit, repo.Status.Commit) + } + }, + ).Times(1) + + r := GitJobReconciler{ + Client: client, + Scheme: scheme, + Image: "", + GitFetcher: fetcher, + Clock: RealClock{}, + } + + ctx := context.TODO() + + // second call is the one calling LatestCommit _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: namespacedName}) if err != nil { t.Errorf("unexpected error %v", err) @@ -132,13 +400,12 @@ func TestReconcile_Error_WhenGitrepoRestrictionsAreNotMet(t *testing.T) { } }, ) - poller := mocks.NewMockGitPoller(mockCtrl) r := GitJobReconciler{ - Client: mockClient, - Scheme: scheme, - Image: "", - GitPoller: poller, + Client: mockClient, + Scheme: scheme, + Image: "", + Clock: RealClock{}, } ctx := context.TODO() @@ -167,9 +434,6 @@ func TestNewJob(t *testing.T) { // nolint:funlen scheme := runtime.NewScheme() utilruntime.Must(batchv1.AddToScheme(scheme)) ctx := context.TODO() - poller := mocks.NewMockGitPoller(mockCtrl) - poller.EXPECT().AddOrModifyGitRepoPollJob(ctx, gomock.Any()).AnyTimes() - poller.EXPECT().CleanUpGitRepoPollJobs(ctx).AnyTimes() tests := map[string]struct { gitrepo *fleetv1.GitRepo @@ -555,10 +819,10 @@ func TestNewJob(t *testing.T) { // nolint:funlen for name, test := range tests { t.Run(name, func(t *testing.T) { r := GitJobReconciler{ - Client: test.client, - Scheme: scheme, - Image: "test", - GitPoller: poller, + Client: test.client, + Scheme: scheme, + Image: "test", + Clock: RealClock{}, } job, err := r.newGitJob(ctx, test.gitrepo) if err != nil { @@ -663,9 +927,9 @@ func TestGenerateJob_EnvVars(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { r := GitJobReconciler{ - Client: fake.NewFakeClient(), - Image: "test", - GitPoller: poller, + Client: fake.NewFakeClient(), + Image: "test", + Clock: RealClock{}, } for k, v := range test.osEnv { err := os.Setenv(k, v) @@ -693,6 +957,96 @@ func TestGenerateJob_EnvVars(t *testing.T) { } } +func TestCheckforPollingTask(t *testing.T) { + tests := map[string]struct { + gitrepo *fleetv1.GitRepo + timeNow time.Time + expectedResult bool + }{ + "LastPollingTime is not set": { + gitrepo: &fleetv1.GitRepo{}, + timeNow: time.Now(), // time here is irrelevant + expectedResult: true, + }, + "LastPollingTime is set but should still not trigger (1s away)": { + gitrepo: &fleetv1.GitRepo{ + Status: fleetv1.GitRepoStatus{ + LastPollingTime: metav1.Time{Time: time.Date(2024, time.July, 16, 15, 59, 59, 0, time.UTC)}, + }, + Spec: fleetv1.GitRepoSpec{ + PollingInterval: &metav1.Duration{Duration: 10 * time.Second}, + }, + }, + timeNow: time.Date(2024, time.July, 16, 16, 0, 0, 0, time.UTC), + expectedResult: false, + }, + "LastPollingTime is set and should trigger (10s away)": { + gitrepo: &fleetv1.GitRepo{ + Status: fleetv1.GitRepoStatus{ + LastPollingTime: metav1.Time{Time: time.Date(2024, time.July, 16, 15, 59, 50, 0, time.UTC)}, + }, + Spec: fleetv1.GitRepoSpec{ + PollingInterval: &metav1.Duration{Duration: 10 * time.Second}, + }, + }, + timeNow: time.Date(2024, time.July, 16, 16, 0, 0, 0, time.UTC), + expectedResult: true, + }, + "LastPollingTime is set but should still not trigger (1s away with default value)": { + gitrepo: &fleetv1.GitRepo{ + Status: fleetv1.GitRepoStatus{ + LastPollingTime: metav1.Time{Time: time.Date(2024, time.July, 16, 15, 59, 59, 0, time.UTC)}, + }, + }, + timeNow: time.Date(2024, time.July, 16, 16, 0, 0, 0, time.UTC), + expectedResult: false, + }, + "LastPollingTime is set and should trigger (15s away with default value)": { + gitrepo: &fleetv1.GitRepo{ + Status: fleetv1.GitRepoStatus{ + LastPollingTime: metav1.Time{Time: time.Date(2024, time.July, 16, 15, 59, 45, 0, time.UTC)}, + }, + }, + timeNow: time.Date(2024, time.July, 16, 16, 0, 0, 0, time.UTC), + expectedResult: true, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + fetcher := gitmocks.NewMockGitFetcher(mockCtrl) + commit := "1883fd54bc5dfd225acf02aecbb6cb8020458e33" + if test.expectedResult { + fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(commit, nil) + } + r := GitJobReconciler{ + Client: fake.NewFakeClient(), + Image: "test", + Clock: ClockMock{t: test.timeNow}, + GitFetcher: fetcher, + } + res, err := r.checkPollingTask(context.TODO(), test.gitrepo) + if res != test.expectedResult { + t.Errorf("unexpected result. Expecting %t, got %t", test.expectedResult, res) + } + if err != nil { + t.Errorf("not expecting to get an error, got [%v]", err) + } + if res { + // if the task was called, commit will be applied + if test.gitrepo.Status.Commit != commit { + t.Errorf("expecting commit: %s, got: %s", commit, test.gitrepo.Status.Commit) + } + // also LastPollingTime should be set to now + if test.gitrepo.Status.LastPollingTime.Time != test.timeNow { + t.Errorf("expecting LastPollingTime to be: %s, got: %s", test.timeNow, test.gitrepo.Status.LastPollingTime.Time) + } + } + }) + } +} + func httpSecretMock() client.Client { scheme := runtime.NewScheme() utilruntime.Must(corev1.AddToScheme(scheme)) diff --git a/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go b/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go index d314345859..bbd3142e32 100644 --- a/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go +++ b/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go @@ -193,6 +193,8 @@ type GitRepoStatus struct { ResourceErrors []string `json:"resourceErrors,omitempty"` // LastSyncedImageScanTime is the time of the last image scan. LastSyncedImageScanTime metav1.Time `json:"lastSyncedImageScanTime,omitempty"` + // LastPollingTime is the last time the polling check was triggered + LastPollingTime metav1.Time `json:"lastPollingTriggered,omitempty"` } // GitRepoResourceCounts contains the number of resources in each state. diff --git a/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go index 94071846ed..fe60d80726 100644 --- a/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go @@ -1552,6 +1552,7 @@ func (in *GitRepoStatus) DeepCopyInto(out *GitRepoStatus) { copy(*out, *in) } in.LastSyncedImageScanTime.DeepCopyInto(&out.LastSyncedImageScanTime) + in.LastPollingTime.DeepCopyInto(&out.LastPollingTime) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GitRepoStatus. diff --git a/pkg/git/mocks/scheduledjob_mock.go b/pkg/git/mocks/scheduledjob_mock.go deleted file mode 100644 index d43cda0f79..0000000000 --- a/pkg/git/mocks/scheduledjob_mock.go +++ /dev/null @@ -1,23 +0,0 @@ -package mocks - -import ( - "time" - - quartz "github.com/reugn/go-quartz/quartz" -) - -type MockScheduledJob struct { - Detail *quartz.JobDetail - TriggerDuration time.Duration -} - -func (m *MockScheduledJob) JobDetail() *quartz.JobDetail { - return m.Detail -} - -func (m *MockScheduledJob) Trigger() quartz.Trigger { - return quartz.NewSimpleTrigger(m.TriggerDuration) -} -func (m *MockScheduledJob) NextRunTime() int64 { - return 0 -} diff --git a/pkg/git/mocks/scheduler_mock.go b/pkg/git/mocks/scheduler_mock.go deleted file mode 100644 index 02b797af1d..0000000000 --- a/pkg/git/mocks/scheduler_mock.go +++ /dev/null @@ -1,188 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/reugn/go-quartz/quartz (interfaces: Scheduler) - -// Package mocks is a generated GoMock package. -package mocks - -import ( - context "context" - gomock "go.uber.org/mock/gomock" - quartz "github.com/reugn/go-quartz/quartz" - reflect "reflect" -) - -// MockScheduler is a mock of Scheduler interface. -type MockScheduler struct { - ctrl *gomock.Controller - recorder *MockSchedulerMockRecorder -} - -// MockSchedulerMockRecorder is the mock recorder for MockScheduler. -type MockSchedulerMockRecorder struct { - mock *MockScheduler -} - -// NewMockScheduler creates a new mock instance. -func NewMockScheduler(ctrl *gomock.Controller) *MockScheduler { - mock := &MockScheduler{ctrl: ctrl} - mock.recorder = &MockSchedulerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockScheduler) EXPECT() *MockSchedulerMockRecorder { - return m.recorder -} - -// Clear mocks base method. -func (m *MockScheduler) Clear() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Clear") - ret0, _ := ret[0].(error) - return ret0 -} - -// Clear indicates an expected call of Clear. -func (mr *MockSchedulerMockRecorder) Clear() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Clear", reflect.TypeOf((*MockScheduler)(nil).Clear)) -} - -// DeleteJob mocks base method. -func (m *MockScheduler) DeleteJob(arg0 *quartz.JobKey) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteJob", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// DeleteJob indicates an expected call of DeleteJob. -func (mr *MockSchedulerMockRecorder) DeleteJob(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteJob", reflect.TypeOf((*MockScheduler)(nil).DeleteJob), arg0) -} - -// GetJobKeys mocks base method. -func (m *MockScheduler) GetJobKeys(arg0 ...quartz.Matcher[quartz.ScheduledJob]) []*quartz.JobKey { - m.ctrl.T.Helper() - varargs := []interface{}{} - for _, a := range arg0 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "GetJobKeys", varargs...) - ret0, _ := ret[0].([]*quartz.JobKey) - return ret0 -} - -// GetJobKeys indicates an expected call of GetJobKeys. -func (mr *MockSchedulerMockRecorder) GetJobKeys(arg0 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetJobKeys", reflect.TypeOf((*MockScheduler)(nil).GetJobKeys), arg0...) -} - -// GetScheduledJob mocks base method. -func (m *MockScheduler) GetScheduledJob(arg0 *quartz.JobKey) (quartz.ScheduledJob, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetScheduledJob", arg0) - ret0, _ := ret[0].(quartz.ScheduledJob) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetScheduledJob indicates an expected call of GetScheduledJob. -func (mr *MockSchedulerMockRecorder) GetScheduledJob(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetScheduledJob", reflect.TypeOf((*MockScheduler)(nil).GetScheduledJob), arg0) -} - -// IsStarted mocks base method. -func (m *MockScheduler) IsStarted() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsStarted") - ret0, _ := ret[0].(bool) - return ret0 -} - -// IsStarted indicates an expected call of IsStarted. -func (mr *MockSchedulerMockRecorder) IsStarted() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsStarted", reflect.TypeOf((*MockScheduler)(nil).IsStarted)) -} - -// PauseJob mocks base method. -func (m *MockScheduler) PauseJob(arg0 *quartz.JobKey) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PauseJob", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// PauseJob indicates an expected call of PauseJob. -func (mr *MockSchedulerMockRecorder) PauseJob(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PauseJob", reflect.TypeOf((*MockScheduler)(nil).PauseJob), arg0) -} - -// ResumeJob mocks base method. -func (m *MockScheduler) ResumeJob(arg0 *quartz.JobKey) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ResumeJob", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// ResumeJob indicates an expected call of ResumeJob. -func (mr *MockSchedulerMockRecorder) ResumeJob(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResumeJob", reflect.TypeOf((*MockScheduler)(nil).ResumeJob), arg0) -} - -// ScheduleJob mocks base method. -func (m *MockScheduler) ScheduleJob(arg0 *quartz.JobDetail, arg1 quartz.Trigger) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ScheduleJob", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// ScheduleJob indicates an expected call of ScheduleJob. -func (mr *MockSchedulerMockRecorder) ScheduleJob(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleJob", reflect.TypeOf((*MockScheduler)(nil).ScheduleJob), arg0, arg1) -} - -// Start mocks base method. -func (m *MockScheduler) Start(arg0 context.Context) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Start", arg0) -} - -// Start indicates an expected call of Start. -func (mr *MockSchedulerMockRecorder) Start(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockScheduler)(nil).Start), arg0) -} - -// Stop mocks base method. -func (m *MockScheduler) Stop() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Stop") -} - -// Stop indicates an expected call of Stop. -func (mr *MockSchedulerMockRecorder) Stop() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockScheduler)(nil).Stop)) -} - -// Wait mocks base method. -func (m *MockScheduler) Wait(arg0 context.Context) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Wait", arg0) -} - -// Wait indicates an expected call of Wait. -func (mr *MockSchedulerMockRecorder) Wait(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Wait", reflect.TypeOf((*MockScheduler)(nil).Wait), arg0) -} diff --git a/pkg/git/poll/gitrepopolljob.go b/pkg/git/poll/gitrepopolljob.go deleted file mode 100644 index 0bc31f2439..0000000000 --- a/pkg/git/poll/gitrepopolljob.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright (c) 2021-2024 SUSE LLC -package poll - -import ( - "context" - "fmt" - - "github.com/rancher/fleet/internal/cmd/controller/grutil" - v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - "github.com/reugn/go-quartz/quartz" - "golang.org/x/sync/semaphore" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" - - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -var _ quartz.Job = &GitRepoPollJob{} - -type GitFetcher interface { - LatestCommit(ctx context.Context, gitrepo *v1alpha1.GitRepo, client client.Client) (string, error) -} - -type GitRepoPollJob struct { - sem *semaphore.Weighted - client client.Client - GitRepo v1alpha1.GitRepo - fetcher GitFetcher -} - -func GitRepoPollKey(gitRepo v1alpha1.GitRepo) *quartz.JobKey { - return quartz.NewJobKeyWithGroup(gitRepo.Name, gitRepo.Namespace) -} - -func NewGitRepoPollJob(c client.Client, f GitFetcher, gitRepo v1alpha1.GitRepo) *GitRepoPollJob { - return &GitRepoPollJob{ - sem: semaphore.NewWeighted(1), - client: c, - GitRepo: gitRepo, - fetcher: f, - } -} - -func (j *GitRepoPollJob) Execute(ctx context.Context) error { - if !j.sem.TryAcquire(1) { - // already running - return nil - } - defer j.sem.Release(1) - - j.fetchLatestCommitAndUpdateStatus(ctx) - - return nil -} - -func (j *GitRepoPollJob) Description() string { - return j.String() -} - -func (j *GitRepoPollJob) String() string { - return fmt.Sprintf("gitrepo-%s-%s", j.GitRepo.Namespace, j.GitRepo.Name) -} - -func (j *GitRepoPollJob) fetchLatestCommitAndUpdateStatus(ctx context.Context) { - logger := ctrl.Log.WithName("git-latest-commit-poll-watch") - commit, err := j.fetcher.LatestCommit(ctx, &j.GitRepo, j.client) - if err != nil { - logger.Error(err, "error fetching commit", "gitrepo", j.GitRepo) - nsName := types.NamespacedName{Name: j.GitRepo.Name, Namespace: j.GitRepo.Namespace} - _ = grutil.UpdateErrorStatus(ctx, j.client, nsName, j.GitRepo.Status, err) - return - } - if j.GitRepo.Status.Commit != commit { - logger.Info("new commit found", "gitrepo", j.GitRepo, "commit", commit) - if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - var gitRepoFromCluster v1alpha1.GitRepo - err := j.client.Get(ctx, types.NamespacedName{Name: j.GitRepo.Name, Namespace: j.GitRepo.Namespace}, &gitRepoFromCluster) - if err != nil { - return err - } - gitRepoFromCluster.Status.Commit = commit - - return j.client.Status().Update(ctx, &gitRepoFromCluster) - }); err != nil { - logger.Error(err, "error updating status when a new commit was found by polling", "gitrepo", j.GitRepo) - } - } -} diff --git a/pkg/git/poll/gitrepopolljob_test.go b/pkg/git/poll/gitrepopolljob_test.go deleted file mode 100644 index 59bdbe5e74..0000000000 --- a/pkg/git/poll/gitrepopolljob_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package poll - -import ( - "context" - "fmt" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "go.uber.org/mock/gomock" - "golang.org/x/sync/semaphore" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - - // . "github.com/onsi/gomega" - v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - "github.com/rancher/fleet/pkg/git/mocks" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" -) - -var _ = Describe("GitRepoPollJob tests", func() { - var ( - expectedCalls func(fetcher *mocks.MockGitFetcher) - gitRepo v1alpha1.GitRepo - job GitRepoPollJob - fetcher *mocks.MockGitFetcher - client client.WithWatch - ctx context.Context - commit string - ) - - JustBeforeEach(func() { - ctrl := gomock.NewController(GinkgoT()) - - gitRepo = v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitrepo", - }, - } - fetcher = mocks.NewMockGitFetcher(ctrl) - scheme := runtime.NewScheme() - err := v1alpha1.AddToScheme(scheme) - Expect(err).ToNot(HaveOccurred()) - client = fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(&gitRepo).WithStatusSubresource(&gitRepo).Build() - ctx = context.TODO() - - job = GitRepoPollJob{ - sem: semaphore.NewWeighted(1), - client: client, - GitRepo: gitRepo, - fetcher: fetcher, - } - - expectedCalls(fetcher) - - err = job.Execute(ctx) - Expect(err).ToNot(HaveOccurred()) - }) - - When("Running the job with a commit different to the one set in the gitRepo", func() { - BeforeEach(func() { - commit = "fakeCommit" - expectedCalls = func(fetcher *mocks.MockGitFetcher) { - fetcher.EXPECT().LatestCommit(ctx, gomock.Any(), client).Return(commit, nil).Times(1) - } - gitRepo.Status.Commit = "9b0380f535d4c428d5b18f2efb5fddfe52b9dbf1" - }) - It("Should update the gitRepo commit", func() { - updatedGitRepo := v1alpha1.GitRepo{} - err := client.Get(ctx, types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace}, &updatedGitRepo) - Expect(err).ToNot(HaveOccurred()) - Expect(updatedGitRepo.Status.Commit).To(Equal(commit)) - }) - }) - When("Running the job and LatestCommit returns an error", func() { - BeforeEach(func() { - commit = "fakeCommit" - expectedCalls = func(fetcher *mocks.MockGitFetcher) { - fetcher.EXPECT().LatestCommit(ctx, gomock.Any(), client).Return("", fmt.Errorf("Some error")).Times(1) - } - gitRepo.Status.Commit = "9b0380f535d4c428d5b18f2efb5fddfe52b9dbf1" - }) - It("Should not update the original gitRepo commit", func() { - updatedGitRepo := v1alpha1.GitRepo{} - err := client.Get(ctx, types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace}, &updatedGitRepo) - Expect(err).ToNot(HaveOccurred()) - Expect(updatedGitRepo.Status.Commit).To(Equal(gitRepo.Status.Commit)) - errorFound := false - for _, c := range updatedGitRepo.Status.Conditions { - if c.Message == "Some error" { - errorFound = true - } - } - Expect(errorFound).To(BeTrue()) - }) - }) -}) diff --git a/pkg/git/poll/handler.go b/pkg/git/poll/handler.go deleted file mode 100644 index f94a2e79e5..0000000000 --- a/pkg/git/poll/handler.go +++ /dev/null @@ -1,131 +0,0 @@ -package poll - -import ( - "context" - stderrors "errors" - "time" - - v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - "github.com/rancher/fleet/pkg/durations" - "github.com/rancher/fleet/pkg/git" - "github.com/reugn/go-quartz/quartz" - - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -const ( - maxSchedulerOperationRetries = 3 - defaultSyncInterval = 15 * time.Second -) - -type Watcher interface { - StartBackgroundSync(ctx context.Context) - Finish() - Restart(ctx context.Context) - UpdateGitRepo(gitRepo v1alpha1.GitRepo) - GetSyncInterval() float64 -} - -// Handler handles all the watches for the git repositories. These watches are pulling the latest commit every syncPeriod. -type Handler struct { - client client.Client - log logr.Logger - scheduler quartz.Scheduler - fetcher GitFetcher -} - -func NewHandler(ctx context.Context, client client.Client) *Handler { - scheduler := quartz.NewStdScheduler() - scheduler.Start(ctx) - return &Handler{ - client: client, - log: ctrl.Log.WithName("git-latest-commit-poll-handler"), - scheduler: scheduler, - fetcher: &git.Fetch{}, - } -} - -// AddOrModifyGitRepoPollJob adds a new scheduled job for the gitrepo if no job was already present. -// It updates the existing job for this gitrepo if present. -func (h *Handler) AddOrModifyGitRepoPollJob(ctx context.Context, gitRepo v1alpha1.GitRepo) { - gitRepoPollKey := GitRepoPollKey(gitRepo) - scheduledJob, err := h.scheduler.GetScheduledJob(gitRepoPollKey) - if err != nil { - // job was not found - if gitRepo.Spec.DisablePolling { - // nothing to do if disablePolling is set - return - } - h.scheduleJob(ctx, gitRepoPollKey, gitRepo, true) - } else { - if gitRepo.Spec.DisablePolling { - // if polling is disabled, just delete the job from the scheduler - err = h.scheduler.DeleteJob(gitRepoPollKey) - if err != nil { - h.log.Error(err, "error deleting the job", "job", gitRepoPollKey) - } - return - } - job := scheduledJob.JobDetail().Job() - gitRepoPollJob, ok := job.(*GitRepoPollJob) - if !ok { - h.log.Error(stderrors.New("invalid job"), - "error getting Gitrepo poll job, the scheduled job is not a GitRepoPollJob", "job", job.Description()) - return - } - previousInterval := gitRepoPollJob.GitRepo.Spec.PollingInterval - previousGeneration := gitRepoPollJob.GitRepo.Generation - gitRepoPollJob.GitRepo = gitRepo - if (previousGeneration != gitRepo.Generation) || - !durations.Equal(gitRepo.Spec.PollingInterval, previousInterval) { - // Spec or polling interval changed - // Reschedule so the job is immediately executed - // (otherwise it'll wait until next timeout) - _ = h.scheduler.DeleteJob(gitRepoPollKey) - h.scheduleJob(ctx, gitRepoPollKey, gitRepo, true) - } - } -} - -// CleanUpGitRepoPollJobs removes all poll jobs whose gitrepo is not present in the cluster. -func (h *Handler) CleanUpGitRepoPollJobs(ctx context.Context) { - var gitRepo v1alpha1.GitRepo - for _, key := range h.scheduler.GetJobKeys() { - namespacedName := types.NamespacedName{ - Namespace: key.Group(), - Name: key.Name(), - } - if err := h.client.Get(ctx, namespacedName, &gitRepo); errors.IsNotFound(err) { - err = h.scheduler.DeleteJob(key) - if err != nil { - h.log.Error(err, "error deleting job", "job", key) - } - } - } -} - -func calculateSyncInterval(gitRepo v1alpha1.GitRepo) time.Duration { - if gitRepo.Spec.PollingInterval != nil { - return gitRepo.Spec.PollingInterval.Duration - } - - return defaultSyncInterval -} - -func (h *Handler) scheduleJob(ctx context.Context, jobKey *quartz.JobKey, gitRepo v1alpha1.GitRepo, runBefore bool) { - job := NewGitRepoPollJob(h.client, h.fetcher, gitRepo) - if runBefore { - // ignoring error because that is only used by the quartz library to implement its retries mechanism - // The GitRepoPollJob always returns nil - _ = job.Execute(ctx) - } - err := h.scheduler.ScheduleJob(quartz.NewJobDetail(job, jobKey), - quartz.NewSimpleTrigger(calculateSyncInterval(gitRepo))) - if err != nil { - h.log.Error(err, "error scheduling job", "job", jobKey) - } -} diff --git a/pkg/git/poll/handler_test.go b/pkg/git/poll/handler_test.go deleted file mode 100644 index d1889fff9d..0000000000 --- a/pkg/git/poll/handler_test.go +++ /dev/null @@ -1,304 +0,0 @@ -package poll - -import ( - "context" - "time" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "github.com/reugn/go-quartz/quartz" - gomock "go.uber.org/mock/gomock" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - - v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - "github.com/rancher/fleet/pkg/git/mocks" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" -) - -var _ = Describe("Gitrepo pooling tests", func() { - var ( - scheduler *mocks.MockScheduler - fetcher *mocks.MockGitFetcher - gitRepo v1alpha1.GitRepo - client client.Client - ) - BeforeEach(func() { - ctrl := gomock.NewController(GinkgoT()) - scheduler = mocks.NewMockScheduler(ctrl) - fetcher = mocks.NewMockGitFetcher(ctrl) - gitRepo = v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob", - Namespace: "test", - }, - } - scheme := runtime.NewScheme() - err := v1alpha1.AddToScheme(scheme) - Expect(err).ToNot(HaveOccurred()) - client = fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(&gitRepo).WithStatusSubresource(&gitRepo).Build() - }) - DescribeTable("Gitrepo pooling tests", - func(pollingInterval time.Duration, disablePolling bool, changeSpec bool, - schedulerCalls func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration), - fetcherCalls func()) { - if pollingInterval != 0 { - gitRepo.Spec.PollingInterval = &metav1.Duration{Duration: pollingInterval} - } else { - gitRepo.Spec.PollingInterval = nil - } - gitRepo.Spec.DisablePolling = disablePolling - if changeSpec { - gitRepo.Generation = 2 - } - - handler := Handler{ - scheduler: scheduler, - fetcher: fetcher, - client: client, - } - - schedulerCalls(gitRepo, pollingInterval) - fetcherCalls() - - handler.AddOrModifyGitRepoPollJob(context.TODO(), gitRepo) - }, - - Entry("GitRepo is not present and disablePolling = true", - 1*time.Second, true, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - key := GitRepoPollKey(gitRepo) - scheduler.EXPECT().GetScheduledJob(key).Return(nil, quartz.ErrJobNotFound).Times(1) - }, func() {}), - - Entry("Gitrepo is not present, not setting pollingInterval and disablePolling=false", - 0*time.Second, false, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - key := GitRepoPollKey(gitRepo) - scheduler.EXPECT().GetScheduledJob(key).Return(nil, quartz.ErrJobNotFound).Times(1) - - job := NewGitRepoPollJob(client, fetcher, gitRepo) - jobDetail := quartz.NewJobDetail(job, key) - // we're not specifying an internal, so the default (15 secs) is set - trigger := quartz.NewSimpleTrigger(15 * time.Second) - scheduler.EXPECT().ScheduleJob(jobDetail, trigger).Return(nil).Times(1) - }, func() { - fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Return("commit", nil).Times(1) - }, - ), - - Entry("Gitrepo is not present, setting pollingInterval to a specific value and disablePolling=false", - 1999*time.Second, false, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - key := GitRepoPollKey(gitRepo) - scheduler.EXPECT().GetScheduledJob(key).Return(nil, quartz.ErrJobNotFound).Times(1) - - job := NewGitRepoPollJob(client, fetcher, gitRepo) - jobDetail := quartz.NewJobDetail(job, key) - // trigger should be pollingInterval seconds - trigger := quartz.NewSimpleTrigger(pollingInterval) - scheduler.EXPECT().ScheduleJob(jobDetail, trigger).Return(nil).Times(1) - }, func() { - fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Return("commit", nil).Times(1) - }, - ), - - Entry("gitrepo present, same polling interval, disablePolling=true", - 10*time.Second, true, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - key := GitRepoPollKey(gitRepo) - job := NewGitRepoPollJob(client, fetcher, gitRepo) - jobDetail := quartz.NewJobDetail(job, key) - schedJob := &mocks.MockScheduledJob{ - Detail: jobDetail, - TriggerDuration: 10 * time.Second, - } - // job exists - scheduler.EXPECT().GetScheduledJob(key).Return(schedJob, nil).Times(1) - // but we delete it because disablePolling is true - scheduler.EXPECT().DeleteJob(key).Return(nil).Times(1) - }, func() {}, - ), - - Entry("gitrepo present, different polling interval, disablePolling=true", - 10*time.Second, true, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - key := GitRepoPollKey(gitRepo) - job := NewGitRepoPollJob(client, fetcher, gitRepo) - jobDetail := quartz.NewJobDetail(job, key) - schedJob := &mocks.MockScheduledJob{ - Detail: jobDetail, - TriggerDuration: 10 * time.Second, - } - // job exists with a polling interval of 10 seconds - scheduler.EXPECT().GetScheduledJob(key).Return(schedJob, nil).Times(1) - // we delete it because disablePolling is true - scheduler.EXPECT().DeleteJob(key).Return(nil).Times(1) - }, func() {}, - ), - - Entry("gitrepo present, same polling interval, disablePolling=false", - 10*time.Second, false, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - key := GitRepoPollKey(gitRepo) - job := NewGitRepoPollJob(client, fetcher, gitRepo) - jobDetail := quartz.NewJobDetail(job, key) - schedJob := &mocks.MockScheduledJob{ - Detail: jobDetail, - TriggerDuration: 10 * time.Second, - } - // gets the job and does nothing else - scheduler.EXPECT().GetScheduledJob(key).Return(schedJob, nil).Times(1) - }, func() {}, - ), - - Entry("gitrepo present, different polling interval, disablePolling=false", - 1999*time.Second, false, false, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - gitRepoCopy := gitRepo - gitRepoCopy.Spec.PollingInterval = &metav1.Duration{Duration: 10 * time.Second} - key := GitRepoPollKey(gitRepoCopy) - job := NewGitRepoPollJob(client, fetcher, gitRepoCopy) - jobDetail := quartz.NewJobDetail(job, key) - schedJob := &mocks.MockScheduledJob{ - Detail: jobDetail, - TriggerDuration: 10 * time.Second, - } - // gets the job and does nothing else - scheduler.EXPECT().GetScheduledJob(key).Return(schedJob, nil).Times(1) - scheduler.EXPECT().DeleteJob(key).Return(nil).Times(1) - trigger := quartz.NewSimpleTrigger(1999 * time.Second) - scheduler.EXPECT().ScheduleJob(jobDetail, trigger).Return(nil).Times(1) - }, func() { - fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Return("commit", nil).Times(1) - }, - ), - - Entry("gitrepo present, same polling interval, disablePolling=false, generation changed", - 10*time.Second, false, true, func(gitRepo v1alpha1.GitRepo, pollingInterval time.Duration) { - gitRepoCopy := gitRepo - gitRepoCopy.Generation = 1 - key := GitRepoPollKey(gitRepoCopy) - job := NewGitRepoPollJob(client, fetcher, gitRepoCopy) - jobDetail := quartz.NewJobDetail(job, key) - schedJob := &mocks.MockScheduledJob{ - Detail: jobDetail, - TriggerDuration: 10 * time.Second, - } - // gets the job and rechedules - scheduler.EXPECT().GetScheduledJob(key).Return(schedJob, nil).Times(1) - scheduler.EXPECT().DeleteJob(key).Return(nil).Times(1) - trigger := quartz.NewSimpleTrigger(10 * time.Second) - scheduler.EXPECT().ScheduleJob(jobDetail, trigger).Return(nil).Times(1) - }, func() { - fetcher.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).Return("commit", nil).Times(1) - }, - ), - ) -}) - -var _ = Describe("Gitrepo cleanup tests", func() { - var ( - scheduler *mocks.MockScheduler - fetcher *mocks.MockGitFetcher - gitRepo v1alpha1.GitRepo - client client.Client - handler Handler - expectedSchedulerCalls func(sched *mocks.MockScheduler) - ) - JustBeforeEach(func() { - ctrl := gomock.NewController(GinkgoT()) - scheduler = mocks.NewMockScheduler(ctrl) - fetcher = mocks.NewMockGitFetcher(ctrl) - handler = Handler{ - scheduler: scheduler, - fetcher: fetcher, - client: client, - } - }) - When("All gitRepos are found", func() { - BeforeEach(func() { - scheme := runtime.NewScheme() - err := v1alpha1.AddToScheme(scheme) - Expect(err).ToNot(HaveOccurred()) - gitRepo1 := v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob1", - Namespace: "test", - }, - } - - gitRepo2 := v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob2", - Namespace: "test", - }, - } - - gitRepo3 := v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob3", - Namespace: "test", - }, - } - client = fake.NewClientBuilder(). - WithScheme(scheme). - WithRuntimeObjects(&gitRepo1, &gitRepo2, &gitRepo3). - WithStatusSubresource(&gitRepo, &gitRepo2, &gitRepo3). - Build() - - expectedSchedulerCalls = func(sched *mocks.MockScheduler) { - key1 := GitRepoPollKey(gitRepo1) - key2 := GitRepoPollKey(gitRepo2) - key3 := GitRepoPollKey(gitRepo3) - sched.EXPECT().GetJobKeys().Return([]*quartz.JobKey{key1, key2, key3}).Times(1) - } - - }) - It("Does nothing", func() { - expectedSchedulerCalls(scheduler) - ctx := context.TODO() - handler.CleanUpGitRepoPollJobs(ctx) - }) - }) - When("A gitRepo is not found", func() { - BeforeEach(func() { - scheme := runtime.NewScheme() - err := v1alpha1.AddToScheme(scheme) - Expect(err).ToNot(HaveOccurred()) - gitRepo1 := v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob1", - Namespace: "test", - }, - } - - gitRepo2 := v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob2", - Namespace: "test", - }, - } - - gitRepo3 := v1alpha1.GitRepo{ - ObjectMeta: metav1.ObjectMeta{ - Name: "gitjob3", - Namespace: "test", - }, - } - client = fake.NewClientBuilder(). - WithScheme(scheme). - WithRuntimeObjects(&gitRepo1, &gitRepo2). - WithStatusSubresource(&gitRepo, &gitRepo2). - Build() - - expectedSchedulerCalls = func(sched *mocks.MockScheduler) { - key1 := GitRepoPollKey(gitRepo1) - key2 := GitRepoPollKey(gitRepo2) - // gitRepo3 was not added to the client - key3 := GitRepoPollKey(gitRepo3) - sched.EXPECT().GetJobKeys().Return([]*quartz.JobKey{key1, key2, key3}).Times(1) - sched.EXPECT().DeleteJob(key3).Return(nil).Times(1) - } - }) - It("Deletes the job that was not found from the schedule", func() { - expectedSchedulerCalls(scheduler) - ctx := context.TODO() - handler.CleanUpGitRepoPollJobs(ctx) - }) - }) -}) diff --git a/pkg/git/poll/suite_test.go b/pkg/git/poll/suite_test.go deleted file mode 100644 index 246c07f050..0000000000 --- a/pkg/git/poll/suite_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package poll_test - -import ( - "testing" - "time" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -const ( - timeout = 30 * time.Second -) - -func TestFleet(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Gitrepo pooling Suite") -} - -var _ = BeforeSuite(func() { - SetDefaultEventuallyTimeout(timeout) -})