Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSGO-41 Fix deadlock in refresh debouncer stop #1767

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,25 +747,27 @@ const (
// refreshDebouncer debounces requests to call a refresh function (currently used for ring refresh).
// It also supports triggering a refresh immediately (see refreshNow).
//
// NOTE(review): this listing looks like a rendered diff in which removed and added lines are
// interleaved; stopped and quit appear to be the fields that the quitCtx/stoppedCtx pairs
// replace. Confirm against the real host_source.go before relying on this field list.
type refreshDebouncer struct {
// mu is locked by debounce, refreshNow, flusher and stop around their accesses to the state below.
mu sync.Mutex
// stopped is set to true by stop and checked by debounce, flusher and stop.
stopped bool
// broadcaster is created by refreshNow when it is nil; flusher stops it and resets it to nil
// once the debouncer has been stopped.
broadcaster *errorBroadcaster
// interval is the duration the timer is armed with (time.NewTimer in the constructor,
// timer.Reset in debounce).
interval time.Duration
// timer is created and immediately stopped by newRefreshDebouncer; debounce re-arms it and
// flusher wakes up on timer.C.
timer *time.Timer
// refreshNowCh is a signal channel with capacity 1: refreshNow sends on it, flusher receives.
refreshNowCh chan struct{}
// quit is an unbuffered channel that flusher selects on; stop sends one value and then closes it.
quit chan struct{}
// refreshFn is the refresh callback passed to newRefreshDebouncer.
// NOTE(review): presumably invoked by flusher; the call site is not visible in this excerpt.
refreshFn func() error
// quitCtxFn cancels quitCtx; stop calls it to wake up flusher.
quitCtxFn context.CancelFunc
// quitCtx is selected on by flusher (Done) and inspected by isStopped (Err).
quitCtx context.Context
// stoppedCtxFn is deferred by flusher, so stoppedCtx is cancelled when flusher returns.
stoppedCtxFn context.CancelFunc
// stoppedCtx is what stop blocks on to wait for flusher to exit.
stoppedCtx context.Context
}

func newRefreshDebouncer(interval time.Duration, refreshFn func() error) *refreshDebouncer {
d := &refreshDebouncer{
stopped: false,
broadcaster: nil,
refreshNowCh: make(chan struct{}, 1),
quit: make(chan struct{}),
interval: interval,
timer: time.NewTimer(interval),
refreshFn: refreshFn,
}
d.stoppedCtx, d.stoppedCtxFn = context.WithCancel(context.Background())
d.quitCtx, d.quitCtxFn = context.WithCancel(context.Background())
d.timer.Stop()
go d.flusher()
return d
Expand All @@ -775,7 +777,7 @@ func newRefreshDebouncer(interval time.Duration, refreshFn func() error) *refres
func (d *refreshDebouncer) debounce() {
d.mu.Lock()
defer d.mu.Unlock()
if d.stopped {
if d.isStopped() {
return
}
d.timer.Reset(d.interval)
Expand All @@ -786,6 +788,11 @@ func (d *refreshDebouncer) refreshNow() <-chan error {
d.mu.Lock()
defer d.mu.Unlock()
if d.broadcaster == nil {
if d.isStopped() {
ch := make(chan error)
close(ch)
return ch
}
d.broadcaster = newErrorBroadcaster()
select {
case d.refreshNowCh <- struct{}{}:
Expand All @@ -797,14 +804,15 @@ func (d *refreshDebouncer) refreshNow() <-chan error {
}

func (d *refreshDebouncer) flusher() {
defer d.stoppedCtxFn()
for {
select {
case <-d.refreshNowCh:
case <-d.timer.C:
case <-d.quit:
case <-d.quitCtx.Done():
}
d.mu.Lock()
if d.stopped {
if d.isStopped() {
if d.broadcaster != nil {
d.broadcaster.stop()
d.broadcaster = nil
Expand Down Expand Up @@ -838,15 +846,12 @@ func (d *refreshDebouncer) flusher() {
}

// stop shuts the debouncer down: it cancels quitCtx to wake up flusher and then blocks until
// stoppedCtx is done, which flusher signals through its deferred stoppedCtxFn call.
//
// NOTE(review): this listing appears to interleave removed and added diff lines. The
// mutex/stopped bookkeeping and the send-then-close on d.quit look like the pre-change
// implementation, and the quitCtxFn/stoppedCtx lines like its replacement — confirm against
// the actual file. Taken literally, the send on the unbuffered d.quit channel blocks until
// flusher receives it.
func (d *refreshDebouncer) stop() {
d.mu.Lock()
// Already stopped: return early, which also keeps d.quit from being closed twice.
if d.stopped {
d.mu.Unlock()
return
}
d.stopped = true
// Release the lock before the blocking channel operations below.
d.mu.Unlock()
d.quit <- struct{}{} // sync with flusher
close(d.quit)
d.quitCtxFn() // wake up flusher
<-d.stoppedCtx.Done() // wait for flusher to exit
}

// isStopped reports whether quitCtx has been cancelled, i.e. whether a stop has been requested.
// It never blocks.
func (d *refreshDebouncer) isStopped() bool {
	select {
	case <-d.quitCtx.Done():
		// Done is closed exactly when the context's Err is non-nil.
		return true
	default:
		return false
	}
}

// broadcasts an error to multiple channels (listeners)
Expand Down
26 changes: 26 additions & 0 deletions host_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,32 @@ func TestRefreshDebouncer_EventsAfterRefreshNow(t *testing.T) {
}
}

// https://github.com/gocql/gocql/issues/1752
func TestRefreshDebouncer_DeadlockOnStop(t *testing.T) {
// There is no way to guarantee that this bug manifests, because it depends on which `case` the `select` picks.
// Without the fix, 4 iterations of this test hit the deadlock fairly consistently.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this comment indicate that you were reliably reproducing it without the fix in place or is it just speculating that 4 should be enough to hit it?

Copy link
Contributor Author

@joao-r-reis joao-r-reis Jul 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was reliably reproducing it with 4 iterations without the fix in place.

const iterations = 4
for i := 0; i < iterations; i++ {
refreshCalledCh := make(chan int, 5)
refreshDuration := 500 * time.Millisecond
fn := func() error {
refreshCalledCh <- 0
time.Sleep(refreshDuration)
return nil
}
d := newRefreshDebouncer(50*time.Millisecond, fn)
timeBeforeRefresh := time.Now()
_ = d.refreshNow()
<-refreshCalledCh
d.debounce()
d.stop()
timeAfterRefresh := time.Now()
if timeAfterRefresh.Sub(timeBeforeRefresh) < refreshDuration {
t.Errorf("refresh debouncer stop() didn't wait until flusher stopped")
}
}
}

func TestErrorBroadcaster_MultipleListeners(t *testing.T) {
b := newErrorBroadcaster()
defer b.stop()
Expand Down
Loading