diff --git a/controllers/realtime_calculator.go b/controllers/realtime_calculator.go index 517945ff8..6f20e20f1 100644 --- a/controllers/realtime_calculator.go +++ b/controllers/realtime_calculator.go @@ -16,32 +16,33 @@ type topicLast struct { } type realtimeCalculator struct { - rsk *tipocav1.RedshiftSink - watcher kafka.Watcher - topicGroups map[string]tipocav1.Group - cache *sync.Map + rsk *tipocav1.RedshiftSink + watcher kafka.Watcher + cache *sync.Map batchersRealtime []string loadersRealtime []string batchersLast []topicLast loadersLast []topicLast + + desiredVersion string } func newRealtimeCalculator( rsk *tipocav1.RedshiftSink, watcher kafka.Watcher, - topicGroups map[string]tipocav1.Group, cache *sync.Map, + desiredVersion string, ) *realtimeCalculator { return &realtimeCalculator{ - rsk: rsk, - watcher: watcher, - topicGroups: topicGroups, - cache: cache, - batchersLast: []topicLast{}, - loadersLast: []topicLast{}, + rsk: rsk, + watcher: watcher, + cache: cache, + batchersLast: []topicLast{}, + loadersLast: []topicLast{}, + desiredVersion: desiredVersion, } } @@ -125,7 +126,8 @@ type topicRealtimeInfo struct { func (r *realtimeCalculator) fetchRealtimeInfo( topic string, loaderTopic *string, - group tipocav1.Group, + desiredGroupID string, + groupLoaderCurrentOffset *int64, ) ( topicRealtimeInfo, error, ) { @@ -150,7 +152,7 @@ func (r *realtimeCalculator) fetchRealtimeInfo( // batcher's lag analysis: b) get current batcherCurrent, err := r.watcher.CurrentOffset( - consumerGroupID(r.rsk.Name, r.rsk.Namespace, group.ID, "-batcher"), + consumerGroupID(r.rsk.Name, r.rsk.Namespace, desiredGroupID, "-batcher"), topic, 0, ) @@ -180,7 +182,7 @@ func (r *realtimeCalculator) fetchRealtimeInfo( // loader's lag analysis: b) get current loaderCurrent, err := r.watcher.CurrentOffset( - consumerGroupID(r.rsk.Name, r.rsk.Namespace, group.ID, "-loader"), + consumerGroupID(r.rsk.Name, r.rsk.Namespace, desiredGroupID, "-loader"), *loaderTopic, 0, ) @@ -193,18 +195,16 @@ func (r *realtimeCalculator) fetchRealtimeInfo( // 1. When the Consumer Group was never created in that case we return and consider the topic not realtime // 2. When the Consumer Group had processed before but now is showing -1 currentOffset as it is inactive due to less throughput. // On such a scenario, we consider it realtime. We find this case by saving the currentOffset for the loader topcics in RedshiftSinkStatus.TopicGroup - if group.LoaderCurrentOffset == nil { + if groupLoaderCurrentOffset == nil { klog.V(2).Infof("%s, loader cg 404, not realtime", *loaderTopic) return info, nil } - klog.V(4).Infof("rsk/%s %s, currentOffset=%v (old), cg 404, try realtime", r.rsk.Name, *loaderTopic, *group.LoaderCurrentOffset) + klog.V(4).Infof("rsk/%s %s, currentOffset=%v (old), cg 404, try realtime", r.rsk.Name, *loaderTopic, *groupLoaderCurrentOffset) // give the topic the opportunity to release based on its last found currentOffset - info.loader.current = group.LoaderCurrentOffset + info.loader.current = groupLoaderCurrentOffset } else { - group.LoaderCurrentOffset = &loaderCurrent // updates the new queried loader offset klog.V(4).Infof("rsk/%s %s, cg found", r.rsk.Name, *loaderTopic) - updateTopicGroup(r.rsk, topic, group) info.loader.current = &loaderCurrent } @@ -231,15 +231,22 @@ func (r *realtimeCalculator) calculate(reloading []string, currentRealtime []str allTopicsMap := toMap(allTopics) for _, topic := range reloading { - group, ok := r.topicGroups[topic] - if !ok { - klog.Errorf("topicGroup 404 in status for: %s", topic) - continue + // extract or compute consumer group info + var currentGroupID string + var loaderCurrentOffset *int64 + desiredGroupID := groupIDFromVersion(r.desiredVersion) + + group := topicGroup(r.rsk, topic) + if group != nil { // not the first release + currentGroupID = group.ID + loaderCurrentOffset = group.LoaderCurrentOffset + } else { // first release + currentGroupID = desiredGroupID } var loaderTopic *string - ltopic := r.rsk.Spec.KafkaLoaderTopicPrefix + group.ID + "-" + topic - _, ok = allTopicsMap[ltopic] + ltopic := r.rsk.Spec.KafkaLoaderTopicPrefix + desiredGroupID + "-" + topic + _, ok := allTopicsMap[ltopic] if !ok { klog.V(2).Infof("%s topic 404, not realtime.", ltopic) } else { @@ -250,7 +257,7 @@ func (r *realtimeCalculator) calculate(reloading []string, currentRealtime []str info, hit := r.fetchRealtimeCache(topic) if !hit { // fetch again, cache miss - info, err = r.fetchRealtimeInfo(topic, loaderTopic, group) + info, err = r.fetchRealtimeInfo(topic, loaderTopic, desiredGroupID, loaderCurrentOffset) if err != nil { klog.Errorf( "rsk/%s Error fetching realtime info for topic: %s, err: %v", @@ -308,6 +315,12 @@ func (r *realtimeCalculator) calculate(reloading []string, currentRealtime []str info.loaderRealtime = true r.loadersRealtime = append(r.loadersRealtime, ltopic) } + // this is for updating the LoaderCurrentOffset + updateTopicGroup(r.rsk, topic, tipocav1.Group{ + LoaderCurrentOffset: info.loader.current, + LoaderTopicPrefix: loaderPrefixFromGroupID(r.rsk.Spec.KafkaLoaderTopicPrefix, currentGroupID), + ID: currentGroupID, + }) } r.loadersLast = append( r.loadersLast, diff --git a/controllers/redshiftsink_controller.go b/controllers/redshiftsink_controller.go index 844788e0f..fd7438716 100644 --- a/controllers/redshiftsink_controller.go +++ b/controllers/redshiftsink_controller.go @@ -418,8 +418,7 @@ func (r *RedshiftSinkReconciler) reconcile( // Realtime status is always calculated to keep the CurrentOffset // info updated in the rsk status. This is required so that low throughput // release do not get blocked due to missing consumer group currentOffset. - reloadTopicGroup := topicGroupBySinkGroup(rsk, ReloadSinkGroup, status.reloading, status.desiredVersion, rsk.Spec.KafkaLoaderTopicPrefix) - calc := newRealtimeCalculator(rsk, kafkaWatcher, reloadTopicGroup, r.KafkaRealtimeCache) + calc := newRealtimeCalculator(rsk, kafkaWatcher, r.KafkaRealtimeCache, desiredMaskVersion) currentRealtime := calc.calculate(status.reloading, status.realtime) if len(status.reloading) > 0 { klog.V(2).Infof("rsk/%v batchersRealtime: %d / %d (current=%d)", rsk.Name, len(calc.batchersRealtime), len(status.reloading), len(rsk.Status.BatcherReloadingTopics))