Skip to content

Commit

Permalink
Use RequeueAfter for Gitpolling job.
Browse files Browse the repository at this point in the history
Uses the Manager's timers instead of creating a different go routine to deal with the polling job.

This is an approach to deal with multiple calls updating the status of a `gitrepo` in the same reconcile loop.
There were issues setting the status and getting it in the same reconcile as the cache was not properly set yet.

This tries to fix those issues by only updating the status once per reconcile.

Refers to: #2631

Signed-off-by: Xavi Garcia <[email protected]>
  • Loading branch information
0xavi0 committed Jul 17, 2024
1 parent 7144757 commit 98adea9
Show file tree
Hide file tree
Showing 17 changed files with 578 additions and 1,023 deletions.
5 changes: 5 additions & 0 deletions charts/fleet-crd/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6490,6 +6490,11 @@ spec:
description: GitJobStatus is the status of the last Git job run,
e.g. "Current" if there was no error.
type: string
lastPollingTriggered:
description: LastPollingTime is the last time the polling check
was triggered
format: date-time
type: string
lastSyncedImageScanTime:
description: LastSyncedImageScanTime is the time of the last image
scan.
Expand Down
9 changes: 5 additions & 4 deletions e2e/single-cluster/gitrepo_polling_disabled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ var _ = Describe("GitRepoPollingDisabled", Label("infra-setup"), func() {
})

JustBeforeEach(func() {
err := testenv.ApplyTemplate(k, testenv.AssetPath("gitrepo/gitrepo-polling-disabled.yaml"), struct {
var err error
clone, err = gh.Create(clonedir, testenv.AssetPath("gitrepo/sleeper-chart"), "disable_polling")
Expect(err).ToNot(HaveOccurred())

err = testenv.ApplyTemplate(k, testenv.AssetPath("gitrepo/gitrepo-polling-disabled.yaml"), struct {
Name string
Repo string
Branch string
Expand All @@ -97,9 +101,6 @@ var _ = Describe("GitRepoPollingDisabled", Label("infra-setup"), func() {
targetNamespace,
})
Expect(err).ToNot(HaveOccurred())

clone, err = gh.Create(clonedir, testenv.AssetPath("gitrepo/sleeper-chart"), "disable_polling")
Expect(err).ToNot(HaveOccurred())
})

It("deploys the resources initially and updates them while force updating", func() {
Expand Down
8 changes: 4 additions & 4 deletions e2e/single-cluster/gitrepo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ var _ = Describe("Monitoring Git repos via HTTP for change", Label("infra-setup"
return err
}).ShouldNot(HaveOccurred(), out)

// Clone previously created repo
clone, err = gh.Create(clonedir, testenv.AssetPath("gitrepo/sleeper-chart"), "examples")
Expect(err).ToNot(HaveOccurred())

err = testenv.ApplyTemplate(k, testenv.AssetPath("gitrepo/gitrepo.yaml"), struct {
Name string
Repo string
Expand All @@ -277,10 +281,6 @@ var _ = Describe("Monitoring Git repos via HTTP for change", Label("infra-setup"
targetNamespace, // to avoid conflicts with other tests
})
Expect(err).ToNot(HaveOccurred())

// Clone previously created repo
clone, err = gh.Create(clonedir, testenv.AssetPath("gitrepo/sleeper-chart"), "examples")
Expect(err).ToNot(HaveOccurred())
})

It("updates the deployment", func() {
Expand Down
76 changes: 39 additions & 37 deletions integrationtests/gitjob/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/rancher/fleet/integrationtests/utils"
v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"
"github.com/rancher/wrangler/v3/pkg/genericcondition"
"github.com/rancher/wrangler/v3/pkg/name"

batchv1 "k8s.io/api/batch/v1"
Expand All @@ -32,6 +33,23 @@ const (
stableCommit = "26bdd9326b0238bb2fb743f863d9380c3c5d43e0"
)

func getCondition(gitrepo *v1alpha1.GitRepo, condType string) (genericcondition.GenericCondition, bool) {
for _, cond := range gitrepo.Status.Conditions {
if cond.Type == condType {
return cond, true
}
}
return genericcondition.GenericCondition{}, false
}

func checkCondition(gitrepo *v1alpha1.GitRepo, condType string, status corev1.ConditionStatus) bool {
cond, found := getCondition(gitrepo, condType)
if !found {
return false
}
return cond.Type == condType && cond.Status == status
}

var _ = Describe("GitJob controller", func() {

When("a new GitRepo is created", func() {
Expand All @@ -43,6 +61,7 @@ var _ = Describe("GitJob controller", func() {
)

JustBeforeEach(func() {
expectedCommit = commit
gitRepo = createGitRepo(gitRepoName)
Expect(k8sClient.Create(ctx, &gitRepo)).ToNot(HaveOccurred())
Eventually(func() string {
Expand All @@ -54,7 +73,6 @@ var _ = Describe("GitJob controller", func() {
}
return gitRepoFromCluster.Status.Display.ReadyBundleDeployments
}).Should(ContainSubstring("0/0"))
Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred())

By("Creating a job")
Eventually(func() error {
Expand Down Expand Up @@ -146,8 +164,8 @@ var _ = Describe("GitJob controller", func() {

Eventually(func() bool {
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: gitRepoName, Namespace: gitRepoNamespace}, &gitRepo)).ToNot(HaveOccurred())
// XXX: do we need an additional `LastExecutedCommit` or similar status field?
return gitRepo.Status.GitJobStatus == "Failed"
// Job Status should be failed and commit should be as expected
return gitRepo.Status.GitJobStatus == "Failed" && gitRepo.Status.Commit == commit
}).Should(BeTrue())

By("verifying that the job is deleted if Spec.Generation changed")
Expand All @@ -168,9 +186,9 @@ var _ = Describe("GitJob controller", func() {
)

JustBeforeEach(func() {
expectedCommit = commit
gitRepo = createGitRepo(gitRepoName)
Expect(k8sClient.Create(ctx, &gitRepo)).ToNot(HaveOccurred())
Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred())

By("creating a Job")
Eventually(func() error {
Expand All @@ -185,7 +203,7 @@ var _ = Describe("GitJob controller", func() {
})
It("creates a new Job", func() {
const newCommit = "9ca3a0adbbba32"
Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, newCommit)).ToNot(HaveOccurred())
expectedCommit = newCommit
Eventually(func() error {
jobName := name.SafeConcatName(gitRepoName, name.Hex(repo+newCommit, 5))
return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
Expand All @@ -206,7 +224,6 @@ var _ = Describe("GitJob controller", func() {
JustBeforeEach(func() {
gitRepo = createGitRepo(gitRepoName)
Expect(k8sClient.Create(ctx, &gitRepo)).ToNot(HaveOccurred())
Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred())
Eventually(func() error {
jobName = name.SafeConcatName(gitRepoName, name.Hex(repo+commit, 5))
return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
Expand All @@ -216,6 +233,7 @@ var _ = Describe("GitJob controller", func() {
Expect(simulateIncreaseForceSyncGeneration(gitRepo)).ToNot(HaveOccurred())
})
BeforeEach(func() {
expectedCommit = commit
gitRepoName = "force-deletion"
})
AfterEach(func() {
Expand Down Expand Up @@ -254,7 +272,6 @@ var _ = Describe("GitJob controller", func() {
JustBeforeEach(func() {
gitRepo = createGitRepo(gitRepoName)
Expect(k8sClient.Create(ctx, &gitRepo)).ToNot(HaveOccurred())
Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred())
Eventually(func() error {
jobName = name.SafeConcatName(gitRepoName, name.Hex(repo+commit, 5))
return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
Expand All @@ -274,6 +291,7 @@ var _ = Describe("GitJob controller", func() {
})

BeforeEach(func() {
expectedCommit = commit
gitRepoName = "simulate-arg-update"
})

Expand Down Expand Up @@ -307,6 +325,7 @@ var _ = Describe("GitJob controller", func() {

When("a job completes successfully", func() {
BeforeEach(func() {
expectedCommit = stableCommit
gitRepoName = "disable-polling"
})

Expand Down Expand Up @@ -338,14 +357,14 @@ var _ = Describe("GitJob controller", func() {
)

JustBeforeEach(func() {
expectedCommit = commit
gitRepoName = "test-no-for-paths-secret"
gitRepo = createGitRepo(gitRepoName)
gitRepo.Spec.HelmSecretNameForPaths = helmSecretNameForPaths
gitRepo.Spec.HelmSecretName = helmSecretName
// Create should return an error
err := k8sClient.Create(ctx, &gitRepo)
Expect(err).ToNot(HaveOccurred())
Expect(simulateGitPollerUpdatingCommitInStatus(gitRepo, commit)).ToNot(HaveOccurred())
})

AfterEach(func() {
Expand Down Expand Up @@ -473,28 +492,25 @@ var _ = Describe("GitRepo", func() {

When("creating a gitrepo", func() {
JustBeforeEach(func() {
expectedCommit = commit
err := k8sClient.Create(ctx, gitrepo)
Expect(err).NotTo(HaveOccurred())
})

It("updates the gitrepo status", func() {
org := gitrepo.ResourceVersion
Eventually(func() bool {
Eventually(func(g Gomega) {
_ = k8sClient.Get(ctx, types.NamespacedName{Name: gitrepoName, Namespace: namespace}, gitrepo)
return gitrepo.ResourceVersion > org &&
gitrepo.Status.Display.ReadyBundleDeployments == "0/0" &&
!gitrepo.Status.Display.Error &&
len(gitrepo.Status.Conditions) == 4 &&
gitrepo.Status.Conditions[0].Type == "Reconciling" &&
string(gitrepo.Status.Conditions[0].Status) == "False" &&
gitrepo.Status.Conditions[1].Type == "Stalled" &&
string(gitrepo.Status.Conditions[1].Status) == "False" &&
gitrepo.Status.Conditions[2].Type == "Ready" &&
string(gitrepo.Status.Conditions[2].Status) == "True" &&
gitrepo.Status.Conditions[3].Type == "Accepted" &&
string(gitrepo.Status.Conditions[3].Status) == "True" &&
gitrepo.Status.DeepCopy().ObservedGeneration == int64(1)
}).Should(BeTrue())
g.Expect(gitrepo.ResourceVersion > org).To(BeTrue())
g.Expect(gitrepo.Status.Display.ReadyBundleDeployments).To(Equal("0/0"))
g.Expect(gitrepo.Status.Display.Error).To(BeFalse())
g.Expect(len(gitrepo.Status.Conditions)).To(Equal(5))
g.Expect(checkCondition(gitrepo, "GitPolling", corev1.ConditionTrue)).To(BeTrue())
g.Expect(checkCondition(gitrepo, "Reconciling", corev1.ConditionTrue)).To(BeTrue())
g.Expect(checkCondition(gitrepo, "Stalled", corev1.ConditionFalse)).To(BeTrue())
g.Expect(checkCondition(gitrepo, "Ready", corev1.ConditionTrue)).To(BeTrue())
g.Expect(checkCondition(gitrepo, "Accepted", corev1.ConditionTrue)).To(BeTrue())
}).Should(Succeed())
})
})
})
Expand Down Expand Up @@ -619,20 +635,6 @@ func simulateIncreaseGitRepoGeneration(gitRepo v1alpha1.GitRepo) error {
})
}

func simulateGitPollerUpdatingCommitInStatus(gitRepo v1alpha1.GitRepo, commit string) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
var gitRepoFromCluster v1alpha1.GitRepo
err := k8sClient.Get(ctx, types.NamespacedName{Name: gitRepo.Name, Namespace: gitRepo.Namespace}, &gitRepoFromCluster)
if err != nil {
return err
}
gitRepoFromCluster.Status = v1alpha1.GitRepoStatus{
Commit: commit,
}
return k8sClient.Status().Update(ctx, &gitRepoFromCluster)
})
}

func createGitRepo(gitRepoName string) v1alpha1.GitRepo {
return v1alpha1.GitRepo{
ObjectMeta: metav1.ObjectMeta{
Expand Down
39 changes: 22 additions & 17 deletions integrationtests/gitjob/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
ctrlreconciler "github.com/rancher/fleet/internal/cmd/controller/reconciler"
"github.com/rancher/fleet/internal/cmd/controller/target"
"github.com/rancher/fleet/internal/manifest"
"github.com/rancher/fleet/internal/mocks"
v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"
"github.com/rancher/fleet/pkg/git/mocks"

"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand All @@ -33,13 +33,14 @@ const (
)

var (
cfg *rest.Config
testEnv *envtest.Environment
ctx context.Context
cancel context.CancelFunc
k8sClient client.Client
logsBuffer bytes.Buffer
namespace string
cfg *rest.Config
testEnv *envtest.Environment
ctx context.Context
cancel context.CancelFunc
k8sClient client.Client
logsBuffer bytes.Buffer
namespace string
expectedCommit string
)

func TestGitJobController(t *testing.T) {
Expand Down Expand Up @@ -73,25 +74,29 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred())

ctlr := gomock.NewController(GinkgoT())
gitPollerMock := mocks.NewMockGitPoller(ctlr)

// redirect logs to a buffer that we can read in the tests
GinkgoWriter.TeeTo(&logsBuffer)
ctrl.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

// do nothing if gitPoller is called. gitPoller calls are tested in unit tests
gitPollerMock.EXPECT().AddOrModifyGitRepoPollJob(gomock.Any(), gomock.Any()).AnyTimes()
gitPollerMock.EXPECT().CleanUpGitRepoPollJobs(gomock.Any()).AnyTimes()
// return whatever commit the test is expecting
fetcherMock := mocks.NewMockGitFetcher(ctlr)
fetcherMock.EXPECT().LatestCommit(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(ctx context.Context, gitrepo *v1alpha1.GitRepo, client client.Client) (string, error) {
return expectedCommit, nil
},
)

sched := quartz.NewStdScheduler()
Expect(sched).ToNot(BeNil())

err = (&reconciler.GitJobReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Image: "image",
GitPoller: gitPollerMock,
Scheduler: sched,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Image: "image",
Scheduler: sched,
GitFetcher: fetcherMock,
Clock: reconciler.RealClock{},
}).SetupWithManager(mgr)
Expect(err).ToNot(HaveOccurred())

Expand Down
17 changes: 9 additions & 8 deletions internal/cmd/controller/gitops/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/rancher/fleet/internal/cmd/controller/gitops/reconciler"
"github.com/rancher/fleet/internal/metrics"
fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"
"github.com/rancher/fleet/pkg/git/poll"
"github.com/rancher/fleet/pkg/git"
"github.com/rancher/fleet/pkg/version"
"github.com/rancher/fleet/pkg/webhook"
"github.com/reugn/go-quartz/quartz"
Expand Down Expand Up @@ -122,13 +122,14 @@ func (g *GitOperator) Run(cmd *cobra.Command, args []string) error {
}

reconciler := &reconciler.GitJobReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Image: g.Image,
GitPoller: poll.NewHandler(ctx, mgr.GetClient()),
Scheduler: sched,
Workers: workers,
ShardID: g.ShardID,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Image: g.Image,
Scheduler: sched,
Workers: workers,
ShardID: g.ShardID,
GitFetcher: &git.Fetch{},
Clock: reconciler.RealClock{},
}

group, ctx := errgroup.WithContext(ctx)
Expand Down
Loading

0 comments on commit 98adea9

Please sign in to comment.