Skip to content

Commit

Permalink
Set right ConsumerGroup version for ReloadDupe
Browse files Browse the repository at this point in the history
Fixes #190
  • Loading branch information
alok87 committed Apr 9, 2021
1 parent fcc4dd7 commit bf31952
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 28 deletions.
65 changes: 39 additions & 26 deletions controllers/realtime_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,33 @@ type topicLast struct {
}

type realtimeCalculator struct {
rsk *tipocav1.RedshiftSink
watcher kafka.Watcher
topicGroups map[string]tipocav1.Group
cache *sync.Map
rsk *tipocav1.RedshiftSink
watcher kafka.Watcher
cache *sync.Map

batchersRealtime []string
loadersRealtime []string

batchersLast []topicLast
loadersLast []topicLast

desiredVersion string
}

func newRealtimeCalculator(
rsk *tipocav1.RedshiftSink,
watcher kafka.Watcher,
topicGroups map[string]tipocav1.Group,
cache *sync.Map,
desiredVersion string,
) *realtimeCalculator {

return &realtimeCalculator{
rsk: rsk,
watcher: watcher,
topicGroups: topicGroups,
cache: cache,
batchersLast: []topicLast{},
loadersLast: []topicLast{},
rsk: rsk,
watcher: watcher,
cache: cache,
batchersLast: []topicLast{},
loadersLast: []topicLast{},
desiredVersion: desiredVersion,
}
}

Expand Down Expand Up @@ -125,7 +126,8 @@ type topicRealtimeInfo struct {
func (r *realtimeCalculator) fetchRealtimeInfo(
topic string,
loaderTopic *string,
group tipocav1.Group,
desiredGroupID string,
groupLoaderCurrentOffset *int64,
) (
topicRealtimeInfo, error,
) {
Expand All @@ -150,7 +152,7 @@ func (r *realtimeCalculator) fetchRealtimeInfo(

// batcher's lag analysis: b) get current
batcherCurrent, err := r.watcher.CurrentOffset(
consumerGroupID(r.rsk.Name, r.rsk.Namespace, group.ID, "-batcher"),
consumerGroupID(r.rsk.Name, r.rsk.Namespace, desiredGroupID, "-batcher"),
topic,
0,
)
Expand Down Expand Up @@ -180,7 +182,7 @@ func (r *realtimeCalculator) fetchRealtimeInfo(

// loader's lag analysis: b) get current
loaderCurrent, err := r.watcher.CurrentOffset(
consumerGroupID(r.rsk.Name, r.rsk.Namespace, group.ID, "-loader"),
consumerGroupID(r.rsk.Name, r.rsk.Namespace, desiredGroupID, "-loader"),
*loaderTopic,
0,
)
Expand All @@ -193,18 +195,16 @@ func (r *realtimeCalculator) fetchRealtimeInfo(
// 1. When the Consumer Group was never created in that case we return and consider the topic not realtime
// 2. When the Consumer Group had processed before but now is showing -1 currentOffset as it is inactive due to less throughput.
// On such a scenario, we consider it realtime. We find this case by saving the currentOffset for the loader topcics in RedshiftSinkStatus.TopicGroup
if group.LoaderCurrentOffset == nil {
if groupLoaderCurrentOffset == nil {
klog.V(2).Infof("%s, loader cg 404, not realtime", *loaderTopic)
return info, nil
}
klog.V(4).Infof("rsk/%s %s, currentOffset=%v (old), cg 404, try realtime", r.rsk.Name, *loaderTopic, *group.LoaderCurrentOffset)
klog.V(4).Infof("rsk/%s %s, currentOffset=%v (old), cg 404, try realtime", r.rsk.Name, *loaderTopic, *groupLoaderCurrentOffset)
// give the topic the opportunity to release based on its last found currentOffset
info.loader.current = group.LoaderCurrentOffset
info.loader.current = groupLoaderCurrentOffset
} else {
group.LoaderCurrentOffset = &loaderCurrent
// updates the new queried loader offset
klog.V(4).Infof("rsk/%s %s, cg found", r.rsk.Name, *loaderTopic)
updateTopicGroup(r.rsk, topic, group)
info.loader.current = &loaderCurrent
}

Expand All @@ -231,15 +231,22 @@ func (r *realtimeCalculator) calculate(reloading []string, currentRealtime []str
allTopicsMap := toMap(allTopics)

for _, topic := range reloading {
group, ok := r.topicGroups[topic]
if !ok {
klog.Errorf("topicGroup 404 in status for: %s", topic)
continue
// extract or compute consumer group info
var currentGroupID string
var loaderCurrentOffset *int64
desiredGroupID := groupIDFromVersion(r.desiredVersion)

group := topicGroup(r.rsk, topic)
if group != nil { // not the first release
currentGroupID = group.ID
loaderCurrentOffset = group.LoaderCurrentOffset
} else { // first release
currentGroupID = desiredGroupID
}

var loaderTopic *string
ltopic := r.rsk.Spec.KafkaLoaderTopicPrefix + group.ID + "-" + topic
_, ok = allTopicsMap[ltopic]
ltopic := r.rsk.Spec.KafkaLoaderTopicPrefix + desiredGroupID + "-" + topic
_, ok := allTopicsMap[ltopic]
if !ok {
klog.V(2).Infof("%s topic 404, not realtime.", ltopic)
} else {
Expand All @@ -250,7 +257,7 @@ func (r *realtimeCalculator) calculate(reloading []string, currentRealtime []str

info, hit := r.fetchRealtimeCache(topic)
if !hit { // fetch again, cache miss
info, err = r.fetchRealtimeInfo(topic, loaderTopic, group)
info, err = r.fetchRealtimeInfo(topic, loaderTopic, desiredGroupID, loaderCurrentOffset)
if err != nil {
klog.Errorf(
"rsk/%s Error fetching realtime info for topic: %s, err: %v",
Expand Down Expand Up @@ -308,6 +315,12 @@ func (r *realtimeCalculator) calculate(reloading []string, currentRealtime []str
info.loaderRealtime = true
r.loadersRealtime = append(r.loadersRealtime, ltopic)
}
// this is for updating the LoaderCurrentOffset
updateTopicGroup(r.rsk, topic, tipocav1.Group{
LoaderCurrentOffset: info.loader.current,
LoaderTopicPrefix: loaderPrefixFromGroupID(r.rsk.Spec.KafkaLoaderTopicPrefix, currentGroupID),
ID: currentGroupID,
})
}
r.loadersLast = append(
r.loadersLast,
Expand Down
3 changes: 1 addition & 2 deletions controllers/redshiftsink_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,7 @@ func (r *RedshiftSinkReconciler) reconcile(
// Realtime status is always calculated to keep the CurrentOffset
// info updated in the rsk status. This is required so that low throughput
// release do not get blocked due to missing consumer group currentOffset.
reloadTopicGroup := topicGroupBySinkGroup(rsk, ReloadSinkGroup, status.reloading, status.desiredVersion, rsk.Spec.KafkaLoaderTopicPrefix)
calc := newRealtimeCalculator(rsk, kafkaWatcher, reloadTopicGroup, r.KafkaRealtimeCache)
calc := newRealtimeCalculator(rsk, kafkaWatcher, r.KafkaRealtimeCache, desiredMaskVersion)
currentRealtime := calc.calculate(status.reloading, status.realtime)
if len(status.reloading) > 0 {
klog.V(2).Infof("rsk/%v batchersRealtime: %d / %d (current=%d)", rsk.Name, len(calc.batchersRealtime), len(status.reloading), len(rsk.Status.BatcherReloadingTopics))
Expand Down

0 comments on commit bf31952

Please sign in to comment.