Skip to content

Commit

Permalink
Merge pull request #110 from stephane-moreau/partition-resub
Browse files Browse the repository at this point in the history
Do not loose partition subscription while reconnecting
  • Loading branch information
tylertreat authored Mar 26, 2021
2 parents 0cdd557 + e1a043c commit feeaeae
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions v2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ func (c *client) Subscribe(ctx context.Context, streamName string, handler Handl
return err
}

go c.dispatchStream(ctx, streamName, stream, handler)
go c.dispatchStream(ctx, streamName, stream, *opts, handler)
return nil
}

Expand Down Expand Up @@ -1588,9 +1588,9 @@ func (c *client) subscribe(ctx context.Context, stream string,
}

func (c *client) dispatchStream(ctx context.Context, streamName string,
stream proto.API_SubscribeClient, handler Handler) {
stream proto.API_SubscribeClient, initOpts SubscriptionOptions, handler Handler) {
var (
lastOffset int64
lastOffset int64 = -1
lastError error
resubscribe bool
closed bool
Expand Down Expand Up @@ -1643,9 +1643,19 @@ LOOP:
// some time for the leader to failover.
if resubscribe {
deadline := time.Now().Add(c.opts.ResubscribeWaitTime)

newOpts := initOpts
if lastOffset != -1 {
// At least one message received we need to restart fetching messages after this one
// else we keep the initial start position, Must keep option to keep track of other options
// partitions / end pointer of subscriptions....
// intentionnaly ignoring error
_ = StartAtOffset(lastOffset + 1)(&newOpts)
}
for time.Now().Before(deadline) && !closed {
err := c.Subscribe(ctx, streamName, handler, StartAtOffset(lastOffset+1))
newStream, err := c.subscribe(ctx, streamName, &newOpts)
if err == nil {
go c.dispatchStream(ctx, streamName, newStream, newOpts, handler)
return
}
sleepContext(ctx, time.Second+(time.Duration(rand.Intn(500))*time.Millisecond))
Expand Down

0 comments on commit feeaeae

Please sign in to comment.