From d5965fac1f55fe2ff1520c493360a873effa0af7 Mon Sep 17 00:00:00 2001 From: Douglas Thrift Date: Tue, 1 Apr 2025 16:55:54 -0700 Subject: [PATCH 1/2] Fix streaming Reader race condition - There was a race condition between creating a `streaming.Reader` with `NewReader` and calling its `Subscribe` where events could be consumed before there is any channel to send them to. - To fix this, the default behavior is now to only start the reader's `read` goroutine when `Subscribe` is called after it has inserted a channel that will receive events. - Alternatively, there is now an option `WithReaderStartExplicitly` which will only start the `read` goroutine when the new `Start` is called allowing multiple calls to `Subscribe` before receiving events. --- streaming/options/reader.go | 21 +++++++++++----- streaming/reader.go | 50 ++++++++++++++++++++++++++----------- 2 files changed, 50 insertions(+), 21 deletions(-) diff --git a/streaming/options/reader.go b/streaming/options/reader.go index 4aca96f..ef0ffb6 100644 --- a/streaming/options/reader.go +++ b/streaming/options/reader.go @@ -10,12 +10,13 @@ type ( Reader func(*ReaderOptions) ReaderOptions struct { - BlockDuration time.Duration - MaxPolled int64 - Topic string - TopicPattern string - BufferSize int - LastEventID string + BlockDuration time.Duration + MaxPolled int64 + Topic string + TopicPattern string + BufferSize int + LastEventID string + StartExplicitly bool } ) @@ -95,6 +96,14 @@ func WithReaderStartAt(startAt time.Time) Reader { } } +// WithReaderStartExplicitly sets the reader to start explicitly when Start is called. +// By default the reader starts after the first call to Subscribe. +func WithReaderStartExplicitly() Reader { + return func(o *ReaderOptions) { + o.StartExplicitly = true + } +} + // ParseReaderOptions parses the given options and returns the corresponding // reader options. func ParseReaderOptions(opts ...Reader) ReaderOptions { diff --git a/streaming/reader.go b/streaming/reader.go index d60c6a4..352a86b 100644 --- a/streaming/reader.go +++ b/streaming/reader.go @@ -42,6 +42,11 @@ type ( bufferSize int // channels to send notifications chans []chan *Event + // startOnce is used to ensure the reader is started only once. + startOnce sync.Once + // startExplicitly indicates the the reader should only start when Start is called, otherwise it will start after + // the first call to Subscribe. + startExplicitly bool // donechan is the reader donechan channel. donechan chan struct{} // streamschan notifies the reader when streams are added or @@ -100,26 +105,30 @@ func newReader(stream *Stream, opts ...options.Reader) (*Reader, error) { } reader := &Reader{ - startID: o.LastEventID, - streams: []*Stream{stream}, - streamKeys: []string{stream.key}, - streamCursors: []string{o.LastEventID}, - blockDuration: o.BlockDuration, - maxPolled: o.MaxPolled, - bufferSize: o.BufferSize, - donechan: make(chan struct{}), - streamschan: make(chan struct{}), - eventFilter: eventFilter, - logger: stream.rootLogger.WithPrefix("reader", stream.Name), - rdb: stream.rdb, + startID: o.LastEventID, + streams: []*Stream{stream}, + streamKeys: []string{stream.key}, + streamCursors: []string{o.LastEventID}, + blockDuration: o.BlockDuration, + maxPolled: o.MaxPolled, + bufferSize: o.BufferSize, + startExplicitly: o.StartExplicitly, + donechan: make(chan struct{}), + streamschan: make(chan struct{}), + eventFilter: eventFilter, + logger: stream.rootLogger.WithPrefix("reader", stream.Name), + rdb: stream.rdb, } - reader.wait.Add(1) - pulse.Go(reader.logger, reader.read) - return reader, nil } +// Start starts the reader. This is only needed if the reader is created with +// WithReaderStartExplicitly. It is idempotent. +func (r *Reader) Start() { + r.start() +} + // Subscribe returns a channel that receives events from the stream. // The channel is closed when the reader is closed. func (r *Reader) Subscribe() <-chan *Event { @@ -127,6 +136,9 @@ func (r *Reader) Subscribe() <-chan *Event { r.lock.Lock() defer r.lock.Unlock() r.chans = append(r.chans, c) + if !r.startExplicitly { + r.start() + } return c } @@ -209,6 +221,14 @@ func (r *Reader) IsClosed() bool { return r.closed } +// start starts the reader's read goroutine if it is not already running. +func (r *Reader) start() { + r.startOnce.Do(func() { + r.wait.Add(1) + pulse.Go(r.logger, r.read) + }) +} + // read reads events from the streams and sends them to the reader channel. func (r *Reader) read() { ctx := context.Background() From d94dff5cac431a9e3231086bb330d589172a4ba1 Mon Sep 17 00:00:00 2001 From: Douglas Thrift Date: Fri, 11 Apr 2025 14:49:00 -0700 Subject: [PATCH 2/2] Remove option to start explicitly --- streaming/options/reader.go | 21 ++++++-------------- streaming/reader.go | 38 +++++++++++++------------------------ 2 files changed, 19 insertions(+), 40 deletions(-) diff --git a/streaming/options/reader.go b/streaming/options/reader.go index ef0ffb6..4aca96f 100644 --- a/streaming/options/reader.go +++ b/streaming/options/reader.go @@ -10,13 +10,12 @@ type ( Reader func(*ReaderOptions) ReaderOptions struct { - BlockDuration time.Duration - MaxPolled int64 - Topic string - TopicPattern string - BufferSize int - LastEventID string - StartExplicitly bool + BlockDuration time.Duration + MaxPolled int64 + Topic string + TopicPattern string + BufferSize int + LastEventID string } ) @@ -96,14 +95,6 @@ func WithReaderStartAt(startAt time.Time) Reader { } } -// WithReaderStartExplicitly sets the reader to start explicitly when Start is called. -// By default the reader starts after the first call to Subscribe. -func WithReaderStartExplicitly() Reader { - return func(o *ReaderOptions) { - o.StartExplicitly = true - } -} - // ParseReaderOptions parses the given options and returns the corresponding // reader options. func ParseReaderOptions(opts ...Reader) ReaderOptions { diff --git a/streaming/reader.go b/streaming/reader.go index 352a86b..3cb5fae 100644 --- a/streaming/reader.go +++ b/streaming/reader.go @@ -44,9 +44,6 @@ type ( chans []chan *Event // startOnce is used to ensure the reader is started only once. startOnce sync.Once - // startExplicitly indicates the the reader should only start when Start is called, otherwise it will start after - // the first call to Subscribe. - startExplicitly bool // donechan is the reader donechan channel. donechan chan struct{} // streamschan notifies the reader when streams are added or @@ -105,30 +102,23 @@ func newReader(stream *Stream, opts ...options.Reader) (*Reader, error) { } reader := &Reader{ - startID: o.LastEventID, - streams: []*Stream{stream}, - streamKeys: []string{stream.key}, - streamCursors: []string{o.LastEventID}, - blockDuration: o.BlockDuration, - maxPolled: o.MaxPolled, - bufferSize: o.BufferSize, - startExplicitly: o.StartExplicitly, - donechan: make(chan struct{}), - streamschan: make(chan struct{}), - eventFilter: eventFilter, - logger: stream.rootLogger.WithPrefix("reader", stream.Name), - rdb: stream.rdb, + startID: o.LastEventID, + streams: []*Stream{stream}, + streamKeys: []string{stream.key}, + streamCursors: []string{o.LastEventID}, + blockDuration: o.BlockDuration, + maxPolled: o.MaxPolled, + bufferSize: o.BufferSize, + donechan: make(chan struct{}), + streamschan: make(chan struct{}), + eventFilter: eventFilter, + logger: stream.rootLogger.WithPrefix("reader", stream.Name), + rdb: stream.rdb, } return reader, nil } -// Start starts the reader. This is only needed if the reader is created with -// WithReaderStartExplicitly. It is idempotent. -func (r *Reader) Start() { - r.start() -} - // Subscribe returns a channel that receives events from the stream. // The channel is closed when the reader is closed. func (r *Reader) Subscribe() <-chan *Event { @@ -136,9 +126,7 @@ func (r *Reader) Subscribe() <-chan *Event { r.lock.Lock() defer r.lock.Unlock() r.chans = append(r.chans, c) - if !r.startExplicitly { - r.start() - } + r.start() return c }