Skip to content

Commit

Permalink
Merge pull request rancher#46926 from diogoasouza/fix-cluster-repo-ha…
Browse files Browse the repository at this point in the history
…ndler-behavior-when-gitrancherio-is-down
  • Loading branch information
diogoasouza authored Sep 6, 2024
2 parents 6e4be82 + 7ac9151 commit 7f7897b
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 406 deletions.
268 changes: 208 additions & 60 deletions pkg/controllers/dashboard/helm/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/equality"

catalog "github.com/rancher/rancher/pkg/apis/catalog.cattle.io/v1"
"github.com/rancher/rancher/pkg/catalogv2"
"github.com/rancher/rancher/pkg/catalogv2/git"
Expand All @@ -30,9 +32,16 @@ import (

const (
maxSize = 100_000
defaultInterval = 6 * time.Hour
defaultInterval = 1 * time.Hour
repoCondition = catalog.RepoDownloaded
)

var defaultHandlerErrRetryPolicy = retryPolicy{
MinWait: 5 * time.Minute,
MaxWait: 20 * time.Minute,
MaxRetry: 3,
}

type repoHandler struct {
secrets corev1controllers.SecretCache
clusterRepos catalogcontrollers.ClusterRepoController
Expand All @@ -55,8 +64,7 @@ func RegisterRepos(ctx context.Context,
apply: apply.WithCacheTypes(configMap).WithStrictCaching().WithSetOwnerReference(false, false),
}

catalogcontrollers.RegisterClusterRepoStatusHandler(ctx, clusterRepos,
condition.Cond(catalog.RepoDownloaded), "helm-clusterrepo-download", h.ClusterRepoDownloadStatusHandler)
clusterRepos.OnChange(ctx, "helm-clusterrepo-download-on-change", h.ClusterRepoOnChange)

}

Expand Down Expand Up @@ -87,32 +95,44 @@ func (r *repoHandler) ClusterRepoDownloadEnsureStatusHandler(repo *catalog.Clust
return r.ensure(&repo.Spec, status, &repo.ObjectMeta)
}

func (r *repoHandler) ClusterRepoDownloadStatusHandler(repo *catalog.ClusterRepo, status catalog.RepoStatus) (catalog.RepoStatus, error) {
func (r *repoHandler) ClusterRepoOnChange(key string, repo *catalog.ClusterRepo) (*catalog.ClusterRepo, error) {
if repo == nil {
return nil, nil
}
// Ignore OCI Based Helm Repositories
if registry.IsOCI(repo.Spec.URL) {
return status, nil
return repo, nil
}

interval := defaultInterval
if repo.Spec.RefreshInterval > 0 {
interval = time.Duration(repo.Spec.RefreshInterval) * time.Second
}

err := ensureIndexConfigMap(repo, &status, r.configMaps)
newStatus := repo.Status.DeepCopy()
retryPolicy, err := getRetryPolicy(repo)
if err != nil {
return status, err
err = fmt.Errorf("failed to get retry policy: %w", err)
return setErrorCondition(repo, err, newStatus, interval, repoCondition, r.clusterRepos)
}
if !shouldRefresh(&repo.Spec, &status, interval) {
r.clusterRepos.EnqueueAfter(repo.Name, interval)
return status, nil

err = ensureIndexConfigMap(repo, newStatus, r.configMaps)
if err != nil {
err = fmt.Errorf("failed to ensure index config map: %w", err)
return setErrorCondition(repo, err, newStatus, interval, repoCondition, r.clusterRepos)
}

if shouldSkip(repo, retryPolicy, repoCondition, interval, r.clusterRepos, newStatus) {
return repo, nil
}
newStatus.ShouldNotSkip = false

return r.download(&repo.Spec, status, &repo.ObjectMeta, metav1.OwnerReference{
return r.download(repo, newStatus, metav1.OwnerReference{
APIVersion: catalog.SchemeGroupVersion.Group + "/" + catalog.SchemeGroupVersion.Version,
Kind: "ClusterRepo",
Name: repo.Name,
UID: repo.UID,
})
}, interval, retryPolicy)
}

func toOwnerObject(namespace string, owner metav1.OwnerReference) runtime.Object {
Expand Down Expand Up @@ -202,7 +222,6 @@ func (r *repoHandler) ensure(repoSpec *catalog.RepoSpec, status catalog.RepoStat
return status, nil
}

status.ObservedGeneration = metadata.Generation
secret, err := catalogv2.GetSecret(r.secrets, repoSpec, metadata.Namespace)
if err != nil {
return status, err
Expand All @@ -211,73 +230,87 @@ func (r *repoHandler) ensure(repoSpec *catalog.RepoSpec, status catalog.RepoStat
return status, git.Ensure(secret, metadata.Namespace, metadata.Name, status.URL, status.Commit, repoSpec.InsecureSkipTLSverify, repoSpec.CABundle)
}

func (r *repoHandler) download(repoSpec *catalog.RepoSpec, status catalog.RepoStatus, metadata *metav1.ObjectMeta, owner metav1.OwnerReference) (catalog.RepoStatus, error) {
func (r *repoHandler) download(repository *catalog.ClusterRepo, newStatus *catalog.RepoStatus, owner metav1.OwnerReference, interval time.Duration, retryPolicy retryPolicy) (*catalog.ClusterRepo, error) {
var (
index *repo.IndexFile
commit string
err error
)

status.ObservedGeneration = metadata.Generation
metadata := repository.ObjectMeta
repoSpec := repository.Spec

secret, err := catalogv2.GetSecret(r.secrets, repoSpec, metadata.Namespace)
secret, err := catalogv2.GetSecret(r.secrets, &repoSpec, metadata.Namespace)
if err != nil {
return status, err
return setErrorCondition(repository, err, newStatus, interval, repoCondition, r.clusterRepos)
}

downloadTime := metav1.Now()
if repoSpec.GitRepo != "" && status.IndexConfigMapName == "" {
backoff := calculateBackoff(repository, retryPolicy)
retriable := false
if repoSpec.GitRepo != "" && newStatus.IndexConfigMapName == "" {
commit, err = git.Head(secret, metadata.Namespace, metadata.Name, repoSpec.GitRepo, repoSpec.GitBranch, repoSpec.InsecureSkipTLSverify, repoSpec.CABundle)
if err != nil {
return status, err
retriable = true
} else {
newStatus.URL = repoSpec.GitRepo
newStatus.Branch = repoSpec.GitBranch
index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo)
}
status.URL = repoSpec.GitRepo
status.Branch = repoSpec.GitBranch
index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo)
} else if repoSpec.GitRepo != "" {
commit, err = git.Update(secret, metadata.Namespace, metadata.Name, repoSpec.GitRepo, repoSpec.GitBranch, repoSpec.InsecureSkipTLSverify, repoSpec.CABundle)
if err != nil {
return status, err
}
status.URL = repoSpec.GitRepo
status.Branch = repoSpec.GitBranch
if status.Commit == commit {
status.DownloadTime = downloadTime
return status, nil
retriable = true
} else {
newStatus.URL = repoSpec.GitRepo
newStatus.Branch = repoSpec.GitBranch
if newStatus.Commit == commit {
newStatus.DownloadTime = downloadTime
return setErrorCondition(repository, err, newStatus, interval, repoCondition, r.clusterRepos)
}
index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo)
}
index, err = git.BuildOrGetIndex(metadata.Namespace, metadata.Name, repoSpec.GitRepo)
} else if repoSpec.URL != "" {
index, err = helmhttp.DownloadIndex(secret, repoSpec.URL, repoSpec.CABundle, repoSpec.InsecureSkipTLSverify, repoSpec.DisableSameOriginCheck)

status.URL = repoSpec.URL
status.Branch = ""
retriable = true
newStatus.URL = repoSpec.URL
newStatus.Branch = ""
} else {
return status, nil
return setErrorCondition(repository, err, newStatus, interval, repoCondition, r.clusterRepos)
}
if retriable && err != nil {
newStatus.NumberOfRetries++
if newStatus.NumberOfRetries > retryPolicy.MaxRetry {
newStatus.NumberOfRetries = 0
newStatus.NextRetryAt = metav1.Time{}
return r.setConditionWithInterval(repository, err, newStatus, nil, interval)
}
newStatus.NextRetryAt = metav1.Time{Time: timeNow().UTC().Add(backoff)}
return r.setConditionWithInterval(repository, err, newStatus, &backoff, interval)
}
if err != nil || index == nil {
return status, err
return setErrorCondition(repository, err, newStatus, interval, repoCondition, r.clusterRepos)
}

index.SortEntries()

cm, err := createOrUpdateMap(metadata.Namespace, index, owner, r.apply)
if err != nil {
return status, err
return setErrorCondition(repository, err, newStatus, interval, repoCondition, r.clusterRepos)
}

status.IndexConfigMapName = cm.Name
status.IndexConfigMapNamespace = cm.Namespace
status.IndexConfigMapResourceVersion = cm.ResourceVersion
status.DownloadTime = downloadTime
status.Commit = commit
return status, nil
newStatus.IndexConfigMapName = cm.Name
newStatus.IndexConfigMapNamespace = cm.Namespace
newStatus.IndexConfigMapResourceVersion = cm.ResourceVersion
newStatus.DownloadTime = downloadTime
newStatus.Commit = commit
return setErrorCondition(repository, nil, newStatus, interval, repoCondition, r.clusterRepos)
}

func ensureIndexConfigMap(repo *catalog.ClusterRepo, status *catalog.RepoStatus, configMap corev1controllers.ConfigMapClient) error {
// Charts from the clusterRepo will be unavailable if the IndexConfigMap recorded in the status does not exist.
// By resetting the value of IndexConfigMapName, IndexConfigMapNamespace, IndexConfigMapResourceVersion to "",
// the method shouldRefresh will return true and trigger the rebuild of the IndexConfigMap and accordingly update the status.
if repo.Spec.GitRepo != "" && status.IndexConfigMapName != "" {
if status.IndexConfigMapName != "" {
_, err := configMap.Get(status.IndexConfigMapNamespace, status.IndexConfigMapName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -297,34 +330,149 @@ func ensureIndexConfigMap(repo *catalog.ClusterRepo, status *catalog.RepoStatus,
return nil
}

func shouldRefresh(spec *catalog.RepoSpec, status *catalog.RepoStatus, interval time.Duration) bool {
if spec.GitRepo != "" && status.Branch != spec.GitBranch {
return true
func GenerateConfigMapName(ownerName string, index int, UID types.UID) string {
return name2.SafeConcatName(ownerName, fmt.Sprint(index), string(UID))
}

func GetConfigMapNamespace(namespace string) string {
if namespace == "" {
return namespaces.System
}
if spec.URL != "" && spec.URL != status.URL {
return true

return namespace
}

// setErrorCondition is only called when error happens in the handler, and
// we need to depend on wrangler to requeue the handler
func setErrorCondition(clusterRepo *catalog.ClusterRepo,
err error,
newStatus *catalog.RepoStatus,
interval time.Duration,
cond catalog.RepoCondition,
controller catalogcontrollers.ClusterRepoController) (*catalog.ClusterRepo, error) {
var statusErr error
newStatus.NumberOfRetries = 0
newStatus.NextRetryAt = metav1.Time{}
if err != nil {
newStatus.ShouldNotSkip = true
}
condDownloaded := condition.Cond(cond)

if apierrors.IsConflict(err) {
err = nil
}
condDownloaded.SetError(newStatus, "", err)

newStatus.ObservedGeneration = clusterRepo.Generation

if !equality.Semantic.DeepEqual(newStatus, &clusterRepo.Status) {
condDownloaded.LastUpdated(newStatus, timeNow().UTC().Format(time.RFC3339))

clusterRepo.Status = *newStatus
clusterRepo, statusErr = controller.UpdateStatus(clusterRepo)
if statusErr != nil {
err = statusErr
}
if err == nil {
controller.EnqueueAfter(clusterRepo.Name, interval)
}
return clusterRepo, err
}

if err == nil {
controller.EnqueueAfter(clusterRepo.Name, interval)
}
if spec.GitRepo != "" && spec.GitRepo != status.URL {
return clusterRepo, err
}

// shouldSkip checks certain conditions to see if the handler should be skipped.
// For information regarding the conditions, check the comments in the implementation.
func shouldSkip(clusterRepo *catalog.ClusterRepo,
policy retryPolicy,
cond catalog.RepoCondition,
interval time.Duration,
controller catalogcontrollers.ClusterRepoController,
newStatus *catalog.RepoStatus) bool {
// this is to prevent the handler from making calls when the crd is outdated.
updatedRepo, err := controller.Get(clusterRepo.Name, metav1.GetOptions{})
if err == nil && updatedRepo.ResourceVersion != clusterRepo.ResourceVersion {
return true
}
if status.IndexConfigMapName == "" {

if newStatus.IndexConfigMapName == "" {
return false
}

if newStatus.ObservedGeneration < clusterRepo.Generation {
newStatus.NumberOfRetries = 0
newStatus.NextRetryAt = metav1.Time{}
return false
}

// The handler is triggered immediately after any changes, including when updating the number of retries done.
// This check is to prevent the handler from executing before the backoff time has passed
if !newStatus.NextRetryAt.IsZero() && newStatus.NextRetryAt.Time.After(timeNow().UTC()) {
return true
}
if spec.ForceUpdate != nil && spec.ForceUpdate.After(status.DownloadTime.Time) && spec.ForceUpdate.Time.Before(time.Now()) {

if newStatus.ShouldNotSkip { //checks if we should skip running the handler or not
return false
}

condDownloaded := condition.Cond(cond)
downloadedUpdateTime, _ := time.Parse(time.RFC3339, condDownloaded.GetLastUpdated(clusterRepo))

if (newStatus.NumberOfRetries > policy.MaxRetry || newStatus.NumberOfRetries == 0) && // checks if it's not retrying
clusterRepo.Generation == newStatus.ObservedGeneration && // checks if the generation has not changed
downloadedUpdateTime.Add(interval).After(timeNow().UTC()) { // checks if the interval has not passed

controller.EnqueueAfter(clusterRepo.Name, interval)
return true
}
refreshTime := time.Now().Add(-interval)
return refreshTime.After(status.DownloadTime.Time)
return false
}

func GenerateConfigMapName(ownerName string, index int, UID types.UID) string {
return name2.SafeConcatName(ownerName, fmt.Sprint(index), string(UID))
// setConditionWithInterval is called to reenqueue the object
// after the interval of 6 hours.
func (r *repoHandler) setConditionWithInterval(clusterRepo *catalog.ClusterRepo, err error, newStatus *catalog.RepoStatus, backoff *time.Duration, interval time.Duration) (*catalog.ClusterRepo, error) {
var msg string
backoffInterval := interval.Round(time.Second)
if backoff != nil {
backoffInterval = backoff.Round(time.Second)
}
msg = fmt.Sprintf("%s. Will retry after %s", err.Error(), backoffInterval)

return setConditionWithInterval(clusterRepo, err, newStatus, backoffInterval, repoCondition, msg, r.clusterRepos)
}

func GetConfigMapNamespace(namespace string) string {
if namespace == "" {
return namespaces.System
func setConditionWithInterval(clusterRepo *catalog.ClusterRepo,
err error,
newStatus *catalog.RepoStatus,
backoff time.Duration,
cond catalog.RepoCondition,
condMessage string,
controller catalogcontrollers.ClusterRepoController,
) (*catalog.ClusterRepo, error) {
newErr := fmt.Errorf(condMessage)
downloaded := condition.Cond(cond)

if apierrors.IsConflict(err) {
err = nil
}
downloaded.SetError(newStatus, "", newErr)

newStatus.ObservedGeneration = clusterRepo.Generation
if !equality.Semantic.DeepEqual(newStatus, &clusterRepo.Status) {
//Since status has changed, update the lastUpdatedTime
downloaded.LastUpdated(newStatus, timeNow().UTC().Format(time.RFC3339))
clusterRepo.Status = *newStatus
_, statusErr := controller.UpdateStatus(clusterRepo)
if statusErr != nil {
return clusterRepo, statusErr
}
}

return namespace
controller.EnqueueAfter(clusterRepo.Name, backoff)

return clusterRepo, nil
}
Loading

0 comments on commit 7f7897b

Please sign in to comment.