Skip to content

Commit

Permalink
add retries to clusterrepo handler
Browse files Browse the repository at this point in the history
  • Loading branch information
diogoasouza committed Sep 6, 2024
1 parent cb04617 commit 67d75f0
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 54 deletions.
247 changes: 206 additions & 41 deletions pkg/controllers/dashboard/helm/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/apimachinery/pkg/api/equality"
"time"

catalog "github.com/rancher/rancher/pkg/apis/catalog.cattle.io/v1"
Expand Down Expand Up @@ -33,6 +34,12 @@ const (
defaultInterval = 6 * time.Hour
)

var defaultRetryPolicy = retryPolicy{
MinWait: 5 * time.Minute,
MaxWait: 20 * time.Minute,
MaxRetry: 3,
}

type repoHandler struct {
secrets corev1controllers.SecretCache
clusterRepos catalogcontrollers.ClusterRepoController
Expand All @@ -55,8 +62,7 @@ func RegisterRepos(ctx context.Context,
apply: apply.WithCacheTypes(configMap).WithStrictCaching().WithSetOwnerReference(false, false),
}

catalogcontrollers.RegisterClusterRepoStatusHandler(ctx, clusterRepos,
condition.Cond(catalog.RepoDownloaded), "helm-clusterrepo-download", h.ClusterRepoDownloadStatusHandler)
clusterRepos.OnChange(ctx, "helm-clusterrepo-download-on-change", h.ClusterRepoDownloadStatusHandler2)

}

Expand Down Expand Up @@ -87,32 +93,45 @@ func (r *repoHandler) ClusterRepoDownloadEnsureStatusHandler(repo *catalog.Clust
return r.ensure(&repo.Spec, status, &repo.ObjectMeta)
}

func (r *repoHandler) ClusterRepoDownloadStatusHandler(repo *catalog.ClusterRepo, status catalog.RepoStatus) (catalog.RepoStatus, error) {
func (r *repoHandler) ClusterRepoDownloadStatusHandler2(key string, repo *catalog.ClusterRepo) (*catalog.ClusterRepo, error) {
// Ignore OCI Based Helm Repositories
if registry.IsOCI(repo.Spec.URL) {
return status, nil
return repo, nil
}

interval := defaultInterval
if repo.Spec.RefreshInterval > 0 {
interval = time.Duration(repo.Spec.RefreshInterval) * time.Second
}

err := ensureIndexConfigMap(repo, &status, r.configMaps)
newStatus := repo.Status.DeepCopy()
retryPolicy, err := getRetryPolicy(repo)
if err != nil {
return status, err
err = fmt.Errorf("failed to get retry policy: %w", err)
return r.setErrorCondition(repo, err, newStatus, interval)
}
if r.shouldSkip(repo, retryPolicy, repo.Name, interval) {
return repo, nil
}
newStatus.ShouldNotSkip = false

err = ensureIndexConfigMap(repo, &repo.Status, r.configMaps)
if err != nil {
err = fmt.Errorf("failed to ensure index config map: %w", err)
return r.setErrorCondition(repo, err, newStatus, interval)
}
if !shouldRefresh(&repo.Spec, &status, interval) {

if !shouldRefresh(&repo.Spec, &repo.Status, interval) { //reset retries too
r.clusterRepos.EnqueueAfter(repo.Name, interval)
return status, nil
return repo, nil
}

return r.download(&repo.Spec, status, &repo.ObjectMeta, metav1.OwnerReference{
return r.download(repo, *newStatus, metav1.OwnerReference{
APIVersion: catalog.SchemeGroupVersion.Group + "/" + catalog.SchemeGroupVersion.Version,
Kind: "ClusterRepo",
Name: repo.Name,
UID: repo.UID,
})
}, interval, retryPolicy)
}

func toOwnerObject(namespace string, owner metav1.OwnerReference) runtime.Object {
Expand Down Expand Up @@ -211,66 +230,95 @@ func (r *repoHandler) ensure(repoSpec *catalog.RepoSpec, status catalog.RepoStat
return status, git.Ensure(secret, metadata.Namespace, metadata.Name, status.URL, status.Commit, repoSpec.InsecureSkipTLSverify, repoSpec.CABundle)
}

func (r *repoHandler) download(repoSpec *catalog.RepoSpec, status catalog.RepoStatus, metadata *metav1.ObjectMeta, owner metav1.OwnerReference) (catalog.RepoStatus, error) {
func (r *repoHandler) download(repository *catalog.ClusterRepo, newStatus catalog.RepoStatus, owner metav1.OwnerReference, interval time.Duration, retryPolicy retryPolicy) (*catalog.ClusterRepo, error) {
var (
index *repo.IndexFile
commit string
err error
)

status.ObservedGeneration = metadata.Generation
metadata := repository.ObjectMeta
repoSpec := repository.Spec
newStatus.ObservedGeneration = metadata.Generation

secret, err := catalogv2.GetSecret(r.secrets, repoSpec, metadata.Namespace)
secret, err := catalogv2.GetSecret(r.secrets, &repoSpec, metadata.Namespace)
if err != nil {
return status, err
return r.setErrorCondition(repository, err, &newStatus, interval)
}

downloadTime := metav1.Now()
if repoSpec.GitRepo != "" && status.IndexConfigMapName == "" {
backoff := calculateBackoff(repository, retryPolicy)
retriable := false
// git repo and no configmap
if repoSpec.GitRepo != "" && newStatus.IndexConfigMapName == "" {
//retry here
commit, err = git.Head(secret, metadata.Namespace, metadata.Name, repoSpec.GitRepo, repoSpec.GitBranch, repoSpec.InsecureSkipTLSverify, repoSpec.CABundle)
if err != nil {
return status, err
retriable = true
} else {
newStatus.URL = repoSpec.GitRepo
newStatus.Branch = repoSpec.GitBranch
//no retry here, no external calls
index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo)
}
status.URL = repoSpec.GitRepo
status.Branch = repoSpec.GitBranch
index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo)
//git repo
} else if repoSpec.GitRepo != "" {
//retry here
commit, err = git.Update(secret, metadata.Namespace, metadata.Name, repoSpec.GitRepo, repoSpec.GitBranch, repoSpec.InsecureSkipTLSverify, repoSpec.CABundle)
if err != nil {
return status, err
}
status.URL = repoSpec.GitRepo
status.Branch = repoSpec.GitBranch
if status.Commit == commit {
status.DownloadTime = downloadTime
return status, nil
retriable = true
//return status, err
} else {
newStatus.URL = repoSpec.GitRepo
newStatus.Branch = repoSpec.GitBranch
if newStatus.Commit == commit {
newStatus.DownloadTime = downloadTime
return repository, nil
}
//no retry here, no external calls
index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo)
}
index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo)
// http repo
} else if repoSpec.URL != "" {
//retry here
index, err = helmhttp.DownloadIndex(secret, repoSpec.URL, repoSpec.CABundle, repoSpec.InsecureSkipTLSverify, repoSpec.DisableSameOriginCheck)

status.URL = repoSpec.URL
status.Branch = ""
retriable = true
newStatus.URL = repoSpec.URL
newStatus.Branch = ""
// something weird
} else {
return status, nil
return repository, nil
}
if err != nil || index == nil {
return status, err
if retriable && err != nil {
newStatus.NumberOfRetries++
if newStatus.NumberOfRetries > retryPolicy.MaxRetry {
newStatus.NumberOfRetries = 0
newStatus.NextRetryAt = metav1.Time{}
return r.setConditionWithInterval(repository, err, &newStatus, nil, interval)
}
newStatus.NextRetryAt = metav1.Time{Time: timeNow().UTC().Add(backoff)}
return r.setConditionWithInterval(repository, err, &newStatus, &backoff, interval)
}
if err != nil {
return repository, err
}
if index == nil {
return repository, nil
}

index.SortEntries()

cm, err := createOrUpdateMap(metadata.Namespace, index, owner, r.apply)
if err != nil {
return status, err
return repository, err
}

status.IndexConfigMapName = cm.Name
status.IndexConfigMapNamespace = cm.Namespace
status.IndexConfigMapResourceVersion = cm.ResourceVersion
status.DownloadTime = downloadTime
status.Commit = commit
return status, nil
newStatus.IndexConfigMapName = cm.Name
newStatus.IndexConfigMapNamespace = cm.Namespace
newStatus.IndexConfigMapResourceVersion = cm.ResourceVersion
newStatus.DownloadTime = downloadTime
newStatus.Commit = commit
repository.Status = newStatus
return r.clusterRepos.UpdateStatus(repository)
}

func ensureIndexConfigMap(repo *catalog.ClusterRepo, status *catalog.RepoStatus, configMap corev1controllers.ConfigMapClient) error {
Expand Down Expand Up @@ -298,22 +346,28 @@ func ensureIndexConfigMap(repo *catalog.ClusterRepo, status *catalog.RepoStatus,
}

func shouldRefresh(spec *catalog.RepoSpec, status *catalog.RepoStatus, interval time.Duration) bool {
// check if branch changed
if spec.GitRepo != "" && status.Branch != spec.GitBranch {
return true
}
// check if url changed
if spec.URL != "" && spec.URL != status.URL {
return true
}
// check if git repo changed
if spec.GitRepo != "" && spec.GitRepo != status.URL {
return true
}
// check if there's no index config map
if status.IndexConfigMapName == "" {
return true
}
// check if it's a force update
if spec.ForceUpdate != nil && spec.ForceUpdate.After(status.DownloadTime.Time) && spec.ForceUpdate.Time.Before(time.Now()) {
return true
}
refreshTime := time.Now().Add(-interval)
// check if interval has passed
return refreshTime.After(status.DownloadTime.Time)
}

Expand All @@ -328,3 +382,114 @@ func GetConfigMapNamespace(namespace string) string {

return namespace
}

// setErrorCondition is only called when error happens in the handler, and
// we need to depend on wrangler to requeue the handler
func (r *repoHandler) setErrorCondition(clusterRepo *catalog.ClusterRepo, err error, newStatus *catalog.RepoStatus, interval time.Duration) (*catalog.ClusterRepo, error) {
var statusErr error
newStatus.NumberOfRetries = 0
newStatus.NextRetryAt = metav1.Time{}
if err != nil {
newStatus.ShouldNotSkip = true
}
downloaded := condition.Cond(catalog.RepoDownloaded)
if apierrors.IsConflict(err) {
downloaded.SetError(newStatus, "", nil)
} else {
downloaded.SetError(newStatus, "", err)
}
newStatus.ObservedGeneration = clusterRepo.Generation

if !equality.Semantic.DeepEqual(newStatus, &clusterRepo.Status) {
downloaded.LastUpdated(newStatus, timeNow().UTC().Format(time.RFC3339))

clusterRepo.Status = *newStatus
//status handler will run again without waiting and enqueue after won't work
clusterRepo, statusErr = r.clusterRepos.UpdateStatus(clusterRepo)
if statusErr != nil {
err = statusErr
}
if err == nil {
r.clusterRepos.EnqueueAfter(clusterRepo.Name, interval)
}
return clusterRepo, err
}

if err == nil {
r.clusterRepos.EnqueueAfter(clusterRepo.Name, interval)
}
return clusterRepo, err
}

// shouldSkip checks certain conditions to see if the handler should be skipped.
// For information regarding the conditions, check the comments in the implementation.
func (r *repoHandler) shouldSkip(clusterRepo *catalog.ClusterRepo, policy retryPolicy, key string, ociInterval time.Duration) bool {
// this is to prevent the handler from making calls when the crd is outdated.
updatedRepo, err := r.clusterRepos.Get(key, metav1.GetOptions{})
if err == nil && updatedRepo.ResourceVersion != clusterRepo.ResourceVersion {
return true
}

if clusterRepo.Status.ObservedGeneration < clusterRepo.Generation {
clusterRepo.Status.NumberOfRetries = 0
clusterRepo.Status.NextRetryAt = metav1.Time{}
return false
}

// The handler is triggered immediately after any changes, including when updating the number of retries done.
// This check is to prevent the handler from executing before the backoff time has passed
if !clusterRepo.Status.NextRetryAt.IsZero() && clusterRepo.Status.NextRetryAt.Time.After(timeNow().UTC()) {
return true
}

if clusterRepo.Status.ShouldNotSkip { //checks if we should skip running the handler or not
return false
}

downloaded := condition.Cond(catalog.RepoDownloaded)
downloadedUpdateTime, _ := time.Parse(time.RFC3339, downloaded.GetLastUpdated(clusterRepo))

if (clusterRepo.Status.NumberOfRetries > policy.MaxRetry || clusterRepo.Status.NumberOfRetries == 0) && // checks if it's not retrying
clusterRepo.Generation == clusterRepo.Status.ObservedGeneration && // checks if the generation has not changed
downloadedUpdateTime.Add(ociInterval).After(timeNow().UTC()) { // checks if the interval has not passed

r.clusterRepos.EnqueueAfter(clusterRepo.Name, ociInterval)
return true
}
return false
}

// setConditionWithInterval is called to reenqueue the object
// after the interval of 6 hours.
func (r *repoHandler) setConditionWithInterval(clusterRepo *catalog.ClusterRepo, err error, newStatus *catalog.RepoStatus, backoff *time.Duration, interval time.Duration) (*catalog.ClusterRepo, error) {
var newErr error
if backoff != nil {
newErr = fmt.Errorf("%s. %s", err.Error(), fmt.Sprintf("Will retry after %s", backoff.Round(time.Second)))
} else {
newErr = fmt.Errorf("%s. %s", err.Error(), fmt.Sprintf("Will retry after %s", interval.Round(time.Second)))
}

downloaded := condition.Cond(catalog.RepoDownloaded)
if apierrors.IsConflict(err) {
downloaded.SetError(newStatus, "", nil)
} else {
downloaded.SetError(newStatus, "", newErr)
}
newStatus.ObservedGeneration = clusterRepo.Generation
if !equality.Semantic.DeepEqual(newStatus, &clusterRepo.Status) {
//Since status has changed, update the lastUpdatedTime
downloaded.LastUpdated(newStatus, timeNow().UTC().Format(time.RFC3339))
clusterRepo.Status = *newStatus
_, statusErr := r.clusterRepos.UpdateStatus(clusterRepo)
if statusErr != nil {
return clusterRepo, statusErr
}
}

if backoff != nil {
r.clusterRepos.EnqueueAfter(clusterRepo.Name, *backoff)
} else {
r.clusterRepos.EnqueueAfter(clusterRepo.Name, interval)
}
return clusterRepo, nil
}
Loading

0 comments on commit 67d75f0

Please sign in to comment.