Merge pull request #168 from practo/max-concurrent-reloading
Max concurrent reloading allowed
alok87 authored Mar 18, 2021
2 parents ce15ebd + 844b73e commit 44680f1
Showing 4 changed files with 28 additions and 11 deletions.
26 changes: 18 additions & 8 deletions controllers/redshiftsink_controller.go
@@ -66,6 +66,11 @@ type RedshiftSinkReconciler struct {
DefaultRedshiftMaxOpenConns int
}

+const (
+MaxConcurrentReloading = 30
+MaxTopicRelease = 50
+)

// +kubebuilder:rbac:groups=tipoca.k8s.practo.dev,resources=redshiftsinks,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=tipoca.k8s.practo.dev,resources=redshiftsinks/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=*,resources=deployments,verbs=get;list;watch;create;update;patch;delete
@@ -179,23 +184,23 @@ func (r *RedshiftSinkReconciler) fetchLatestTopics(
[]string,
error,
) {
-var topics []string
var err error
var rgx *regexp.Regexp
topicsAppended := make(map[string]bool)
expressions := strings.Split(regexes, ",")

allTopics, err := kafkaWatcher.Topics()
if err != nil {
-return topics, err
+return []string{}, err
}

+var topics []string
for _, expression := range expressions {
rgxLoaded, ok := r.KafkaTopicRegexes.Load(expression)
if !ok {
rgx, err = regexp.Compile(strings.TrimSpace(expression))
if err != nil {
-return topics, fmt.Errorf(
+return []string{}, fmt.Errorf(
"Compling regex: %s failed, err:%v\n", expression, err)
}
r.KafkaTopicRegexes.Store(expression, rgx)
@@ -216,6 +221,8 @@
}
}
}

+sortStringSlice(topics)

return topics, nil
}

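fetchLatestTopics filters the full Kafka topic list through a comma-separated list of regexes, keeps compiled patterns in a sync.Map cache, and after this change returns the matches sorted. Below is a minimal standalone sketch of that flow under assumed names; matchTopics, topicRegexes, and the sample topics are hypothetical and not identifiers from the repository.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
	"strings"
	"sync"
)

// topicRegexes mimics the controller's KafkaTopicRegexes cache: compiled
// patterns are stored in a sync.Map so repeated reconciles reuse them.
var topicRegexes sync.Map

// matchTopics is a hypothetical, trimmed-down stand-in for fetchLatestTopics:
// split the comma-separated regexes, compile (or load cached) patterns,
// collect matching topics, and return them sorted.
func matchTopics(regexes string, allTopics []string) ([]string, error) {
	seen := make(map[string]bool)
	var topics []string
	for _, expression := range strings.Split(regexes, ",") {
		var rgx *regexp.Regexp
		if cached, ok := topicRegexes.Load(expression); ok {
			rgx = cached.(*regexp.Regexp)
		} else {
			var err error
			rgx, err = regexp.Compile(strings.TrimSpace(expression))
			if err != nil {
				return []string{}, fmt.Errorf("compiling regex: %s failed, err: %v", expression, err)
			}
			topicRegexes.Store(expression, rgx)
		}
		for _, topic := range allTopics {
			if rgx.MatchString(topic) && !seen[topic] {
				topics = append(topics, topic)
				seen[topic] = true
			}
		}
	}
	// The commit adds sortStringSlice(topics) at this point so the result
	// ordering is deterministic across reconciles.
	sort.Strings(topics)
	return topics, nil
}

func main() {
	topics, err := matchTopics("^db1\\..*,^db2\\.inventory$", []string{"db2.inventory", "db1.orders", "db1.users"})
	fmt.Println(topics, err) // [db1.orders db1.users db2.inventory] <nil>
}
```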
@@ -410,10 +417,14 @@ func (r *RedshiftSinkReconciler) reconcile(
// tableSuffix: ""
var reload, reloadDupe, main *sinkGroup

+allowedReloadingTopics := status.reloading
+if len(status.reloading) > MaxConcurrentReloading {
+allowedReloadingTopics = status.reloading[:MaxConcurrentReloading]
+}
reload = sgBuilder.
setRedshiftSink(rsk).setClient(r.Client).setScheme(r.Scheme).
setType(ReloadSinkGroup).
-setTopics(status.reloading).
+setTopics(allowedReloadingTopics).
setMaskVersion(status.desiredVersion).
setTopicGroups().
buildBatcher(secret, r.DefaultBatcherImage, r.DefaultKafkaVersion, tlsConfig).
@@ -501,14 +512,13 @@ func (r *RedshiftSinkReconciler) reconcile(
return result, nil, nil
}

-// release the realtime topics, topics in realtime (maxTopicRelease) are
+// release the realtime topics, topics in realtime (MaxTopicRelease) are
// taken as a group and is tried to release in single reconcile
// to reduce the time spent on rebalance of sink groups (optimization)
// #141
-maxTopicRelease := 50
releaseCandidates := status.realtime
-if len(status.realtime) >= maxTopicRelease {
-releaseCandidates = status.realtime[:maxTopicRelease]
+if len(status.realtime) >= MaxTopicRelease {
+releaseCandidates = status.realtime[:MaxTopicRelease]
}
klog.V(2).Infof("release candidates: %v", releaseCandidates)

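The reconcile change above bounds how much work happens per pass: if more than MaxConcurrentReloading topics are in the reloading state, only the first 30 are handed to the reload sink group in that reconcile, and likewise at most MaxTopicRelease realtime topics are released per pass. Here is a small sketch of the slicing pattern; capTopics and the sample topic names are hypothetical, only the constants mirror the controller.

```go
package main

import "fmt"

// Mirrors MaxConcurrentReloading introduced in redshiftsink_controller.go.
const maxConcurrentReloading = 30

// capTopics is a hypothetical helper showing the pattern used in reconcile():
// when more topics are reloading than the cap allows, only a prefix of the
// topic list is passed to the reload sink group in this reconcile.
func capTopics(reloading []string, max int) []string {
	if len(reloading) > max {
		return reloading[:max]
	}
	return reloading
}

func main() {
	var reloading []string
	for i := 0; i < 45; i++ {
		reloading = append(reloading, fmt.Sprintf("db.table%02d", i))
	}
	allowed := capTopics(reloading, maxConcurrentReloading)
	fmt.Println(len(allowed)) // 30
}
```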
4 changes: 2 additions & 2 deletions controllers/sinkgroup_controller.go
@@ -618,8 +618,8 @@ func (s *sinkGroup) topicRealtime(
cacheLoaded, ok := cache.Load(topic)
if ok {
realtimeCache = cacheLoaded.(kafkaRealtimeCache)
-// 300 to 420 seconds
-validitySeconds := rand.Intn(120) + 300
+// 300 to 540 seconds
+validitySeconds := rand.Intn(240) + 300
klog.V(5).Infof("rsk/%s validity seconds: %v topic: %s", s.rsk.Name, validitySeconds, topic)
if cacheValid(time.Second*time.Duration(validitySeconds), realtimeCache.lastCacheRefresh) {
klog.V(4).Infof("rsk/%s (realtime cache hit) topic: %s", s.rsk.Name, topic)
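The realtime-cache change widens the per-topic validity jitter: rand.Intn(240) + 300 produces a value in the 300–539 second range (the earlier rand.Intn(120) + 300 gave 300–419), so realtime checks for different topics expire at staggered times rather than all at once. A quick sketch of the computation:

```go
package main

import (
	"fmt"
	"math/rand"
)

func main() {
	// rand.Intn(240) returns 0..239, so validitySeconds falls in [300, 539].
	// The random spread staggers realtime-cache refreshes across topics.
	for i := 0; i < 5; i++ {
		validitySeconds := rand.Intn(240) + 300
		fmt.Printf("validity: %ds\n", validitySeconds)
	}
}
```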
3 changes: 3 additions & 0 deletions controllers/status.go
@@ -301,6 +301,9 @@ func (s *status) info() {
klog.V(2).Infof("%s reloading: %d %v", rskName, len(s.reloading), s.reloading)
klog.V(2).Infof("%s rDupe: %d %v", rskName, len(s.reloadingDupe), s.reloadingDupe)
klog.V(2).Infof("%s realtime: %d %v", rskName, len(s.realtime), s.realtime)
+if len(s.reloading) > MaxConcurrentReloading {
+klog.V(2).Infof("%s reloadingC: %d %v", rskName, MaxConcurrentReloading, s.reloading[:MaxConcurrentReloading])
+}
}

// manyReloading checks the percentage of reloading topics of the total topics
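status.info() now also logs which reloading topics fall inside the concurrency cap. The sketch below shows the same conditional logging in isolation, assuming the k8s.io/klog/v2 module is available; the topic names and the lowered cap of 2 are illustrative only.

```go
package main

import (
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	klog.InitFlags(nil)
	flag.Set("v", "2") // enable verbosity 2 so the Infof below is emitted
	flag.Parse()
	defer klog.Flush()

	reloading := []string{"db.t1", "db.t2", "db.t3"}
	maxConcurrentReloading := 2 // stand-in for MaxConcurrentReloading
	if len(reloading) > maxConcurrentReloading {
		// Mirrors the added status.info() line: log only the topics that are
		// actually allowed to reload in this reconcile.
		klog.V(2).Infof("rsk/example reloadingC: %d %v", maxConcurrentReloading, reloading[:maxConcurrentReloading])
	}
}
```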
6 changes: 5 additions & 1 deletion controllers/util.go
@@ -139,6 +139,10 @@ func getHashStructure(v interface{}) (string, error) {
return hash[:6], nil
}

+func sortStringSlice(t []string) {
+sort.Sort(sort.StringSlice(t))
+}

// getDefaultLabels gives back the default labels for the crd resources
func getDefaultLabels(
instance, sinkGroup, objectName string,
@@ -189,7 +193,7 @@ func expandTopicsToRegex(topics []string) string {
if len(topics) == 0 {
return ""
}
-sort.Strings(topics)
+sortStringSlice(topics)

fullMatchRegex := ""
for _, topic := range topics {
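The new sortStringSlice helper in util.go is a thin wrapper over sort.Sort(sort.StringSlice(t)), behaviorally the same as sort.Strings; both fetchLatestTopics and expandTopicsToRegex now call it so topic ordering is deterministic across reconciles. A standalone sketch, with sample topic names that are illustrative only:

```go
package main

import (
	"fmt"
	"sort"
)

// sortStringSlice mirrors the helper added in controllers/util.go: it sorts
// the slice in place, exactly like sort.Strings would.
func sortStringSlice(t []string) {
	sort.Sort(sort.StringSlice(t))
}

func main() {
	topics := []string{"db.orders", "db.customers", "db.payments"}
	sortStringSlice(topics)
	fmt.Println(topics) // [db.customers db.orders db.payments]
}
```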
