diff --git a/pkg/controllers/dashboard/helm/repo.go b/pkg/controllers/dashboard/helm/repo.go index e792df9fbe5..a32e126e113 100644 --- a/pkg/controllers/dashboard/helm/repo.go +++ b/pkg/controllers/dashboard/helm/repo.go @@ -8,6 +8,8 @@ import ( "fmt" "time" + "k8s.io/apimachinery/pkg/api/equality" + catalog "github.com/rancher/rancher/pkg/apis/catalog.cattle.io/v1" "github.com/rancher/rancher/pkg/catalogv2" "github.com/rancher/rancher/pkg/catalogv2/git" @@ -30,9 +32,16 @@ import ( const ( maxSize = 100_000 - defaultInterval = 6 * time.Hour + defaultInterval = 1 * time.Hour + repoCondition = catalog.RepoDownloaded ) +var defaultHandlerErrRetryPolicy = retryPolicy{ + MinWait: 5 * time.Minute, + MaxWait: 20 * time.Minute, + MaxRetry: 3, +} + type repoHandler struct { secrets corev1controllers.SecretCache clusterRepos catalogcontrollers.ClusterRepoController @@ -55,8 +64,7 @@ func RegisterRepos(ctx context.Context, apply: apply.WithCacheTypes(configMap).WithStrictCaching().WithSetOwnerReference(false, false), } - catalogcontrollers.RegisterClusterRepoStatusHandler(ctx, clusterRepos, - condition.Cond(catalog.RepoDownloaded), "helm-clusterrepo-download", h.ClusterRepoDownloadStatusHandler) + clusterRepos.OnChange(ctx, "helm-clusterrepo-download-on-change", h.ClusterRepoOnChange) } @@ -87,10 +95,13 @@ func (r *repoHandler) ClusterRepoDownloadEnsureStatusHandler(repo *catalog.Clust return r.ensure(&repo.Spec, status, &repo.ObjectMeta) } -func (r *repoHandler) ClusterRepoDownloadStatusHandler(repo *catalog.ClusterRepo, status catalog.RepoStatus) (catalog.RepoStatus, error) { +func (r *repoHandler) ClusterRepoOnChange(key string, repo *catalog.ClusterRepo) (*catalog.ClusterRepo, error) { + if repo == nil { + return nil, nil + } // Ignore OCI Based Helm Repositories if registry.IsOCI(repo.Spec.URL) { - return status, nil + return repo, nil } interval := defaultInterval @@ -98,21 +109,30 @@ func (r *repoHandler) ClusterRepoDownloadStatusHandler(repo *catalog.ClusterRepo interval = time.Duration(repo.Spec.RefreshInterval) * time.Second } - err := ensureIndexConfigMap(repo, &status, r.configMaps) + newStatus := repo.Status.DeepCopy() + retryPolicy, err := getRetryPolicy(repo) if err != nil { - return status, err + err = fmt.Errorf("failed to get retry policy: %w", err) + return setErrorCondition(repo, err, newStatus, interval, repoCondition, r.clusterRepos) } - if !shouldRefresh(&repo.Spec, &status, interval) { - r.clusterRepos.EnqueueAfter(repo.Name, interval) - return status, nil + + err = ensureIndexConfigMap(repo, newStatus, r.configMaps) + if err != nil { + err = fmt.Errorf("failed to ensure index config map: %w", err) + return setErrorCondition(repo, err, newStatus, interval, repoCondition, r.clusterRepos) + } + + if shouldSkip(repo, retryPolicy, repoCondition, interval, r.clusterRepos, newStatus) { + return repo, nil } + newStatus.ShouldNotSkip = false - return r.download(&repo.Spec, status, &repo.ObjectMeta, metav1.OwnerReference{ + return r.download(repo, newStatus, metav1.OwnerReference{ APIVersion: catalog.SchemeGroupVersion.Group + "/" + catalog.SchemeGroupVersion.Version, Kind: "ClusterRepo", Name: repo.Name, UID: repo.UID, - }) + }, interval, retryPolicy) } func toOwnerObject(namespace string, owner metav1.OwnerReference) runtime.Object { @@ -202,7 +222,6 @@ func (r *repoHandler) ensure(repoSpec *catalog.RepoSpec, status catalog.RepoStat return status, nil } - status.ObservedGeneration = metadata.Generation secret, err := catalogv2.GetSecret(r.secrets, repoSpec, metadata.Namespace) if err 
!= nil { return status, err @@ -211,73 +230,87 @@ func (r *repoHandler) ensure(repoSpec *catalog.RepoSpec, status catalog.RepoStat return status, git.Ensure(secret, metadata.Namespace, metadata.Name, status.URL, status.Commit, repoSpec.InsecureSkipTLSverify, repoSpec.CABundle) } -func (r *repoHandler) download(repoSpec *catalog.RepoSpec, status catalog.RepoStatus, metadata *metav1.ObjectMeta, owner metav1.OwnerReference) (catalog.RepoStatus, error) { +func (r *repoHandler) download(repository *catalog.ClusterRepo, newStatus *catalog.RepoStatus, owner metav1.OwnerReference, interval time.Duration, retryPolicy retryPolicy) (*catalog.ClusterRepo, error) { var ( index *repo.IndexFile commit string err error ) - status.ObservedGeneration = metadata.Generation + metadata := repository.ObjectMeta + repoSpec := repository.Spec - secret, err := catalogv2.GetSecret(r.secrets, repoSpec, metadata.Namespace) + secret, err := catalogv2.GetSecret(r.secrets, &repoSpec, metadata.Namespace) if err != nil { - return status, err + return setErrorCondition(repository, err, newStatus, interval, repoCondition, r.clusterRepos) } downloadTime := metav1.Now() - if repoSpec.GitRepo != "" && status.IndexConfigMapName == "" { + backoff := calculateBackoff(repository, retryPolicy) + retriable := false + if repoSpec.GitRepo != "" && newStatus.IndexConfigMapName == "" { commit, err = git.Head(secret, metadata.Namespace, metadata.Name, repoSpec.GitRepo, repoSpec.GitBranch, repoSpec.InsecureSkipTLSverify, repoSpec.CABundle) if err != nil { - return status, err + retriable = true + } else { + newStatus.URL = repoSpec.GitRepo + newStatus.Branch = repoSpec.GitBranch + index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo) } - status.URL = repoSpec.GitRepo - status.Branch = repoSpec.GitBranch - index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo) } else if repoSpec.GitRepo != "" { commit, err = git.Update(secret, metadata.Namespace, metadata.Name, repoSpec.GitRepo, repoSpec.GitBranch, repoSpec.InsecureSkipTLSverify, repoSpec.CABundle) if err != nil { - return status, err - } - status.URL = repoSpec.GitRepo - status.Branch = repoSpec.GitBranch - if status.Commit == commit { - status.DownloadTime = downloadTime - return status, nil + retriable = true + } else { + newStatus.URL = repoSpec.GitRepo + newStatus.Branch = repoSpec.GitBranch + if newStatus.Commit == commit { + newStatus.DownloadTime = downloadTime + return setErrorCondition(repository, err, newStatus, interval, repoCondition, r.clusterRepos) + } + index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo) } - index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo) } else if repoSpec.URL != "" { index, err = helmhttp.DownloadIndex(secret, repoSpec.URL, repoSpec.CABundle, repoSpec.InsecureSkipTLSverify, repoSpec.DisableSameOriginCheck) - - status.URL = repoSpec.URL - status.Branch = "" + retriable = true + newStatus.URL = repoSpec.URL + newStatus.Branch = "" } else { - return status, nil + return setErrorCondition(repository, err, newStatus, interval, repoCondition, r.clusterRepos) + } + if retriable && err != nil { + newStatus.NumberOfRetries++ + if newStatus.NumberOfRetries > retryPolicy.MaxRetry { + newStatus.NumberOfRetries = 0 + newStatus.NextRetryAt = metav1.Time{} + return r.setConditionWithInterval(repository, err, newStatus, nil, interval) + } + newStatus.NextRetryAt = metav1.Time{Time: timeNow().UTC().Add(backoff)} + return 
r.setConditionWithInterval(repository, err, newStatus, &backoff, interval) } if err != nil || index == nil { - return status, err + return setErrorCondition(repository, err, newStatus, interval, repoCondition, r.clusterRepos) } index.SortEntries() - cm, err := createOrUpdateMap(metadata.Namespace, index, owner, r.apply) if err != nil { - return status, err + return setErrorCondition(repository, err, newStatus, interval, repoCondition, r.clusterRepos) } - status.IndexConfigMapName = cm.Name - status.IndexConfigMapNamespace = cm.Namespace - status.IndexConfigMapResourceVersion = cm.ResourceVersion - status.DownloadTime = downloadTime - status.Commit = commit - return status, nil + newStatus.IndexConfigMapName = cm.Name + newStatus.IndexConfigMapNamespace = cm.Namespace + newStatus.IndexConfigMapResourceVersion = cm.ResourceVersion + newStatus.DownloadTime = downloadTime + newStatus.Commit = commit + return setErrorCondition(repository, nil, newStatus, interval, repoCondition, r.clusterRepos) } func ensureIndexConfigMap(repo *catalog.ClusterRepo, status *catalog.RepoStatus, configMap corev1controllers.ConfigMapClient) error { // Charts from the clusterRepo will be unavailable if the IndexConfigMap recorded in the status does not exist. // By resetting the value of IndexConfigMapName, IndexConfigMapNamespace, IndexConfigMapResourceVersion to "", // the method shouldRefresh will return true and trigger the rebuild of the IndexConfigMap and accordingly update the status. - if repo.Spec.GitRepo != "" && status.IndexConfigMapName != "" { + if status.IndexConfigMapName != "" { _, err := configMap.Get(status.IndexConfigMapNamespace, status.IndexConfigMapName, metav1.GetOptions{}) if err != nil { if apierrors.IsNotFound(err) { @@ -297,34 +330,149 @@ func ensureIndexConfigMap(repo *catalog.ClusterRepo, status *catalog.RepoStatus, return nil } -func shouldRefresh(spec *catalog.RepoSpec, status *catalog.RepoStatus, interval time.Duration) bool { - if spec.GitRepo != "" && status.Branch != spec.GitBranch { - return true +func GenerateConfigMapName(ownerName string, index int, UID types.UID) string { + return name2.SafeConcatName(ownerName, fmt.Sprint(index), string(UID)) +} + +func GetConfigMapNamespace(namespace string) string { + if namespace == "" { + return namespaces.System } - if spec.URL != "" && spec.URL != status.URL { - return true + + return namespace +} + +// setErrorCondition is only called when error happens in the handler, and +// we need to depend on wrangler to requeue the handler +func setErrorCondition(clusterRepo *catalog.ClusterRepo, + err error, + newStatus *catalog.RepoStatus, + interval time.Duration, + cond catalog.RepoCondition, + controller catalogcontrollers.ClusterRepoController) (*catalog.ClusterRepo, error) { + var statusErr error + newStatus.NumberOfRetries = 0 + newStatus.NextRetryAt = metav1.Time{} + if err != nil { + newStatus.ShouldNotSkip = true + } + condDownloaded := condition.Cond(cond) + + if apierrors.IsConflict(err) { + err = nil + } + condDownloaded.SetError(newStatus, "", err) + + newStatus.ObservedGeneration = clusterRepo.Generation + + if !equality.Semantic.DeepEqual(newStatus, &clusterRepo.Status) { + condDownloaded.LastUpdated(newStatus, timeNow().UTC().Format(time.RFC3339)) + + clusterRepo.Status = *newStatus + clusterRepo, statusErr = controller.UpdateStatus(clusterRepo) + if statusErr != nil { + err = statusErr + } + if err == nil { + controller.EnqueueAfter(clusterRepo.Name, interval) + } + return clusterRepo, err + } + + if err == nil { + 
controller.EnqueueAfter(clusterRepo.Name, interval) } - if spec.GitRepo != "" && spec.GitRepo != status.URL { + return clusterRepo, err +} + +// shouldSkip checks certain conditions to see if the handler should be skipped. +// For information regarding the conditions, check the comments in the implementation. +func shouldSkip(clusterRepo *catalog.ClusterRepo, + policy retryPolicy, + cond catalog.RepoCondition, + interval time.Duration, + controller catalogcontrollers.ClusterRepoController, + newStatus *catalog.RepoStatus) bool { + // this is to prevent the handler from making calls when the crd is outdated. + updatedRepo, err := controller.Get(clusterRepo.Name, metav1.GetOptions{}) + if err == nil && updatedRepo.ResourceVersion != clusterRepo.ResourceVersion { return true } - if status.IndexConfigMapName == "" { + + if newStatus.IndexConfigMapName == "" { + return false + } + + if newStatus.ObservedGeneration < clusterRepo.Generation { + newStatus.NumberOfRetries = 0 + newStatus.NextRetryAt = metav1.Time{} + return false + } + + // The handler is triggered immediately after any changes, including when updating the number of retries done. + // This check is to prevent the handler from executing before the backoff time has passed + if !newStatus.NextRetryAt.IsZero() && newStatus.NextRetryAt.Time.After(timeNow().UTC()) { return true } - if spec.ForceUpdate != nil && spec.ForceUpdate.After(status.DownloadTime.Time) && spec.ForceUpdate.Time.Before(time.Now()) { + + if newStatus.ShouldNotSkip { //checks if we should skip running the handler or not + return false + } + + condDownloaded := condition.Cond(cond) + downloadedUpdateTime, _ := time.Parse(time.RFC3339, condDownloaded.GetLastUpdated(clusterRepo)) + + if (newStatus.NumberOfRetries > policy.MaxRetry || newStatus.NumberOfRetries == 0) && // checks if it's not retrying + clusterRepo.Generation == newStatus.ObservedGeneration && // checks if the generation has not changed + downloadedUpdateTime.Add(interval).After(timeNow().UTC()) { // checks if the interval has not passed + + controller.EnqueueAfter(clusterRepo.Name, interval) return true } - refreshTime := time.Now().Add(-interval) - return refreshTime.After(status.DownloadTime.Time) + return false } -func GenerateConfigMapName(ownerName string, index int, UID types.UID) string { - return name2.SafeConcatName(ownerName, fmt.Sprint(index), string(UID)) +// setConditionWithInterval is called to reenqueue the object +// after the interval of 6 hours. +func (r *repoHandler) setConditionWithInterval(clusterRepo *catalog.ClusterRepo, err error, newStatus *catalog.RepoStatus, backoff *time.Duration, interval time.Duration) (*catalog.ClusterRepo, error) { + var msg string + backoffInterval := interval.Round(time.Second) + if backoff != nil { + backoffInterval = backoff.Round(time.Second) + } + msg = fmt.Sprintf("%s. 
Will retry after %s", err.Error(), backoffInterval) + + return setConditionWithInterval(clusterRepo, err, newStatus, backoffInterval, repoCondition, msg, r.clusterRepos) } -func GetConfigMapNamespace(namespace string) string { - if namespace == "" { - return namespaces.System +func setConditionWithInterval(clusterRepo *catalog.ClusterRepo, + err error, + newStatus *catalog.RepoStatus, + backoff time.Duration, + cond catalog.RepoCondition, + condMessage string, + controller catalogcontrollers.ClusterRepoController, +) (*catalog.ClusterRepo, error) { + newErr := fmt.Errorf(condMessage) + downloaded := condition.Cond(cond) + + if apierrors.IsConflict(err) { + err = nil + } + downloaded.SetError(newStatus, "", newErr) + + newStatus.ObservedGeneration = clusterRepo.Generation + if !equality.Semantic.DeepEqual(newStatus, &clusterRepo.Status) { + //Since status has changed, update the lastUpdatedTime + downloaded.LastUpdated(newStatus, timeNow().UTC().Format(time.RFC3339)) + clusterRepo.Status = *newStatus + _, statusErr := controller.UpdateStatus(clusterRepo) + if statusErr != nil { + return clusterRepo, statusErr + } } - return namespace + controller.EnqueueAfter(clusterRepo.Name, backoff) + + return clusterRepo, nil } diff --git a/pkg/controllers/dashboard/helm/repo_oci.go b/pkg/controllers/dashboard/helm/repo_oci.go index 7c9e15518f7..912e5b2c5c5 100644 --- a/pkg/controllers/dashboard/helm/repo_oci.go +++ b/pkg/controllers/dashboard/helm/repo_oci.go @@ -21,12 +21,10 @@ import ( catalogcontrollers "github.com/rancher/rancher/pkg/generated/controllers/catalog.cattle.io/v1" corev1 "github.com/rancher/rancher/pkg/generated/norman/core/v1" "github.com/rancher/wrangler/v3/pkg/apply" - "github.com/rancher/wrangler/v3/pkg/condition" corev1controllers "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1" "github.com/sirupsen/logrus" "helm.sh/helm/v3/pkg/registry" "helm.sh/helm/v3/pkg/repo" - "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -34,10 +32,18 @@ import ( ) var ( - timeNow = time.Now + timeNow = time.Now + defaultOCIRetryPolicy = retryPolicy{ + MinWait: 1 * time.Second, + MaxWait: 5 * time.Second, + MaxRetry: 5, + } ) -const defaultOCIInterval = 24 * time.Hour +const ( + defaultOCIInterval = 24 * time.Hour + ociCondition = catalog.OCIDownloaded +) type OCIRepohandler struct { clusterRepoController catalogcontrollers.ClusterRepoController @@ -96,9 +102,16 @@ func (o *OCIRepohandler) onClusterRepoChange(key string, clusterRepo *catalog.Cl retryPolicy, err := getRetryPolicy(clusterRepo) if err != nil { err = fmt.Errorf("failed to get retry policy: %w", err) - return o.setErrorCondition(clusterRepo, err, newStatus, ociInterval) + return setErrorCondition(clusterRepo, err, newStatus, ociInterval, ociCondition, o.clusterRepoController) + } + + err = ensureIndexConfigMap(clusterRepo, newStatus, o.configMapController) + if err != nil { + err = fmt.Errorf("failed to ensure index configmap: %w", err) + return setErrorCondition(clusterRepo, err, newStatus, ociInterval, ociCondition, o.clusterRepoController) } - if o.shouldSkip(clusterRepo, retryPolicy, key, ociInterval) { + + if shouldSkip(clusterRepo, retryPolicy, ociCondition, ociInterval, o.clusterRepoController, newStatus) { return clusterRepo, nil } newStatus.ShouldNotSkip = false @@ -106,12 +119,6 @@ func (o *OCIRepohandler) onClusterRepoChange(key string, clusterRepo *catalog.Cl logrus.Debugf("OCIRepoHandler triggered for clusterrepo %s", 
clusterRepo.Name) var index *repo.IndexFile - err = ensureIndexConfigMap(clusterRepo, newStatus, o.configMapController) - if err != nil { - err = fmt.Errorf("failed to ensure index configmap: %w", err) - return o.setErrorCondition(clusterRepo, err, newStatus, ociInterval) - } - secret, err := catalogv2.GetSecret(o.secretCacheController, &clusterRepo.Spec, clusterRepo.Namespace) if err != nil { logrus.Errorf("Error while fetching secret for cluster repo %s: %v", clusterRepo.Name, err) @@ -119,7 +126,7 @@ func (o *OCIRepohandler) onClusterRepoChange(key string, clusterRepo *catalog.Cl if reason != metav1.StatusReasonUnknown { err = fmt.Errorf("failed to fetch secret: %s", reason) } - return o.setErrorCondition(clusterRepo, err, newStatus, ociInterval) + return setErrorCondition(clusterRepo, err, newStatus, ociInterval, ociCondition, o.clusterRepoController) } owner := metav1.OwnerReference{ @@ -132,12 +139,12 @@ func (o *OCIRepohandler) onClusterRepoChange(key string, clusterRepo *catalog.Cl downloadTime := metav1.Now() index, err = getIndexfile(clusterRepo.Status, clusterRepo.Spec, o.configMapController, owner, clusterRepo.Namespace) if err != nil { - return o.setErrorCondition(clusterRepo, fmt.Errorf("error while getting indexfile: %w", err), newStatus, ociInterval) + return setErrorCondition(clusterRepo, fmt.Errorf("error while getting indexfile: %w", err), newStatus, ociInterval, ociCondition, o.clusterRepoController) } originalIndexBytes, err := json.Marshal(index) if err != nil { logrus.Errorf("Error while marshalling indexfile for cluster repo %s: %v", clusterRepo.Name, err) - return o.setErrorCondition(clusterRepo, fmt.Errorf("error while reading indexfile"), newStatus, ociInterval) + return setErrorCondition(clusterRepo, fmt.Errorf("error while reading indexfile"), newStatus, ociInterval, ociCondition, o.clusterRepoController) } // Create a new oci client ociClient, err := oci.NewClient(clusterRepo.Spec.URL, clusterRepo.Spec, secret) @@ -190,20 +197,20 @@ func (o *OCIRepohandler) onClusterRepoChange(key string, clusterRepo *catalog.Cl } if index == nil || len(index.Entries) <= 0 { err = errors.New("there are no helm charts in the repository specified") - return o.setErrorCondition(clusterRepo, err, newStatus, ociInterval) + return setErrorCondition(clusterRepo, err, newStatus, ociInterval, ociCondition, o.clusterRepoController) } newIndexBytes, err := json.Marshal(index) if err != nil { logrus.Errorf("Error while marshalling indexfile for cluster repo %s: %v", clusterRepo.Name, err) - return o.setErrorCondition(clusterRepo, fmt.Errorf("error while reading indexfile"), newStatus, ociInterval) + return setErrorCondition(clusterRepo, fmt.Errorf("error while reading indexfile"), newStatus, ociInterval, ociCondition, o.clusterRepoController) } // Only update, if the index got updated if !bytes.Equal(originalIndexBytes, newIndexBytes) { index.SortEntries() cm, err := createOrUpdateMap(clusterRepo.Namespace, index, owner, o.apply) if err != nil { - return o.setErrorCondition(clusterRepo, fmt.Errorf("error while creating or updating confimap"), newStatus, ociInterval) + return setErrorCondition(clusterRepo, fmt.Errorf("error while creating or updating confimap"), newStatus, ociInterval, ociCondition, o.clusterRepoController) } newStatus.URL = clusterRepo.Spec.URL @@ -213,88 +220,29 @@ func (o *OCIRepohandler) onClusterRepoChange(key string, clusterRepo *catalog.Cl newStatus.DownloadTime = downloadTime } - return o.setErrorCondition(clusterRepo, nil, newStatus, ociInterval) -} - -// 
setErrorCondition is only called when error happens in the handler, and -// we need to depend on wrangler to requeue the handler -func (o *OCIRepohandler) setErrorCondition(clusterRepo *catalog.ClusterRepo, err error, newStatus *catalog.RepoStatus, ociInterval time.Duration) (*catalog.ClusterRepo, error) { - var statusErr error - newStatus.NumberOfRetries = 0 - newStatus.NextRetryAt = metav1.Time{} - if err != nil { - newStatus.ShouldNotSkip = true - } - ociDownloaded := condition.Cond(catalog.OCIDownloaded) - if apierrors.IsConflict(err) { - ociDownloaded.SetError(newStatus, "", nil) - } else { - ociDownloaded.SetError(newStatus, "", err) - } - newStatus.ObservedGeneration = clusterRepo.Generation - - if !equality.Semantic.DeepEqual(newStatus, &clusterRepo.Status) { - ociDownloaded.LastUpdated(newStatus, timeNow().UTC().Format(time.RFC3339)) - - clusterRepo.Status = *newStatus - clusterRepo, statusErr = o.clusterRepoController.UpdateStatus(clusterRepo) - if statusErr != nil { - err = statusErr - } - if err == nil { - o.clusterRepoController.EnqueueAfter(clusterRepo.Name, ociInterval) - } - return clusterRepo, err - } - - if err == nil { - o.clusterRepoController.EnqueueAfter(clusterRepo.Name, ociInterval) - } - return clusterRepo, err + return setErrorCondition(clusterRepo, nil, newStatus, ociInterval, ociCondition, o.clusterRepoController) } // setConditionWithInterval is called to reenqueue the object // after the interval of 24 hours. func (o *OCIRepohandler) setConditionWithInterval(clusterRepo *catalog.ClusterRepo, err error, newStatus *catalog.RepoStatus, backoff *time.Duration, ociInterval time.Duration) (*catalog.ClusterRepo, error) { var errResp *errcode.ErrorResponse - var newErr error + var errorMsg string if errors.As(err, &errResp) { - errorMsg := fmt.Sprintf("error %d: %s", errResp.StatusCode, http.StatusText(errResp.StatusCode)) + errorMsg = fmt.Sprintf("error %d: %s", errResp.StatusCode, http.StatusText(errResp.StatusCode)) if backoff != nil { errorMsg = fmt.Sprintf("%s. %s", errorMsg, fmt.Sprintf("Will retry after %s", backoff.Round(time.Second))) } else { errorMsg = fmt.Sprintf("%s. 
%s", errorMsg, fmt.Sprintf("Will retry after %s", ociInterval.Round(time.Second))) } - newErr = fmt.Errorf(errorMsg) - } else { - newErr = err - } - ociDownloaded := condition.Cond(catalog.OCIDownloaded) - if apierrors.IsConflict(err) { - ociDownloaded.SetError(newStatus, "", nil) } else { - ociDownloaded.SetError(newStatus, "", newErr) - } - newStatus.ObservedGeneration = clusterRepo.Generation - - if !equality.Semantic.DeepEqual(newStatus, &clusterRepo.Status) { - // Since status has changed, update the lastUpdatedTime - ociDownloaded.LastUpdated(newStatus, timeNow().UTC().Format(time.RFC3339)) - - clusterRepo.Status = *newStatus - - _, statusErr := o.clusterRepoController.UpdateStatus(clusterRepo) - if statusErr != nil { - return clusterRepo, statusErr - } + errorMsg = err.Error() } - + backoffInterval := ociInterval.Round(time.Second) if backoff != nil { - o.clusterRepoController.EnqueueAfter(clusterRepo.Name, *backoff) - } else { - o.clusterRepoController.EnqueueAfter(clusterRepo.Name, ociInterval) + backoffInterval = backoff.Round(time.Second) } - return clusterRepo, nil + return setConditionWithInterval(clusterRepo, err, newStatus, backoffInterval, ociCondition, errorMsg, o.clusterRepoController) } // getIndexfile fetches the indexfile if it already exits for the clusterRepo @@ -399,73 +347,35 @@ func calculateBackoff(clusterRepo *catalog.ClusterRepo, policy retryPolicy) time func getRetryPolicy(clusterRepo *catalog.ClusterRepo) (retryPolicy, error) { // Default Values for exponentialBackOff function which is used // to retry an HTTP call when 429 response code is hit. - defaultRetryPolicy := retryPolicy{ - MinWait: 1 * time.Second, - MaxWait: 5 * time.Second, - MaxRetry: 5, + + policy := defaultOCIRetryPolicy + if !registry.IsOCI(clusterRepo.Spec.URL) { + policy = defaultHandlerErrRetryPolicy } if clusterRepo.Spec.ExponentialBackOffValues != nil { // Set MaxRetry if specified and valid if clusterRepo.Spec.ExponentialBackOffValues.MaxRetries > 0 { - defaultRetryPolicy.MaxRetry = clusterRepo.Spec.ExponentialBackOffValues.MaxRetries + policy.MaxRetry = clusterRepo.Spec.ExponentialBackOffValues.MaxRetries } // Set MinWait if specified and valid if clusterRepo.Spec.ExponentialBackOffValues.MinWait >= 1 { - defaultRetryPolicy.MinWait = time.Duration(clusterRepo.Spec.ExponentialBackOffValues.MinWait) * time.Second + policy.MinWait = time.Duration(clusterRepo.Spec.ExponentialBackOffValues.MinWait) * time.Second } else if clusterRepo.Spec.ExponentialBackOffValues.MinWait != 0 { - return defaultRetryPolicy, errors.New("minWait must be at least 1 second") + return policy, errors.New("minWait must be at least 1 second") } // Set MaxWait if specified and valid if clusterRepo.Spec.ExponentialBackOffValues.MaxWait > 0 { - defaultRetryPolicy.MaxWait = time.Duration(clusterRepo.Spec.ExponentialBackOffValues.MaxWait) * time.Second + policy.MaxWait = time.Duration(clusterRepo.Spec.ExponentialBackOffValues.MaxWait) * time.Second } } // Ensure MaxWait is not less than MinWait - if defaultRetryPolicy.MaxWait < defaultRetryPolicy.MinWait { - return defaultRetryPolicy, errors.New("maxWait must be greater than or equal to minWait") - } - - return defaultRetryPolicy, nil -} - -// shouldSkip checks certain conditions to see if the handler should be skipped. -// For information regarding the conditions, check the comments in the implementation. 
-func (o *OCIRepohandler) shouldSkip(clusterRepo *catalog.ClusterRepo, policy retryPolicy, key string, ociInterval time.Duration) bool { - // this is to prevent the handler from making calls when the crd is outdated. - updatedRepo, err := o.clusterRepoController.Get(key, metav1.GetOptions{}) - if err == nil && updatedRepo.ResourceVersion != clusterRepo.ResourceVersion { - return true - } - - if clusterRepo.Status.ObservedGeneration < clusterRepo.Generation { - clusterRepo.Status.NumberOfRetries = 0 - clusterRepo.Status.NextRetryAt = metav1.Time{} - return false + if policy.MaxWait < policy.MinWait { + return policy, errors.New("maxWait must be greater than or equal to minWait") } - // The handler is triggered immediately after any changes, including when updating the number of retries done. - // This check is to prevent the handler from executing before the backoff time has passed - if !clusterRepo.Status.NextRetryAt.IsZero() && clusterRepo.Status.NextRetryAt.Time.After(timeNow().UTC()) { - return true - } - - if clusterRepo.Status.ShouldNotSkip { //checks if we should skip running the handler or not - return false - } - - ociDownloaded := condition.Cond(catalog.OCIDownloaded) - ociDownloadedUpdateTime, _ := time.Parse(time.RFC3339, ociDownloaded.GetLastUpdated(clusterRepo)) - - if (clusterRepo.Status.NumberOfRetries > policy.MaxRetry || clusterRepo.Status.NumberOfRetries == 0) && // checks if it's not retrying - clusterRepo.Generation == clusterRepo.Status.ObservedGeneration && // checks if the generation has not changed - ociDownloadedUpdateTime.Add(ociInterval).After(timeNow().UTC()) { // checks if the interval has not passed - - o.clusterRepoController.EnqueueAfter(clusterRepo.Name, ociInterval) - return true - } - return false + return policy, nil } diff --git a/pkg/controllers/dashboard/helm/repo_oci_test.go b/pkg/controllers/dashboard/helm/repo_oci_test.go index e7bcffa4c52..3eb91d3ac84 100644 --- a/pkg/controllers/dashboard/helm/repo_oci_test.go +++ b/pkg/controllers/dashboard/helm/repo_oci_test.go @@ -348,6 +348,7 @@ func TestGetRetryPolicy(t *testing.T) { t.Run(testCase.name, func(t *testing.T) { clusterRepo := &catalog.ClusterRepo{ Spec: catalog.RepoSpec{ + URL: "oci://dp.apps.rancher.io", ExponentialBackOffValues: testCase.backOffValues, }, } @@ -468,7 +469,7 @@ func TestShouldSkip(t *testing.T) { newClusterRepoController: func(ctrl *gomock.Controller) *fake.MockNonNamespacedControllerInterface[*catalog.ClusterRepo, *catalog.ClusterRepoList] { mockController := fake.NewMockNonNamespacedControllerInterface[*catalog.ClusterRepo, *catalog.ClusterRepoList](ctrl) mockController.EXPECT().Get("clusterRepo", metav1.GetOptions{}).Return(&catalog.ClusterRepo{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, nil) - mockController.EXPECT().EnqueueAfter("", ociInterval).Return() + mockController.EXPECT().EnqueueAfter("clusterRepo", ociInterval).Return() return mockController }, generation: 1, @@ -507,12 +508,14 @@ func TestShouldSkip(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Generation: testCase.generation, ResourceVersion: "1", + Name: "clusterRepo", }, Status: catalog.RepoStatus{ ObservedGeneration: testCase.observedGeneration, NumberOfRetries: testCase.numberOfRetries, NextRetryAt: testCase.nextRetryAt, ShouldNotSkip: testCase.shouldNotSkip, + IndexConfigMapName: "indexConfigMap", Conditions: []genericcondition.GenericCondition{ { Type: string(catalog.OCIDownloaded), @@ -520,7 +523,7 @@ func TestShouldSkip(t *testing.T) { }, }}, } - assert.Equal(t, testCase.expected, 
handler.shouldSkip(clusterRepo, policy, "clusterRepo", ociInterval)) + assert.Equal(t, testCase.expected, shouldSkip(clusterRepo, policy, catalog.OCIDownloaded, ociInterval, handler.clusterRepoController, &clusterRepo.Status)) }) } } diff --git a/pkg/controllers/dashboard/helm/repo_test.go b/pkg/controllers/dashboard/helm/repo_test.go deleted file mode 100644 index db38d5874ec..00000000000 --- a/pkg/controllers/dashboard/helm/repo_test.go +++ /dev/null @@ -1,211 +0,0 @@ -package helm - -import ( - "testing" - "time" - - catalog "github.com/rancher/rancher/pkg/apis/catalog.cattle.io/v1" - "github.com/stretchr/testify/assert" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func TestShouldRefresh(t *testing.T) { - tests := []struct { - name string - spec *catalog.RepoSpec - status *catalog.RepoStatus - expectedResult bool - }{ - { - "http repo - spec equals status", - &catalog.RepoSpec{ - URL: "https://example.com", - }, - &catalog.RepoStatus{ - URL: "https://example.com", - IndexConfigMapName: "configmap", - DownloadTime: metav1.Time{ - Time: time.Now(), - }, - }, - false, - }, - { - "http repo - url changed", - &catalog.RepoSpec{ - URL: "https://changed-url.com", - }, - &catalog.RepoStatus{ - URL: "https://example.com", - IndexConfigMapName: "configmap", - DownloadTime: metav1.Time{ - Time: time.Now(), - }, - }, - true, - }, - { - "http repo - missing indexConfigMap", - &catalog.RepoSpec{ - URL: "https://example.com", - }, - &catalog.RepoStatus{ - URL: "https://example.com", - IndexConfigMapName: "", - DownloadTime: metav1.Time{ - Time: time.Now(), - }, - }, - true, - }, - { - "http repo - download not so long ago", - &catalog.RepoSpec{ - URL: "https://example.com", - }, - &catalog.RepoStatus{ - URL: "https://example.com", - IndexConfigMapName: "configmap", - DownloadTime: metav1.Time{ - Time: time.Now().Add(-1 * time.Minute), - }, - }, - false, - }, - { - "http repo - download to long ago", - &catalog.RepoSpec{ - URL: "https://example.com", - }, - &catalog.RepoStatus{ - URL: "https://example.com", - IndexConfigMapName: "configmap", - DownloadTime: metav1.Time{ - Time: time.Now().Add(-7 * time.Hour), - }, - }, - true, - }, - { - "http repo - force update", - &catalog.RepoSpec{ - URL: "https://example.com", - ForceUpdate: &metav1.Time{ - Time: time.Now(), - }, - }, - &catalog.RepoStatus{ - URL: "https://example.com", - IndexConfigMapName: "configmap", - DownloadTime: metav1.Time{ - Time: time.Now().Add(-1 * time.Minute), - }, - }, - true, - }, - { - "http repo - force update older than download time", - &catalog.RepoSpec{ - URL: "https://example.com", - ForceUpdate: &metav1.Time{ - Time: time.Now().Add(-2 * time.Minute), - }, - }, - &catalog.RepoStatus{ - URL: "https://example.com", - IndexConfigMapName: "configmap", - DownloadTime: metav1.Time{ - Time: time.Now().Add(-1 * time.Minute), - }, - }, - false, - }, - { - "http repo - force update in the future", - &catalog.RepoSpec{ - URL: "https://example.com", - ForceUpdate: &metav1.Time{ - Time: time.Now().Add(2 * time.Minute), - }, - }, - &catalog.RepoStatus{ - URL: "https://example.com", - IndexConfigMapName: "configmap", - DownloadTime: metav1.Time{ - Time: time.Now(), - }, - }, - false, - }, - { - "git repo - spec equals status", - &catalog.RepoSpec{ - GitBranch: "master", - GitRepo: "git.example.com", - }, - &catalog.RepoStatus{ - Branch: "master", - URL: "git.example.com", - IndexConfigMapName: "configmap", - DownloadTime: metav1.Time{ - Time: time.Now(), - }, - }, - false, - }, - { - "git repo - branch changed", - 
&catalog.RepoSpec{ - GitBranch: "main", - GitRepo: "git.example.com", - }, - &catalog.RepoStatus{ - Branch: "master", - URL: "git.example.com", - IndexConfigMapName: "configmap", - DownloadTime: metav1.Time{ - Time: time.Now(), - }, - }, - true, - }, - { - "git repo - repo changed", - &catalog.RepoSpec{ - GitBranch: "master", - GitRepo: "newgit.example.com", - }, - &catalog.RepoStatus{ - Branch: "master", - URL: "git.example.com", - IndexConfigMapName: "configmap", - DownloadTime: metav1.Time{ - Time: time.Now(), - }, - }, - true, - }, - { - "http repo - spec equals status, but unnecessary git branch in spec", - &catalog.RepoSpec{ - URL: "https://example.com", - GitBranch: "master", - }, - &catalog.RepoStatus{ - URL: "https://example.com", - IndexConfigMapName: "configmap", - DownloadTime: metav1.Time{ - Time: time.Now(), - }, - }, - false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := shouldRefresh(tt.spec, tt.status, 6*time.Hour) - assert.Equal(t, tt.expectedResult, result) - }) - } -} diff --git a/tests/v2/integration/catalogv2/cluster_repo_test.go b/tests/v2/integration/catalogv2/cluster_repo_test.go index 55f848c375f..2dd332f579a 100644 --- a/tests/v2/integration/catalogv2/cluster_repo_test.go +++ b/tests/v2/integration/catalogv2/cluster_repo_test.go @@ -53,6 +53,8 @@ const ( LatestHTTPRepoURL = "https://releases.rancher.com/server-charts/latest" StableHTTPRepoURL = "https://releases.rancher.com/server-charts/stable" + GitClusterSmallForkName = "test-git-small-fork-cluster-repo" + GitClusterSmallForkURL = "https://github.com/rancher/charts-small-fork" GitClusterRepoName = "test-git-cluster-repo" RancherChartsGitRepoURL = "https://github.com/rancher/charts" RKE2ChartsGitRepoURL = "https://github.com/rancher/rke2-charts" @@ -137,6 +139,14 @@ func (c *ClusterRepoTestSuite) TestGitRepo() { }) } +func (c *ClusterRepoTestSuite) TestGitRepoRetries() { + c.testClusterRepoRetries(ClusterRepoParams{ + Name: GitClusterSmallForkName, + URL1: GitClusterSmallForkURL, + Type: Git, + }) +} + func StartRegistry() (*httptest.Server, error) { // Create a new registry handler handler := registryGoogle.New() @@ -751,6 +761,61 @@ func (c *ClusterRepoTestSuite) testClusterRepo(params ClusterRepoParams) { require.Error(c.T(), err) } +// testClusterRepoRetries takes in ClusterRepoParams and creates a ClusterRepo with a bad branch name, +// then updates the branch name to a valid branch name after retries are done +func (c *ClusterRepoTestSuite) testClusterRepoRetries(params ClusterRepoParams) { + // Create a ClusterRepo + cr := v1.NewClusterRepo("", params.Name, v1.ClusterRepo{}) + setClusterRepoURL(&cr.Spec, params.Type, params.URL1) + cr.Spec.InsecurePlainHTTP = params.InsecurePlainHTTP + cr.Spec.GitBranch = "invalid-branch" + expoValues := v1.ExponentialBackOffValues{ + MinWait: 2, + MaxWait: 4, + MaxRetries: 2, + } + cr.Spec.ExponentialBackOffValues = &expoValues + cr, err := c.catalogClient.ClusterRepos().Create(context.TODO(), cr, metav1.CreateOptions{}) + require.NoError(c.T(), err) + + retryNumber := 1 + err = wait.Poll(200*time.Millisecond, 30*time.Second, func() (done bool, err error) { + cr, err = c.catalogClient.ClusterRepos().Get(context.TODO(), params.Name, metav1.GetOptions{}) + assert.NoError(c.T(), err) + + for _, condition := range cr.Status.Conditions { + if v1.RepoCondition(condition.Type) == v1.RepoDownloaded { + logrus.Infof("Condition: %v, retryNumber %d, number of retries %d", condition, retryNumber, cr.Status.NumberOfRetries) + if 
condition.Status == corev1.ConditionFalse && cr.Status.NumberOfRetries == retryNumber { + retryNumber++ + return false, nil + } + return condition.Status == corev1.ConditionFalse && cr.Status.NumberOfRetries == 0 && retryNumber == cr.Spec.ExponentialBackOffValues.MaxRetries+1, nil + } + } + + return false, nil + }) + require.NoError(c.T(), err) + + downloadTime := cr.Status.DownloadTime + cr.Spec.GitBranch = "main" + cr, err = c.catalogClient.ClusterRepos().Update(context.TODO(), cr, metav1.UpdateOptions{}) + require.NoError(c.T(), err) + // Validate the ClusterRepo was created and resources were downloaded + clusterRepo, err := c.pollUntilDownloaded(params.Name, metav1.Time{}) + require.NoError(c.T(), err) + + status := c.getStatusFromClusterRepo(clusterRepo) + assert.Greater(c.T(), status.DownloadTime.Time, downloadTime.Time) + + err = c.catalogClient.ClusterRepos().Delete(context.TODO(), params.Name, metav1.DeleteOptions{}) + assert.NoError(c.T(), err) + + _, err = c.catalogClient.ClusterRepos().Get(context.TODO(), params.Name, metav1.GetOptions{}) + assert.Error(c.T(), err) +} + // pollUntilDownloaded Polls until the ClusterRepo of the given name has been downloaded (by comparing prevDownloadTime against the current DownloadTime) func (c *ClusterRepoTestSuite) pollUntilDownloaded(ClusterRepoName string, prevDownloadTime metav1.Time) (*stevev1.SteveAPIObject, error) { var clusterRepo *stevev1.SteveAPIObject
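The patch makes `getRetryPolicy` pick a default policy per repository type (the tight OCI defaults vs. the new `defaultHandlerErrRetryPolicy`) before applying the per-repo `ExponentialBackOffValues` overrides and validation. A minimal standalone sketch of that selection logic is below; `resolvePolicy` and the local `ExponentialBackOffValues` struct are stand-ins for illustration (the real code detects OCI URLs with `registry.IsOCI` and reads the values from the ClusterRepo spec).

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// retryPolicy mirrors the fields used throughout the patch.
type retryPolicy struct {
	MinWait  time.Duration
	MaxWait  time.Duration
	MaxRetry int
}

// Stand-in for the seconds-based override fields on the ClusterRepo spec.
type ExponentialBackOffValues struct {
	MinWait    int
	MaxWait    int
	MaxRetries int
}

var (
	defaultOCIRetryPolicy        = retryPolicy{MinWait: time.Second, MaxWait: 5 * time.Second, MaxRetry: 5}
	defaultHandlerErrRetryPolicy = retryPolicy{MinWait: 5 * time.Minute, MaxWait: 20 * time.Minute, MaxRetry: 3}
)

func resolvePolicy(repoURL string, overrides *ExponentialBackOffValues) (retryPolicy, error) {
	policy := defaultHandlerErrRetryPolicy
	if strings.HasPrefix(repoURL, "oci://") { // registry.IsOCI in the real code
		policy = defaultOCIRetryPolicy
	}
	if overrides != nil {
		if overrides.MaxRetries > 0 {
			policy.MaxRetry = overrides.MaxRetries
		}
		if overrides.MinWait >= 1 {
			policy.MinWait = time.Duration(overrides.MinWait) * time.Second
		} else if overrides.MinWait != 0 {
			return policy, errors.New("minWait must be at least 1 second")
		}
		if overrides.MaxWait > 0 {
			policy.MaxWait = time.Duration(overrides.MaxWait) * time.Second
		}
	}
	if policy.MaxWait < policy.MinWait {
		return policy, errors.New("maxWait must be greater than or equal to minWait")
	}
	return policy, nil
}

func main() {
	p, err := resolvePolicy("oci://dp.apps.rancher.io",
		&ExponentialBackOffValues{MinWait: 2, MaxWait: 4, MaxRetries: 2})
	fmt.Println(p, err) // {2s 4s 2} <nil>
}
```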
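The body of `calculateBackoff` is not shown in this hunk; what the surrounding code relies on is only that the wait grows with `NumberOfRetries` and stays between `MinWait` and `MaxWait`. A minimal sketch of that kind of capped exponential growth, assuming plain doubling and no jitter or Retry-After handling, is below (`backoffFor` is an illustrative name, not the patched function).

```go
package main

import (
	"fmt"
	"time"
)

type retryPolicy struct {
	MinWait  time.Duration
	MaxWait  time.Duration
	MaxRetry int
}

// backoffFor returns the wait before retry number n (1-based):
// MinWait, 2*MinWait, 4*MinWait, ... capped at MaxWait.
func backoffFor(n int, p retryPolicy) time.Duration {
	if n < 1 {
		n = 1
	}
	d := p.MinWait << uint(n-1)          // double per completed retry
	if d > p.MaxWait || d < p.MinWait {  // second test guards against overflow
		d = p.MaxWait
	}
	return d
}

func main() {
	p := retryPolicy{MinWait: 5 * time.Minute, MaxWait: 20 * time.Minute, MaxRetry: 3}
	for n := 1; n <= p.MaxRetry; n++ {
		fmt.Printf("retry %d -> wait %s\n", n, backoffFor(n, p))
	}
	// retry 1 -> wait 5m0s, retry 2 -> wait 10m0s, retry 3 -> wait 20m0s
}
```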
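The retry bookkeeping that `download` now performs on a retriable error reduces to: bump `NumberOfRetries`, push `NextRetryAt` out by the calculated backoff, and requeue after the backoff; once the counter exceeds `MaxRetry`, clear both fields and fall back to the normal refresh interval. A dependency-free sketch of that accounting (plain values instead of `RepoStatus` and the controller) is below.

```go
package main

import (
	"fmt"
	"time"
)

type retryState struct {
	NumberOfRetries int
	NextRetryAt     time.Time
}

// nextDelay decides how long to wait before the object is reconciled again
// after a retriable failure, updating the retry state in place.
func nextDelay(state *retryState, maxRetry int, backoff, interval time.Duration, now time.Time) time.Duration {
	state.NumberOfRetries++
	if state.NumberOfRetries > maxRetry {
		// Retries exhausted: clear the state and wait a full interval.
		state.NumberOfRetries = 0
		state.NextRetryAt = time.Time{}
		return interval
	}
	state.NextRetryAt = now.Add(backoff)
	return backoff
}

func main() {
	now := time.Now().UTC()
	s := retryState{}
	for i := 0; i < 4; i++ {
		fmt.Println(nextDelay(&s, 3, 5*time.Minute, time.Hour, now), s.NumberOfRetries)
	}
	// 5m0s 1, 5m0s 2, 5m0s 3, then 1h0m0s 0 once MaxRetry is exceeded
}
```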
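The shared `shouldSkip` applies a series of gates before any network work: never skip before the first successful download, rerun when the generation changed, honour a pending `NextRetryAt`, honour `ShouldNotSkip`, and otherwise skip (and requeue) while the refresh interval since the last Downloaded/OCIDownloaded update has not elapsed. A dependency-free reduction of those gates is sketched below; the stale-resource-version check against a fresh `Get` and the `EnqueueAfter` side effect are deliberately omitted, and `gateInput` is an illustrative stand-in for the ClusterRepo fields.

```go
package main

import (
	"fmt"
	"time"
)

type gateInput struct {
	indexConfigMapName string
	generation         int64
	observedGeneration int64
	numberOfRetries    int
	maxRetry           int
	nextRetryAt        time.Time
	shouldNotSkip      bool
	lastDownloaded     time.Time // condition's LastUpdated in the real code
	interval           time.Duration
}

func shouldSkip(in gateInput, now time.Time) bool {
	// First successful download has not happened yet: never skip.
	if in.indexConfigMapName == "" {
		return false
	}
	// Spec changed since the last reconcile: run again
	// (the real code also resets the retry counters here).
	if in.observedGeneration < in.generation {
		return false
	}
	// Backoff window from a previous failure has not elapsed yet.
	if !in.nextRetryAt.IsZero() && in.nextRetryAt.After(now) {
		return true
	}
	// A prior error explicitly asked for another pass.
	if in.shouldNotSkip {
		return false
	}
	// Not mid-retry and the refresh interval has not passed: skip.
	notRetrying := in.numberOfRetries > in.maxRetry || in.numberOfRetries == 0
	if notRetrying && in.generation == in.observedGeneration &&
		in.lastDownloaded.Add(in.interval).After(now) {
		return true
	}
	return false
}

func main() {
	now := time.Now().UTC()
	fmt.Println(shouldSkip(gateInput{
		indexConfigMapName: "idx",
		generation:         1,
		observedGeneration: 1,
		interval:           time.Hour,
		lastDownloaded:     now.Add(-10 * time.Minute),
	}, now)) // true: downloaded recently, nothing to do
}
```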
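Both `setErrorCondition` and `setConditionWithInterval` follow the same success-path pattern: stamp `ObservedGeneration`, write the status only when it actually differs from the stored one, then requeue the object so it is reconciled again after the interval (or backoff) rather than waiting for an edit. A simplified sketch of that pattern is below; `writeStatusAndRequeue`, `statusUpdater`, and `enqueuer` are stand-ins for `ClusterRepoController.UpdateStatus`/`EnqueueAfter`, `reflect.DeepEqual` stands in for `equality.Semantic.DeepEqual`, and the condition/LastUpdated and conflict-error handling from the patch are left out.

```go
package main

import (
	"fmt"
	"reflect"
	"time"
)

type repoStatus struct {
	ObservedGeneration int64
	NumberOfRetries    int
	Message            string
}

type statusUpdater func(repoStatus) error
type enqueuer func(after time.Duration)

func writeStatusAndRequeue(current, desired repoStatus, generation int64,
	update statusUpdater, enqueue enqueuer, after time.Duration) error {

	desired.ObservedGeneration = generation
	if !reflect.DeepEqual(current, desired) { // equality.Semantic.DeepEqual upstream
		if err := update(desired); err != nil {
			// Surface the status write failure; the controller framework retries.
			return err
		}
	}
	// Whether or not the status changed, make sure the object comes back
	// after the refresh interval (or backoff) instead of waiting for an edit.
	enqueue(after)
	return nil
}

func main() {
	cur := repoStatus{ObservedGeneration: 1}
	des := repoStatus{Message: "downloaded"}
	err := writeStatusAndRequeue(cur, des, 2,
		func(s repoStatus) error { fmt.Println("UpdateStatus:", s); return nil },
		func(d time.Duration) { fmt.Println("EnqueueAfter:", d) },
		time.Hour)
	fmt.Println("err:", err)
}
```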