diff --git a/pkg/controllers/dashboard/helm/repo.go b/pkg/controllers/dashboard/helm/repo.go index e792df9fbe5..d04c68e1523 100644 --- a/pkg/controllers/dashboard/helm/repo.go +++ b/pkg/controllers/dashboard/helm/repo.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/api/equality" "time" catalog "github.com/rancher/rancher/pkg/apis/catalog.cattle.io/v1" @@ -33,6 +34,12 @@ const ( defaultInterval = 6 * time.Hour ) +var defaultRetryPolicy = retryPolicy{ + MinWait: 5 * time.Minute, + MaxWait: 20 * time.Minute, + MaxRetry: 3, +} + type repoHandler struct { secrets corev1controllers.SecretCache clusterRepos catalogcontrollers.ClusterRepoController @@ -55,8 +62,7 @@ func RegisterRepos(ctx context.Context, apply: apply.WithCacheTypes(configMap).WithStrictCaching().WithSetOwnerReference(false, false), } - catalogcontrollers.RegisterClusterRepoStatusHandler(ctx, clusterRepos, - condition.Cond(catalog.RepoDownloaded), "helm-clusterrepo-download", h.ClusterRepoDownloadStatusHandler) + clusterRepos.OnChange(ctx, "helm-clusterrepo-download-on-change", h.ClusterRepoDownloadStatusHandler2) } @@ -87,10 +93,10 @@ func (r *repoHandler) ClusterRepoDownloadEnsureStatusHandler(repo *catalog.Clust return r.ensure(&repo.Spec, status, &repo.ObjectMeta) } -func (r *repoHandler) ClusterRepoDownloadStatusHandler(repo *catalog.ClusterRepo, status catalog.RepoStatus) (catalog.RepoStatus, error) { +func (r *repoHandler) ClusterRepoDownloadStatusHandler2(key string, repo *catalog.ClusterRepo) (*catalog.ClusterRepo, error) { // Ignore OCI Based Helm Repositories if registry.IsOCI(repo.Spec.URL) { - return status, nil + return repo, nil } interval := defaultInterval @@ -98,21 +104,34 @@ func (r *repoHandler) ClusterRepoDownloadStatusHandler(repo *catalog.ClusterRepo interval = time.Duration(repo.Spec.RefreshInterval) * time.Second } - err := ensureIndexConfigMap(repo, &status, r.configMaps) + newStatus := repo.Status.DeepCopy() + retryPolicy, err := getRetryPolicy(repo) if err != nil { - return status, err + err = fmt.Errorf("failed to get retry policy: %w", err) + return r.setErrorCondition(repo, err, newStatus, interval) + } + if r.shouldSkip(repo, retryPolicy, repo.Name, interval) { + return repo, nil + } + newStatus.ShouldNotSkip = false + + err = ensureIndexConfigMap(repo, &repo.Status, r.configMaps) + if err != nil { + err = fmt.Errorf("failed to ensure index config map: %w", err) + return r.setErrorCondition(repo, err, newStatus, interval) } - if !shouldRefresh(&repo.Spec, &status, interval) { + + if !shouldRefresh(&repo.Spec, &repo.Status, interval) { //reset retries too r.clusterRepos.EnqueueAfter(repo.Name, interval) - return status, nil + return repo, nil } - return r.download(&repo.Spec, status, &repo.ObjectMeta, metav1.OwnerReference{ + return r.download(repo, *newStatus, metav1.OwnerReference{ APIVersion: catalog.SchemeGroupVersion.Group + "/" + catalog.SchemeGroupVersion.Version, Kind: "ClusterRepo", Name: repo.Name, UID: repo.UID, - }) + }, interval, retryPolicy) } func toOwnerObject(namespace string, owner metav1.OwnerReference) runtime.Object { @@ -211,66 +230,95 @@ func (r *repoHandler) ensure(repoSpec *catalog.RepoSpec, status catalog.RepoStat return status, git.Ensure(secret, metadata.Namespace, metadata.Name, status.URL, status.Commit, repoSpec.InsecureSkipTLSverify, repoSpec.CABundle) } -func (r *repoHandler) download(repoSpec *catalog.RepoSpec, status catalog.RepoStatus, metadata *metav1.ObjectMeta, owner metav1.OwnerReference) (catalog.RepoStatus, error) { +func (r *repoHandler) download(repository *catalog.ClusterRepo, newStatus catalog.RepoStatus, owner metav1.OwnerReference, interval time.Duration, retryPolicy retryPolicy) (*catalog.ClusterRepo, error) { var ( index *repo.IndexFile commit string err error ) - status.ObservedGeneration = metadata.Generation + metadata := repository.ObjectMeta + repoSpec := repository.Spec + newStatus.ObservedGeneration = metadata.Generation - secret, err := catalogv2.GetSecret(r.secrets, repoSpec, metadata.Namespace) + secret, err := catalogv2.GetSecret(r.secrets, &repoSpec, metadata.Namespace) if err != nil { - return status, err + return r.setErrorCondition(repository, err, &newStatus, interval) } downloadTime := metav1.Now() - if repoSpec.GitRepo != "" && status.IndexConfigMapName == "" { + backoff := calculateBackoff(repository, retryPolicy) + retriable := false + // git repo and no configmap + if repoSpec.GitRepo != "" && newStatus.IndexConfigMapName == "" { + //retry here commit, err = git.Head(secret, metadata.Namespace, metadata.Name, repoSpec.GitRepo, repoSpec.GitBranch, repoSpec.InsecureSkipTLSverify, repoSpec.CABundle) if err != nil { - return status, err + retriable = true + } else { + newStatus.URL = repoSpec.GitRepo + newStatus.Branch = repoSpec.GitBranch + //no retry here, no external calls + index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo) } - status.URL = repoSpec.GitRepo - status.Branch = repoSpec.GitBranch - index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo) + //git repo } else if repoSpec.GitRepo != "" { + //retry here commit, err = git.Update(secret, metadata.Namespace, metadata.Name, repoSpec.GitRepo, repoSpec.GitBranch, repoSpec.InsecureSkipTLSverify, repoSpec.CABundle) if err != nil { - return status, err - } - status.URL = repoSpec.GitRepo - status.Branch = repoSpec.GitBranch - if status.Commit == commit { - status.DownloadTime = downloadTime - return status, nil + retriable = true + //return status, err + } else { + newStatus.URL = repoSpec.GitRepo + newStatus.Branch = repoSpec.GitBranch + if newStatus.Commit == commit { + newStatus.DownloadTime = downloadTime + return repository, nil + } + //no retry here, no external calls + index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo) } - index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo) + // http repo } else if repoSpec.URL != "" { + //retry here index, err = helmhttp.DownloadIndex(secret, repoSpec.URL, repoSpec.CABundle, repoSpec.InsecureSkipTLSverify, repoSpec.DisableSameOriginCheck) - - status.URL = repoSpec.URL - status.Branch = "" + retriable = true + newStatus.URL = repoSpec.URL + newStatus.Branch = "" + // something weird } else { - return status, nil + return repository, nil } - if err != nil || index == nil { - return status, err + if retriable && err != nil { + newStatus.NumberOfRetries++ + if newStatus.NumberOfRetries > retryPolicy.MaxRetry { + newStatus.NumberOfRetries = 0 + newStatus.NextRetryAt = metav1.Time{} + return r.setConditionWithInterval(repository, err, &newStatus, nil, interval) + } + newStatus.NextRetryAt = metav1.Time{Time: timeNow().UTC().Add(backoff)} + return r.setConditionWithInterval(repository, err, &newStatus, &backoff, interval) + } + if err != nil { + return repository, err + } + if index == nil { + return repository, nil } index.SortEntries() - cm, err := createOrUpdateMap(metadata.Namespace, index, owner, r.apply) if err != nil { - return status, err + return repository, err } - status.IndexConfigMapName = cm.Name - status.IndexConfigMapNamespace = cm.Namespace - status.IndexConfigMapResourceVersion = cm.ResourceVersion - status.DownloadTime = downloadTime - status.Commit = commit - return status, nil + newStatus.IndexConfigMapName = cm.Name + newStatus.IndexConfigMapNamespace = cm.Namespace + newStatus.IndexConfigMapResourceVersion = cm.ResourceVersion + newStatus.DownloadTime = downloadTime + newStatus.Commit = commit + repository.Status = newStatus + return r.clusterRepos.UpdateStatus(repository) } func ensureIndexConfigMap(repo *catalog.ClusterRepo, status *catalog.RepoStatus, configMap corev1controllers.ConfigMapClient) error { @@ -298,22 +346,28 @@ func ensureIndexConfigMap(repo *catalog.ClusterRepo, status *catalog.RepoStatus, } func shouldRefresh(spec *catalog.RepoSpec, status *catalog.RepoStatus, interval time.Duration) bool { + // check if branch changed if spec.GitRepo != "" && status.Branch != spec.GitBranch { return true } + // check if url changed if spec.URL != "" && spec.URL != status.URL { return true } + // check if git repo changed if spec.GitRepo != "" && spec.GitRepo != status.URL { return true } + // check if there's no index config map if status.IndexConfigMapName == "" { return true } + // check if it's a force update if spec.ForceUpdate != nil && spec.ForceUpdate.After(status.DownloadTime.Time) && spec.ForceUpdate.Time.Before(time.Now()) { return true } refreshTime := time.Now().Add(-interval) + // check if interval has passed return refreshTime.After(status.DownloadTime.Time) } @@ -328,3 +382,114 @@ func GetConfigMapNamespace(namespace string) string { return namespace } + +// setErrorCondition is only called when error happens in the handler, and +// we need to depend on wrangler to requeue the handler +func (r *repoHandler) setErrorCondition(clusterRepo *catalog.ClusterRepo, err error, newStatus *catalog.RepoStatus, interval time.Duration) (*catalog.ClusterRepo, error) { + var statusErr error + newStatus.NumberOfRetries = 0 + newStatus.NextRetryAt = metav1.Time{} + if err != nil { + newStatus.ShouldNotSkip = true + } + downloaded := condition.Cond(catalog.RepoDownloaded) + if apierrors.IsConflict(err) { + downloaded.SetError(newStatus, "", nil) + } else { + downloaded.SetError(newStatus, "", err) + } + newStatus.ObservedGeneration = clusterRepo.Generation + + if !equality.Semantic.DeepEqual(newStatus, &clusterRepo.Status) { + downloaded.LastUpdated(newStatus, timeNow().UTC().Format(time.RFC3339)) + + clusterRepo.Status = *newStatus + //status handler will run again without waiting and enqueue after won't work + clusterRepo, statusErr = r.clusterRepos.UpdateStatus(clusterRepo) + if statusErr != nil { + err = statusErr + } + if err == nil { + r.clusterRepos.EnqueueAfter(clusterRepo.Name, interval) + } + return clusterRepo, err + } + + if err == nil { + r.clusterRepos.EnqueueAfter(clusterRepo.Name, interval) + } + return clusterRepo, err +} + +// shouldSkip checks certain conditions to see if the handler should be skipped. +// For information regarding the conditions, check the comments in the implementation. +func (r *repoHandler) shouldSkip(clusterRepo *catalog.ClusterRepo, policy retryPolicy, key string, ociInterval time.Duration) bool { + // this is to prevent the handler from making calls when the crd is outdated. + updatedRepo, err := r.clusterRepos.Get(key, metav1.GetOptions{}) + if err == nil && updatedRepo.ResourceVersion != clusterRepo.ResourceVersion { + return true + } + + if clusterRepo.Status.ObservedGeneration < clusterRepo.Generation { + clusterRepo.Status.NumberOfRetries = 0 + clusterRepo.Status.NextRetryAt = metav1.Time{} + return false + } + + // The handler is triggered immediately after any changes, including when updating the number of retries done. + // This check is to prevent the handler from executing before the backoff time has passed + if !clusterRepo.Status.NextRetryAt.IsZero() && clusterRepo.Status.NextRetryAt.Time.After(timeNow().UTC()) { + return true + } + + if clusterRepo.Status.ShouldNotSkip { //checks if we should skip running the handler or not + return false + } + + downloaded := condition.Cond(catalog.RepoDownloaded) + downloadedUpdateTime, _ := time.Parse(time.RFC3339, downloaded.GetLastUpdated(clusterRepo)) + + if (clusterRepo.Status.NumberOfRetries > policy.MaxRetry || clusterRepo.Status.NumberOfRetries == 0) && // checks if it's not retrying + clusterRepo.Generation == clusterRepo.Status.ObservedGeneration && // checks if the generation has not changed + downloadedUpdateTime.Add(ociInterval).After(timeNow().UTC()) { // checks if the interval has not passed + + r.clusterRepos.EnqueueAfter(clusterRepo.Name, ociInterval) + return true + } + return false +} + +// setConditionWithInterval is called to reenqueue the object +// after the interval of 6 hours. +func (r *repoHandler) setConditionWithInterval(clusterRepo *catalog.ClusterRepo, err error, newStatus *catalog.RepoStatus, backoff *time.Duration, interval time.Duration) (*catalog.ClusterRepo, error) { + var newErr error + if backoff != nil { + newErr = fmt.Errorf("%s. %s", err.Error(), fmt.Sprintf("Will retry after %s", backoff.Round(time.Second))) + } else { + newErr = fmt.Errorf("%s. %s", err.Error(), fmt.Sprintf("Will retry after %s", interval.Round(time.Second))) + } + + downloaded := condition.Cond(catalog.RepoDownloaded) + if apierrors.IsConflict(err) { + downloaded.SetError(newStatus, "", nil) + } else { + downloaded.SetError(newStatus, "", newErr) + } + newStatus.ObservedGeneration = clusterRepo.Generation + if !equality.Semantic.DeepEqual(newStatus, &clusterRepo.Status) { + //Since status has changed, update the lastUpdatedTime + downloaded.LastUpdated(newStatus, timeNow().UTC().Format(time.RFC3339)) + clusterRepo.Status = *newStatus + _, statusErr := r.clusterRepos.UpdateStatus(clusterRepo) + if statusErr != nil { + return clusterRepo, statusErr + } + } + + if backoff != nil { + r.clusterRepos.EnqueueAfter(clusterRepo.Name, *backoff) + } else { + r.clusterRepos.EnqueueAfter(clusterRepo.Name, interval) + } + return clusterRepo, nil +} diff --git a/pkg/controllers/dashboard/helm/repo_oci.go b/pkg/controllers/dashboard/helm/repo_oci.go index 7c9e15518f7..f448dc400aa 100644 --- a/pkg/controllers/dashboard/helm/repo_oci.go +++ b/pkg/controllers/dashboard/helm/repo_oci.go @@ -34,7 +34,12 @@ import ( ) var ( - timeNow = time.Now + timeNow = time.Now + defaultOCIRetryPolicy = retryPolicy{ + MinWait: 1 * time.Second, + MaxWait: 5 * time.Second, + MaxRetry: 5, + } ) const defaultOCIInterval = 24 * time.Hour @@ -399,37 +404,37 @@ func calculateBackoff(clusterRepo *catalog.ClusterRepo, policy retryPolicy) time func getRetryPolicy(clusterRepo *catalog.ClusterRepo) (retryPolicy, error) { // Default Values for exponentialBackOff function which is used // to retry an HTTP call when 429 response code is hit. - defaultRetryPolicy := retryPolicy{ - MinWait: 1 * time.Second, - MaxWait: 5 * time.Second, - MaxRetry: 5, + + policy := defaultOCIRetryPolicy + if !registry.IsOCI(clusterRepo.Spec.URL) { + policy = defaultRetryPolicy } if clusterRepo.Spec.ExponentialBackOffValues != nil { // Set MaxRetry if specified and valid if clusterRepo.Spec.ExponentialBackOffValues.MaxRetries > 0 { - defaultRetryPolicy.MaxRetry = clusterRepo.Spec.ExponentialBackOffValues.MaxRetries + policy.MaxRetry = clusterRepo.Spec.ExponentialBackOffValues.MaxRetries } // Set MinWait if specified and valid if clusterRepo.Spec.ExponentialBackOffValues.MinWait >= 1 { - defaultRetryPolicy.MinWait = time.Duration(clusterRepo.Spec.ExponentialBackOffValues.MinWait) * time.Second + policy.MinWait = time.Duration(clusterRepo.Spec.ExponentialBackOffValues.MinWait) * time.Second } else if clusterRepo.Spec.ExponentialBackOffValues.MinWait != 0 { - return defaultRetryPolicy, errors.New("minWait must be at least 1 second") + return policy, errors.New("minWait must be at least 1 second") } // Set MaxWait if specified and valid if clusterRepo.Spec.ExponentialBackOffValues.MaxWait > 0 { - defaultRetryPolicy.MaxWait = time.Duration(clusterRepo.Spec.ExponentialBackOffValues.MaxWait) * time.Second + policy.MaxWait = time.Duration(clusterRepo.Spec.ExponentialBackOffValues.MaxWait) * time.Second } } // Ensure MaxWait is not less than MinWait - if defaultRetryPolicy.MaxWait < defaultRetryPolicy.MinWait { - return defaultRetryPolicy, errors.New("maxWait must be greater than or equal to minWait") + if policy.MaxWait < policy.MinWait { + return policy, errors.New("maxWait must be greater than or equal to minWait") } - return defaultRetryPolicy, nil + return policy, nil } // shouldSkip checks certain conditions to see if the handler should be skipped. diff --git a/pkg/generated/controllers/catalog.cattle.io/v1/clusterrepo.go b/pkg/generated/controllers/catalog.cattle.io/v1/clusterrepo.go index a4179b3ff2b..139b9923006 100644 --- a/pkg/generated/controllers/catalog.cattle.io/v1/clusterrepo.go +++ b/pkg/generated/controllers/catalog.cattle.io/v1/clusterrepo.go @@ -106,7 +106,7 @@ func (a *clusterRepoStatusHandler) sync(key string, obj *v1.ClusterRepo) (*v1.Cl if a.condition != "" { if errors.IsConflict(err) { a.condition.SetError(&newStatus, "", nil) - } else { + } else { //this will reset any reason a.condition.SetError(&newStatus, "", err) } }