Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix streaming Reader race condition #55

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions streaming/options/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ type (
Reader func(*ReaderOptions)

ReaderOptions struct {
BlockDuration time.Duration
MaxPolled int64
Topic string
TopicPattern string
BufferSize int
LastEventID string
BlockDuration time.Duration
MaxPolled int64
Topic string
TopicPattern string
BufferSize int
LastEventID string
StartExplicitly bool
}
)

Expand Down Expand Up @@ -95,6 +96,14 @@ func WithReaderStartAt(startAt time.Time) Reader {
}
}

// WithReaderStartExplicitly sets the reader to start explicitly when Start is called.
// By default the reader starts after the first call to Subscribe.
func WithReaderStartExplicitly() Reader {
return func(o *ReaderOptions) {
o.StartExplicitly = true
}
}

// ParseReaderOptions parses the given options and returns the corresponding
// reader options.
func ParseReaderOptions(opts ...Reader) ReaderOptions {
Expand Down
50 changes: 35 additions & 15 deletions streaming/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ type (
bufferSize int
// channels to send notifications
chans []chan *Event
// startOnce is used to ensure the reader is started only once.
startOnce sync.Once
// startExplicitly indicates the the reader should only start when Start is called, otherwise it will start after
// the first call to Subscribe.
startExplicitly bool
// donechan is the reader donechan channel.
donechan chan struct{}
// streamschan notifies the reader when streams are added or
Expand Down Expand Up @@ -100,33 +105,40 @@ func newReader(stream *Stream, opts ...options.Reader) (*Reader, error) {
}

reader := &Reader{
startID: o.LastEventID,
streams: []*Stream{stream},
streamKeys: []string{stream.key},
streamCursors: []string{o.LastEventID},
blockDuration: o.BlockDuration,
maxPolled: o.MaxPolled,
bufferSize: o.BufferSize,
donechan: make(chan struct{}),
streamschan: make(chan struct{}),
eventFilter: eventFilter,
logger: stream.rootLogger.WithPrefix("reader", stream.Name),
rdb: stream.rdb,
startID: o.LastEventID,
streams: []*Stream{stream},
streamKeys: []string{stream.key},
streamCursors: []string{o.LastEventID},
blockDuration: o.BlockDuration,
maxPolled: o.MaxPolled,
bufferSize: o.BufferSize,
startExplicitly: o.StartExplicitly,
donechan: make(chan struct{}),
streamschan: make(chan struct{}),
eventFilter: eventFilter,
logger: stream.rootLogger.WithPrefix("reader", stream.Name),
rdb: stream.rdb,
}

reader.wait.Add(1)
pulse.Go(reader.logger, reader.read)

return reader, nil
}

// Start starts the reader. This is only needed if the reader is created with
// WithReaderStartExplicitly. It is idempotent.
func (r *Reader) Start() {
r.start()
}

// Subscribe returns a channel that receives events from the stream.
// The channel is closed when the reader is closed.
func (r *Reader) Subscribe() <-chan *Event {
c := make(chan *Event, r.bufferSize)
r.lock.Lock()
defer r.lock.Unlock()
r.chans = append(r.chans, c)
if !r.startExplicitly {
r.start()
}
return c
}

Expand Down Expand Up @@ -209,6 +221,14 @@ func (r *Reader) IsClosed() bool {
return r.closed
}

// start starts the reader's read goroutine if it is not already running.
func (r *Reader) start() {
r.startOnce.Do(func() {
r.wait.Add(1)
pulse.Go(r.logger, r.read)
})
}

// read reads events from the streams and sends them to the reader channel.
func (r *Reader) read() {
ctx := context.Background()
Expand Down