From 2ebb9987d64780246249842737537d8b763d5785 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Tue, 31 Dec 2024 10:49:29 +0100 Subject: [PATCH] Move refreshDebouncer to debounce package --- debounce/refresh_deboucer.go | 164 +++++++++++++++++++ debounce/refresh_debouncer_test.go | 251 +++++++++++++++++++++++++++++ host_source.go | 162 +------------------ host_source_test.go | 246 ---------------------------- session.go | 7 +- 5 files changed, 421 insertions(+), 409 deletions(-) create mode 100644 debounce/refresh_deboucer.go create mode 100644 debounce/refresh_debouncer_test.go diff --git a/debounce/refresh_deboucer.go b/debounce/refresh_deboucer.go new file mode 100644 index 000000000..7a3bd3f7d --- /dev/null +++ b/debounce/refresh_deboucer.go @@ -0,0 +1,164 @@ +package debounce + +import ( + "sync" + "time" +) + +const ( + RingRefreshDebounceTime = 1 * time.Second +) + +// debounces requests to call a refresh function (currently used for ring refresh). It also supports triggering a refresh immediately. +type RefreshDebouncer struct { + mu sync.Mutex + stopped bool + broadcaster *errorBroadcaster + interval time.Duration + timer *time.Timer + refreshNowCh chan struct{} + quit chan struct{} + refreshFn func() error +} + +func NewRefreshDebouncer(interval time.Duration, refreshFn func() error) *RefreshDebouncer { + d := &RefreshDebouncer{ + stopped: false, + broadcaster: nil, + refreshNowCh: make(chan struct{}, 1), + quit: make(chan struct{}), + interval: interval, + timer: time.NewTimer(interval), + refreshFn: refreshFn, + } + d.timer.Stop() + go d.flusher() + return d +} + +// debounces a request to call the refresh function +func (d *RefreshDebouncer) Debounce() { + d.mu.Lock() + defer d.mu.Unlock() + if d.stopped { + return + } + d.timer.Reset(d.interval) +} + +// requests an immediate refresh which will cancel pending refresh requests +func (d *RefreshDebouncer) RefreshNow() <-chan error { + d.mu.Lock() + defer d.mu.Unlock() + if d.broadcaster == nil { + d.broadcaster = newErrorBroadcaster() + select { + case d.refreshNowCh <- struct{}{}: + default: + // already a refresh pending + } + } + return d.broadcaster.newListener() +} + +func (d *RefreshDebouncer) flusher() { + for { + select { + case <-d.refreshNowCh: + case <-d.timer.C: + case <-d.quit: + } + d.mu.Lock() + if d.stopped { + if d.broadcaster != nil { + d.broadcaster.stop() + d.broadcaster = nil + } + d.timer.Stop() + d.mu.Unlock() + return + } + + // make sure both request channels are cleared before we refresh + select { + case <-d.refreshNowCh: + default: + } + + d.timer.Stop() + select { + case <-d.timer.C: + default: + } + + curBroadcaster := d.broadcaster + d.broadcaster = nil + d.mu.Unlock() + + err := d.refreshFn() + if curBroadcaster != nil { + curBroadcaster.broadcast(err) + } + } +} + +func (d *RefreshDebouncer) Stop() { + d.mu.Lock() + if d.stopped { + d.mu.Unlock() + return + } + d.stopped = true + d.mu.Unlock() + d.quit <- struct{}{} // sync with flusher + close(d.quit) +} + +// broadcasts an error to multiple channels (listeners) +type errorBroadcaster struct { + listeners []chan<- error + mu sync.Mutex +} + +func newErrorBroadcaster() *errorBroadcaster { + return &errorBroadcaster{ + listeners: nil, + mu: sync.Mutex{}, + } +} + +func (b *errorBroadcaster) newListener() <-chan error { + ch := make(chan error, 1) + b.mu.Lock() + defer b.mu.Unlock() + b.listeners = append(b.listeners, ch) + return ch +} + +func (b *errorBroadcaster) broadcast(err error) { + b.mu.Lock() + defer b.mu.Unlock() + curListeners := b.listeners + if len(curListeners) > 0 { + b.listeners = nil + } else { + return + } + + for _, listener := range curListeners { + listener <- err + close(listener) + } +} + +func (b *errorBroadcaster) stop() { + b.mu.Lock() + defer b.mu.Unlock() + if len(b.listeners) == 0 { + return + } + for _, listener := range b.listeners { + close(listener) + } + b.listeners = nil +} diff --git a/debounce/refresh_debouncer_test.go b/debounce/refresh_debouncer_test.go new file mode 100644 index 000000000..43c7b4c18 --- /dev/null +++ b/debounce/refresh_debouncer_test.go @@ -0,0 +1,251 @@ +package debounce + +import ( + "errors" + "sync" + "sync/atomic" + "testing" + "time" +) + +// This test sends debounce requests and waits until the refresh function is called (which should happen when the timer elapses). +func TestRefreshDebouncer_MultipleEvents(t *testing.T) { + const numberOfEvents = 10 + channel := make(chan int, numberOfEvents) // should never use more than 1 but allow for more to possibly detect bugs + fn := func() error { + channel <- 0 + return nil + } + beforeEvents := time.Now() + wg := sync.WaitGroup{} + d := NewRefreshDebouncer(2*time.Second, fn) + defer d.Stop() + for i := 0; i < numberOfEvents; i++ { + wg.Add(1) + go func() { + defer wg.Done() + d.Debounce() + }() + } + wg.Wait() + timeoutCh := time.After(2500 * time.Millisecond) // extra time to avoid flakiness + select { + case <-channel: + case <-timeoutCh: + t.Fatalf("timeout elapsed without flush function being called") + } + afterFunctionCall := time.Now() + + // use 1.5 seconds instead of 2 seconds to avoid timer precision issues + if afterFunctionCall.Sub(beforeEvents) < 1500*time.Millisecond { + t.Fatalf("function was called after %v ms instead of ~2 seconds", afterFunctionCall.Sub(beforeEvents).Milliseconds()) + } + + // wait another 2 seconds and check if function was called again + time.Sleep(2500 * time.Millisecond) + if len(channel) > 0 { + t.Fatalf("function was called more than once") + } +} + +// This test: +// +// 1 - Sends debounce requests when test starts +// 2 - Calls refreshNow() before the timer elapsed (which stops the timer) about 1.5 seconds after test starts +// +// The end result should be 1 refresh function call when refreshNow() is called. +func TestRefreshDebouncer_RefreshNow(t *testing.T) { + const numberOfEvents = 10 + channel := make(chan int, numberOfEvents) // should never use more than 1 but allow for more to possibly detect bugs + fn := func() error { + channel <- 0 + return nil + } + beforeEvents := time.Now() + eventsWg := sync.WaitGroup{} + d := NewRefreshDebouncer(2*time.Second, fn) + defer d.Stop() + for i := 0; i < numberOfEvents; i++ { + eventsWg.Add(1) + go func() { + defer eventsWg.Done() + d.Debounce() + }() + } + + refreshNowWg := sync.WaitGroup{} + refreshNowWg.Add(1) + go func() { + defer refreshNowWg.Done() + time.Sleep(1500 * time.Millisecond) + d.RefreshNow() + }() + + eventsWg.Wait() + select { + case <-channel: + t.Fatalf("function was called before the expected time") + default: + } + + refreshNowWg.Wait() + + timeoutCh := time.After(200 * time.Millisecond) // allow for 200ms of delay to prevent flakiness + select { + case <-channel: + case <-timeoutCh: + t.Fatalf("timeout elapsed without flush function being called") + } + afterFunctionCall := time.Now() + + // use 1 second instead of 1.5s to avoid timer precision issues + if afterFunctionCall.Sub(beforeEvents) < 1000*time.Millisecond { + t.Fatalf("function was called after %v ms instead of ~1.5 seconds", afterFunctionCall.Sub(beforeEvents).Milliseconds()) + } + + // wait some time and check if function was called again + time.Sleep(2500 * time.Millisecond) + if len(channel) > 0 { + t.Fatalf("function was called more than once") + } +} + +// This test: +// +// 1 - Sends debounce requests when test starts +// 2 - Calls refreshNow() before the timer elapsed (which stops the timer) about 1 second after test starts +// 3 - Sends more debounce requests (which resets the timer with a 3-second interval) about 2 seconds after test starts +// +// The end result should be 2 refresh function calls: +// +// 1 - When refreshNow() is called (1 second after the test starts) +// 2 - When the timer elapses after the second "wave" of debounce requests (5 seconds after the test starts) +func TestRefreshDebouncer_EventsAfterRefreshNow(t *testing.T) { + const numberOfEvents = 10 + channel := make(chan int, numberOfEvents) // should never use more than 2 but allow for more to possibly detect bugs + fn := func() error { + channel <- 0 + return nil + } + beforeEvents := time.Now() + wg := sync.WaitGroup{} + d := NewRefreshDebouncer(3*time.Second, fn) + defer d.Stop() + for i := 0; i < numberOfEvents; i++ { + wg.Add(1) + go func() { + defer wg.Done() + d.Debounce() + time.Sleep(2000 * time.Millisecond) + d.Debounce() + }() + } + + go func() { + time.Sleep(1 * time.Second) + d.RefreshNow() + }() + + wg.Wait() + timeoutCh := time.After(1500 * time.Millisecond) // extra 500ms to prevent flakiness + select { + case <-channel: + case <-timeoutCh: + t.Fatalf("timeout elapsed without flush function being called after refreshNow()") + } + afterFunctionCall := time.Now() + + // use 500ms instead of 1s to avoid timer precision issues + if afterFunctionCall.Sub(beforeEvents) < 500*time.Millisecond { + t.Fatalf("function was called after %v ms instead of ~1 second", afterFunctionCall.Sub(beforeEvents).Milliseconds()) + } + + timeoutCh = time.After(4 * time.Second) // extra 1s to prevent flakiness + select { + case <-channel: + case <-timeoutCh: + t.Fatalf("timeout elapsed without flush function being called after debounce requests") + } + afterSecondFunctionCall := time.Now() + + // use 2.5s instead of 3s to avoid timer precision issues + if afterSecondFunctionCall.Sub(afterFunctionCall) < 2500*time.Millisecond { + t.Fatalf("function was called after %v ms instead of ~3 seconds", afterSecondFunctionCall.Sub(afterFunctionCall).Milliseconds()) + } + + if len(channel) > 0 { + t.Fatalf("function was called more than twice") + } +} + +func TestErrorBroadcaster_MultipleListeners(t *testing.T) { + b := newErrorBroadcaster() + defer b.stop() + const numberOfListeners = 10 + var listeners []<-chan error + for i := 0; i < numberOfListeners; i++ { + listeners = append(listeners, b.newListener()) + } + + err := errors.New("expected error") + wg := sync.WaitGroup{} + result := atomic.Value{} + for _, listener := range listeners { + currentListener := listener + wg.Add(1) + go func() { + defer wg.Done() + receivedErr, ok := <-currentListener + if !ok { + result.Store(errors.New("listener was closed")) + } else if receivedErr != err { + result.Store(errors.New("expected received error to be the same as the one that was broadcasted")) + } + }() + } + wg.Add(1) + go func() { + defer wg.Done() + b.broadcast(err) + b.stop() + }() + wg.Wait() + if loadedVal := result.Load(); loadedVal != nil { + t.Errorf(loadedVal.(error).Error()) + } +} + +func TestErrorBroadcaster_StopWithoutBroadcast(t *testing.T) { + var b = newErrorBroadcaster() + defer b.stop() + const numberOfListeners = 10 + var listeners []<-chan error + for i := 0; i < numberOfListeners; i++ { + listeners = append(listeners, b.newListener()) + } + + wg := sync.WaitGroup{} + result := atomic.Value{} + for _, listener := range listeners { + currentListener := listener + wg.Add(1) + go func() { + defer wg.Done() + // broadcaster stopped, expect listener to be closed + _, ok := <-currentListener + if ok { + result.Store(errors.New("expected listener to be closed")) + } + }() + } + wg.Add(1) + go func() { + defer wg.Done() + // call stop without broadcasting anything to current listeners + b.stop() + }() + wg.Wait() + if loadedVal := result.Load(); loadedVal != nil { + t.Errorf(loadedVal.(error).Error()) + } +} diff --git a/host_source.go b/host_source.go index 8b340f8c6..1d4dfd1c1 100644 --- a/host_source.go +++ b/host_source.go @@ -661,12 +661,12 @@ func hostInfoFromIter(iter *Iter, connectAddress net.IP, defaultPort int, transl // debounceRingRefresh submits a ring refresh request to the ring refresh debouncer. func (s *Session) debounceRingRefresh() { - s.ringRefresher.debounce() + s.ringRefresher.Debounce() } // refreshRing executes a ring refresh immediately and cancels pending debounce ring refresh requests. func (s *Session) refreshRingNow() error { - err, ok := <-s.ringRefresher.refreshNow() + err, ok := <-s.ringRefresher.RefreshNow() if !ok { return errors.New("could not refresh ring because stop was requested") } @@ -720,161 +720,3 @@ func (s *Session) refreshRing() error { return nil } - -const ( - ringRefreshDebounceTime = 1 * time.Second -) - -// debounces requests to call a refresh function (currently used for ring refresh). It also supports triggering a refresh immediately. -type refreshDebouncer struct { - mu sync.Mutex - stopped bool - broadcaster *errorBroadcaster - interval time.Duration - timer *time.Timer - refreshNowCh chan struct{} - quit chan struct{} - refreshFn func() error -} - -func newRefreshDebouncer(interval time.Duration, refreshFn func() error) *refreshDebouncer { - d := &refreshDebouncer{ - stopped: false, - broadcaster: nil, - refreshNowCh: make(chan struct{}, 1), - quit: make(chan struct{}), - interval: interval, - timer: time.NewTimer(interval), - refreshFn: refreshFn, - } - d.timer.Stop() - go d.flusher() - return d -} - -// debounces a request to call the refresh function -func (d *refreshDebouncer) debounce() { - d.mu.Lock() - defer d.mu.Unlock() - if d.stopped { - return - } - d.timer.Reset(d.interval) -} - -// requests an immediate refresh which will cancel pending refresh requests -func (d *refreshDebouncer) refreshNow() <-chan error { - d.mu.Lock() - defer d.mu.Unlock() - if d.broadcaster == nil { - d.broadcaster = newErrorBroadcaster() - select { - case d.refreshNowCh <- struct{}{}: - default: - // already a refresh pending - } - } - return d.broadcaster.newListener() -} - -func (d *refreshDebouncer) flusher() { - for { - select { - case <-d.refreshNowCh: - case <-d.timer.C: - case <-d.quit: - } - d.mu.Lock() - if d.stopped { - if d.broadcaster != nil { - d.broadcaster.stop() - d.broadcaster = nil - } - d.timer.Stop() - d.mu.Unlock() - return - } - - // make sure both request channels are cleared before we refresh - select { - case <-d.refreshNowCh: - default: - } - - d.timer.Stop() - select { - case <-d.timer.C: - default: - } - - curBroadcaster := d.broadcaster - d.broadcaster = nil - d.mu.Unlock() - - err := d.refreshFn() - if curBroadcaster != nil { - curBroadcaster.broadcast(err) - } - } -} - -func (d *refreshDebouncer) stop() { - d.mu.Lock() - if d.stopped { - d.mu.Unlock() - return - } - d.stopped = true - d.mu.Unlock() - d.quit <- struct{}{} // sync with flusher - close(d.quit) -} - -// broadcasts an error to multiple channels (listeners) -type errorBroadcaster struct { - listeners []chan<- error - mu sync.Mutex -} - -func newErrorBroadcaster() *errorBroadcaster { - return &errorBroadcaster{ - listeners: nil, - mu: sync.Mutex{}, - } -} - -func (b *errorBroadcaster) newListener() <-chan error { - ch := make(chan error, 1) - b.mu.Lock() - defer b.mu.Unlock() - b.listeners = append(b.listeners, ch) - return ch -} - -func (b *errorBroadcaster) broadcast(err error) { - b.mu.Lock() - defer b.mu.Unlock() - curListeners := b.listeners - if len(curListeners) > 0 { - b.listeners = nil - } else { - return - } - - for _, listener := range curListeners { - listener <- err - close(listener) - } -} - -func (b *errorBroadcaster) stop() { - b.mu.Lock() - defer b.mu.Unlock() - if len(b.listeners) == 0 { - return - } - for _, listener := range b.listeners { - close(listener) - } - b.listeners = nil -} diff --git a/host_source_test.go b/host_source_test.go index a99c9549b..49473b62c 100644 --- a/host_source_test.go +++ b/host_source_test.go @@ -4,12 +4,8 @@ package gocql import ( - "errors" "net" - "sync" - "sync/atomic" "testing" - "time" ) func TestUnmarshalCassVersion(t *testing.T) { @@ -122,245 +118,3 @@ func TestHostInfo_ConnectAddress(t *testing.T) { }) } } - -// This test sends debounce requests and waits until the refresh function is called (which should happen when the timer elapses). -func TestRefreshDebouncer_MultipleEvents(t *testing.T) { - const numberOfEvents = 10 - channel := make(chan int, numberOfEvents) // should never use more than 1 but allow for more to possibly detect bugs - fn := func() error { - channel <- 0 - return nil - } - beforeEvents := time.Now() - wg := sync.WaitGroup{} - d := newRefreshDebouncer(2*time.Second, fn) - defer d.stop() - for i := 0; i < numberOfEvents; i++ { - wg.Add(1) - go func() { - defer wg.Done() - d.debounce() - }() - } - wg.Wait() - timeoutCh := time.After(2500 * time.Millisecond) // extra time to avoid flakiness - select { - case <-channel: - case <-timeoutCh: - t.Fatalf("timeout elapsed without flush function being called") - } - afterFunctionCall := time.Now() - - // use 1.5 seconds instead of 2 seconds to avoid timer precision issues - if afterFunctionCall.Sub(beforeEvents) < 1500*time.Millisecond { - t.Fatalf("function was called after %v ms instead of ~2 seconds", afterFunctionCall.Sub(beforeEvents).Milliseconds()) - } - - // wait another 2 seconds and check if function was called again - time.Sleep(2500 * time.Millisecond) - if len(channel) > 0 { - t.Fatalf("function was called more than once") - } -} - -// This test: -// -// 1 - Sends debounce requests when test starts -// 2 - Calls refreshNow() before the timer elapsed (which stops the timer) about 1.5 seconds after test starts -// -// The end result should be 1 refresh function call when refreshNow() is called. -func TestRefreshDebouncer_RefreshNow(t *testing.T) { - const numberOfEvents = 10 - channel := make(chan int, numberOfEvents) // should never use more than 1 but allow for more to possibly detect bugs - fn := func() error { - channel <- 0 - return nil - } - beforeEvents := time.Now() - eventsWg := sync.WaitGroup{} - d := newRefreshDebouncer(2*time.Second, fn) - defer d.stop() - for i := 0; i < numberOfEvents; i++ { - eventsWg.Add(1) - go func() { - defer eventsWg.Done() - d.debounce() - }() - } - - refreshNowWg := sync.WaitGroup{} - refreshNowWg.Add(1) - go func() { - defer refreshNowWg.Done() - time.Sleep(1500 * time.Millisecond) - d.refreshNow() - }() - - eventsWg.Wait() - select { - case <-channel: - t.Fatalf("function was called before the expected time") - default: - } - - refreshNowWg.Wait() - - timeoutCh := time.After(200 * time.Millisecond) // allow for 200ms of delay to prevent flakiness - select { - case <-channel: - case <-timeoutCh: - t.Fatalf("timeout elapsed without flush function being called") - } - afterFunctionCall := time.Now() - - // use 1 second instead of 1.5s to avoid timer precision issues - if afterFunctionCall.Sub(beforeEvents) < 1000*time.Millisecond { - t.Fatalf("function was called after %v ms instead of ~1.5 seconds", afterFunctionCall.Sub(beforeEvents).Milliseconds()) - } - - // wait some time and check if function was called again - time.Sleep(2500 * time.Millisecond) - if len(channel) > 0 { - t.Fatalf("function was called more than once") - } -} - -// This test: -// -// 1 - Sends debounce requests when test starts -// 2 - Calls refreshNow() before the timer elapsed (which stops the timer) about 1 second after test starts -// 3 - Sends more debounce requests (which resets the timer with a 3-second interval) about 2 seconds after test starts -// -// The end result should be 2 refresh function calls: -// -// 1 - When refreshNow() is called (1 second after the test starts) -// 2 - When the timer elapses after the second "wave" of debounce requests (5 seconds after the test starts) -func TestRefreshDebouncer_EventsAfterRefreshNow(t *testing.T) { - const numberOfEvents = 10 - channel := make(chan int, numberOfEvents) // should never use more than 2 but allow for more to possibly detect bugs - fn := func() error { - channel <- 0 - return nil - } - beforeEvents := time.Now() - wg := sync.WaitGroup{} - d := newRefreshDebouncer(3*time.Second, fn) - defer d.stop() - for i := 0; i < numberOfEvents; i++ { - wg.Add(1) - go func() { - defer wg.Done() - d.debounce() - time.Sleep(2000 * time.Millisecond) - d.debounce() - }() - } - - go func() { - time.Sleep(1 * time.Second) - d.refreshNow() - }() - - wg.Wait() - timeoutCh := time.After(1500 * time.Millisecond) // extra 500ms to prevent flakiness - select { - case <-channel: - case <-timeoutCh: - t.Fatalf("timeout elapsed without flush function being called after refreshNow()") - } - afterFunctionCall := time.Now() - - // use 500ms instead of 1s to avoid timer precision issues - if afterFunctionCall.Sub(beforeEvents) < 500*time.Millisecond { - t.Fatalf("function was called after %v ms instead of ~1 second", afterFunctionCall.Sub(beforeEvents).Milliseconds()) - } - - timeoutCh = time.After(4 * time.Second) // extra 1s to prevent flakiness - select { - case <-channel: - case <-timeoutCh: - t.Fatalf("timeout elapsed without flush function being called after debounce requests") - } - afterSecondFunctionCall := time.Now() - - // use 2.5s instead of 3s to avoid timer precision issues - if afterSecondFunctionCall.Sub(afterFunctionCall) < 2500*time.Millisecond { - t.Fatalf("function was called after %v ms instead of ~3 seconds", afterSecondFunctionCall.Sub(afterFunctionCall).Milliseconds()) - } - - if len(channel) > 0 { - t.Fatalf("function was called more than twice") - } -} - -func TestErrorBroadcaster_MultipleListeners(t *testing.T) { - b := newErrorBroadcaster() - defer b.stop() - const numberOfListeners = 10 - var listeners []<-chan error - for i := 0; i < numberOfListeners; i++ { - listeners = append(listeners, b.newListener()) - } - - err := errors.New("expected error") - wg := sync.WaitGroup{} - result := atomic.Value{} - for _, listener := range listeners { - currentListener := listener - wg.Add(1) - go func() { - defer wg.Done() - receivedErr, ok := <-currentListener - if !ok { - result.Store(errors.New("listener was closed")) - } else if receivedErr != err { - result.Store(errors.New("expected received error to be the same as the one that was broadcasted")) - } - }() - } - wg.Add(1) - go func() { - defer wg.Done() - b.broadcast(err) - b.stop() - }() - wg.Wait() - if loadedVal := result.Load(); loadedVal != nil { - t.Errorf(loadedVal.(error).Error()) - } -} - -func TestErrorBroadcaster_StopWithoutBroadcast(t *testing.T) { - var b = newErrorBroadcaster() - defer b.stop() - const numberOfListeners = 10 - var listeners []<-chan error - for i := 0; i < numberOfListeners; i++ { - listeners = append(listeners, b.newListener()) - } - - wg := sync.WaitGroup{} - result := atomic.Value{} - for _, listener := range listeners { - currentListener := listener - wg.Add(1) - go func() { - defer wg.Done() - // broadcaster stopped, expect listener to be closed - _, ok := <-currentListener - if ok { - result.Store(errors.New("expected listener to be closed")) - } - }() - } - wg.Add(1) - go func() { - defer wg.Done() - // call stop without broadcasting anything to current listeners - b.stop() - }() - wg.Wait() - if loadedVal := result.Load(); loadedVal != nil { - t.Errorf(loadedVal.(error).Error()) - } -} diff --git a/session.go b/session.go index 643897a3a..14fcaaf3a 100644 --- a/session.go +++ b/session.go @@ -18,6 +18,7 @@ import ( "time" "unicode" + "github.com/gocql/gocql/debounce" "github.com/gocql/gocql/internal/lru" ) @@ -43,7 +44,7 @@ type Session struct { frameObserver FrameHeaderObserver streamObserver StreamObserver hostSource *ringDescriber - ringRefresher *refreshDebouncer + ringRefresher *debounce.RefreshDebouncer stmtsLRU *preparedLRU connCfg *ConnConfig @@ -149,7 +150,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) { s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo) s.hostSource = &ringDescriber{cfg: &s.cfg, logger: s.logger} - s.ringRefresher = newRefreshDebouncer(ringRefreshDebounceTime, func() error { + s.ringRefresher = debounce.NewRefreshDebouncer(debounce.RingRefreshDebounceTime, func() error { return s.refreshRing() }) @@ -539,7 +540,7 @@ func (s *Session) Close() { } if s.ringRefresher != nil { - s.ringRefresher.stop() + s.ringRefresher.Stop() } if s.cancel != nil {