Skip to content

Commit

Permalink
fix: record offset for src topic instead of target (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
kalbhor authored Dec 4, 2024
1 parent 4273fc4 commit 9ef82a8
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
5 changes: 4 additions & 1 deletion internal/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ loop:
// Always record the latest offsets before the messages are processed for new connections and
// retries to consume from where it was left off.
// TODO: What if the next step fails? The messages won't be read again?
re.source.RecordOffsets(rec)
if err := re.source.RecordOffsets(rec); err != nil {
re.log.Error("error recording offset", "err", err)
return err
}

if err := re.processMessage(ctx, rec); err != nil {
re.log.Error("error processing message", "err", err)
Expand Down
19 changes: 15 additions & 4 deletions internal/relay/source_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type SourcePool struct {
log *slog.Logger
metrics *metrics.Set
targetToSrc map[string]string
srcToTarget map[string]string
srcTopics []string

// targetOffsets is initialized with current topic high watermarks from target.
Expand Down Expand Up @@ -103,17 +104,20 @@ func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerCfg, topics Topics, t
}

var (
srcToTarg = make(map[string]string, len(topics))
targToSrc = make(map[string]string, len(topics))
srcTopics = make([]string, 0, len(topics))
)
for src, targ := range topics {
srcTopics = append(srcTopics, src)
targToSrc[targ.TargetTopic] = src
srcToTarg[src] = targ.TargetTopic
}

sp := &SourcePool{
cfg: cfg,
targetToSrc: targToSrc,
srcToTarget: srcToTarg,
srcTopics: srcTopics,
servers: servers,
log: log,
Expand Down Expand Up @@ -223,21 +227,28 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) {

// RecordOffsets records the offsets of the latest fetched records per topic.
// This is used to resume consumption on new connections/reconnections from the source during runtime.
func (sp *SourcePool) RecordOffsets(rec *kgo.Record) {
func (sp *SourcePool) RecordOffsets(rec *kgo.Record) error {
if sp.targetOffsets == nil {
sp.targetOffsets = make(TopicOffsets)
}

if o, ok := sp.targetOffsets[rec.Topic]; ok {
topic, ok := sp.srcToTarget[rec.Topic]
if !ok {
return fmt.Errorf("target topic not found for src topic %s", rec.Topic)
}

if o, ok := sp.targetOffsets[topic]; ok {
// If the topic already exists, update the offset for the partition.
o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
sp.targetOffsets[rec.Topic] = o
sp.targetOffsets[topic] = o
} else {
// If the topic does not exist, create a new map for the topic.
o := make(map[int32]kgo.Offset)
o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
sp.targetOffsets[rec.Topic] = o
sp.targetOffsets[topic] = o
}

return nil
}

func (sp *SourcePool) GetHighWatermark(ctx context.Context, cl *kgo.Client) (kadm.ListedOffsets, error) {
Expand Down

0 comments on commit 9ef82a8

Please sign in to comment.