diff --git a/controllers/redshiftsink_controller.go b/controllers/redshiftsink_controller.go index 398b089ee..480508649 100644 --- a/controllers/redshiftsink_controller.go +++ b/controllers/redshiftsink_controller.go @@ -66,6 +66,11 @@ type RedshiftSinkReconciler struct { DefaultRedshiftMaxOpenConns int } +const ( + MaxConcurrentReloading = 30 + MaxTopicRelease = 50 +) + // +kubebuilder:rbac:groups=tipoca.k8s.practo.dev,resources=redshiftsinks,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=tipoca.k8s.practo.dev,resources=redshiftsinks/status,verbs=get;update;patch // +kubebuilder:rbac:groups=*,resources=deployments,verbs=get;list;watch;create;update;patch;delete @@ -179,7 +184,6 @@ func (r *RedshiftSinkReconciler) fetchLatestTopics( []string, error, ) { - var topics []string var err error var rgx *regexp.Regexp topicsAppended := make(map[string]bool) @@ -187,15 +191,16 @@ func (r *RedshiftSinkReconciler) fetchLatestTopics( allTopics, err := kafkaWatcher.Topics() if err != nil { - return topics, err + return []string{}, err } + var topics []string for _, expression := range expressions { rgxLoaded, ok := r.KafkaTopicRegexes.Load(expression) if !ok { rgx, err = regexp.Compile(strings.TrimSpace(expression)) if err != nil { - return topics, fmt.Errorf( + return []string{}, fmt.Errorf( "Compling regex: %s failed, err:%v\n", expression, err) } r.KafkaTopicRegexes.Store(expression, rgx) @@ -216,6 +221,8 @@ func (r *RedshiftSinkReconciler) fetchLatestTopics( } } + sortStringSlice(topics) + return topics, nil } @@ -410,10 +417,14 @@ func (r *RedshiftSinkReconciler) reconcile( // tableSuffix: "" var reload, reloadDupe, main *sinkGroup + allowedReloadingTopics := status.reloading + if len(status.reloading) > MaxConcurrentReloading { + allowedReloadingTopics = status.reloading[:MaxConcurrentReloading] + } reload = sgBuilder. setRedshiftSink(rsk).setClient(r.Client).setScheme(r.Scheme). setType(ReloadSinkGroup). - setTopics(status.reloading). + setTopics(allowedReloadingTopics). setMaskVersion(status.desiredVersion). setTopicGroups(). buildBatcher(secret, r.DefaultBatcherImage, r.DefaultKafkaVersion, tlsConfig). @@ -501,14 +512,13 @@ func (r *RedshiftSinkReconciler) reconcile( return result, nil, nil } - // release the realtime topics, topics in realtime (maxTopicRelease) are + // release the realtime topics, topics in realtime (MaxTopicRelease) are // taken as a group and is tried to release in single reconcile // to reduce the time spent on rebalance of sink groups (optimization) // #141 - maxTopicRelease := 50 releaseCandidates := status.realtime - if len(status.realtime) >= maxTopicRelease { - releaseCandidates = status.realtime[:maxTopicRelease] + if len(status.realtime) >= MaxTopicRelease { + releaseCandidates = status.realtime[:MaxTopicRelease] } klog.V(2).Infof("release candidates: %v", releaseCandidates) diff --git a/controllers/sinkgroup_controller.go b/controllers/sinkgroup_controller.go index 6ebedc4d6..b63baa8f2 100644 --- a/controllers/sinkgroup_controller.go +++ b/controllers/sinkgroup_controller.go @@ -618,8 +618,8 @@ func (s *sinkGroup) topicRealtime( cacheLoaded, ok := cache.Load(topic) if ok { realtimeCache = cacheLoaded.(kafkaRealtimeCache) - // 300 to 240 seconds - validitySeconds := rand.Intn(120) + 300 + // 600 to 840 seconds + validitySeconds := rand.Intn(240) + 300 klog.V(5).Infof("rsk/%s validity seconds: %v topic: %s", s.rsk.Name, validitySeconds, topic) if cacheValid(time.Second*time.Duration(validitySeconds), realtimeCache.lastCacheRefresh) { klog.V(4).Infof("rsk/%s (realtime cache hit) topic: %s", s.rsk.Name, topic) diff --git a/controllers/status.go b/controllers/status.go index fb88c3308..dd54c353f 100644 --- a/controllers/status.go +++ b/controllers/status.go @@ -301,6 +301,9 @@ func (s *status) info() { klog.V(2).Infof("%s reloading: %d %v", rskName, len(s.reloading), s.reloading) klog.V(2).Infof("%s rDupe: %d %v", rskName, len(s.reloadingDupe), s.reloadingDupe) klog.V(2).Infof("%s realtime: %d %v", rskName, len(s.realtime), s.realtime) + if len(s.reloading) > MaxConcurrentReloading { + klog.V(2).Infof("%s reloadingC: %d %v", rskName, MaxConcurrentReloading, s.reloading[:MaxConcurrentReloading]) + } } // manyReloading checks the percentage of reloading topics of the total topics diff --git a/controllers/util.go b/controllers/util.go index 39a58f648..6c5a4acf6 100644 --- a/controllers/util.go +++ b/controllers/util.go @@ -139,6 +139,10 @@ func getHashStructure(v interface{}) (string, error) { return hash[:6], nil } +func sortStringSlice(t []string) { + sort.Sort(sort.StringSlice(t)) +} + // getDefaultLabels gives back the default labels for the crd resources func getDefaultLabels( instance, sinkGroup, objectName string, @@ -189,7 +193,7 @@ func expandTopicsToRegex(topics []string) string { if len(topics) == 0 { return "" } - sort.Strings(topics) + sortStringSlice(topics) fullMatchRegex := "" for _, topic := range topics {