Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove subscription from main map if first send fails #11709

Merged
merged 2 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions datanode/candlesv2/candle_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *CandleUpdates) run(ctx context.Context) {
case <-ctx.Done():
return
case subscriptionMsg := <-s.subscriptionMsgChan:
s.handleSubscription(subscriptions, subscriptionMsg, lastCandle)
subscriptions = s.handleSubscription(subscriptions, subscriptionMsg, lastCandle)
case now := <-ticker.C:
if len(subscriptions) == 0 {
lastCandle = nil
Expand All @@ -105,31 +105,37 @@ func (s *CandleUpdates) run(ctx context.Context) {
lastCandle = &candles[len(candles)-1]
}

s.sendCandlesToSubscribers(candles, subscriptions)
subscriptions = s.sendCandlesToSubscribers(candles, subscriptions)
}
}
}

func (s *CandleUpdates) handleSubscription(subscriptions map[string]chan entities.Candle, subscription subscriptionMsg, lastCandle *entities.Candle) {
func (s *CandleUpdates) handleSubscription(subscriptions map[string]chan entities.Candle, subscription subscriptionMsg, lastCandle *entities.Candle) map[string]chan entities.Candle {
if subscription.subscribe {
s.addSubscription(subscriptions, subscription, lastCandle)
} else {
removeSubscription(subscriptions, subscription.id)
return s.addSubscription(subscriptions, subscription, lastCandle)
}
return removeSubscription(subscriptions, subscription.id)
}

func (s *CandleUpdates) addSubscription(subscriptions map[string]chan entities.Candle, subscription subscriptionMsg, lastCandle *entities.Candle) {
subscriptions[subscription.id] = subscription.out
func (s *CandleUpdates) addSubscription(subscriptions map[string]chan entities.Candle, subscription subscriptionMsg, lastCandle *entities.Candle) map[string]chan entities.Candle {
if lastCandle != nil {
s.sendCandlesToSubscribers([]entities.Candle{*lastCandle}, map[string]chan entities.Candle{subscription.id: subscription.out})
if rm := s.sendCandlesToSubscribers([]entities.Candle{*lastCandle}, map[string]chan entities.Candle{subscription.id: subscription.out}); len(rm) == 0 {
// try to send the last candle data to the new subscription, if it fails, don't update the map
return subscriptions
}
}
subscriptions[subscription.id] = subscription.out
return subscriptions
}

func removeSubscription(subscriptions map[string]chan entities.Candle, subscriptionID string) {
if _, ok := subscriptions[subscriptionID]; ok {
close(subscriptions[subscriptionID])
func removeSubscription(subscriptions map[string]chan entities.Candle, subscriptionID string) map[string]chan entities.Candle {
if ch, ok := subscriptions[subscriptionID]; ok {
// first delete
delete(subscriptions, subscriptionID)
// then close
close(ch)
}
return subscriptions
}

func closeAllSubscriptions(subscribers map[string]chan entities.Candle) {
Expand Down Expand Up @@ -215,15 +221,17 @@ func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entiti
return updates, nil
}

func (s *CandleUpdates) sendCandlesToSubscribers(candles []entities.Candle, subscriptions map[string]chan entities.Candle) {
func (s *CandleUpdates) sendCandlesToSubscribers(candles []entities.Candle, subscriptions map[string]chan entities.Candle) map[string]chan entities.Candle {
ret := subscriptions
for subscriptionID, outCh := range subscriptions {
for _, candle := range candles {
select {
case outCh <- candle:
default:
removeSubscription(subscriptions, subscriptionID)
ret = removeSubscription(subscriptions, subscriptionID)
break
}
}
}
return ret
}
66 changes: 64 additions & 2 deletions datanode/candlesv2/candle_updates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package candlesv2_test
import (
"context"
"fmt"
"sync"
"testing"
"time"

Expand All @@ -28,6 +29,7 @@ import (

"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type nonReturningCandleSource struct{}
Expand Down Expand Up @@ -67,14 +69,74 @@ func (t *testCandleSource) GetCandleDataForTimeSpan(ctx context.Context, candleI
}
}

func TestSubscribeAndUnsubscribeCloseChannelPanic(t *testing.T) {
testCandleSource := &testCandleSource{candles: make(chan []entities.Candle, 3), errorCh: make(chan error)}
// ensure the sub channels are buffered
updates := candlesv2.NewCandleUpdates(context.Background(), logging.NewTestLogger(), "testCandles",
testCandleSource, newTestCandleConfig(5).CandleUpdates)
startTime := time.Now()

updated := startTime
firstCandle := createCandle(startTime, updated, 1, 1, 1, 1, 10, 200)
lastCandle := firstCandle // just for the sake of types
fCh := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
testCandleSource.candles <- []entities.Candle{firstCandle}
close(fCh)
// keep updating the most recent candle
for i := 0; i < 3; i++ {
updated = updated.Add(time.Second * time.Duration(i))
lastCandle = createCandle(startTime, updated, 1, 1, 1, 1, 10, 200)
testCandleSource.candles <- []entities.Candle{lastCandle}
}
}()
<-fCh
// ensure the first candle is sent
sub1Id, out1, _ := updates.Subscribe()
sub2Id, out2, _ := updates.Subscribe()

candle1 := <-out1
assert.Equal(t, firstCandle, candle1)

candle2 := <-out2
assert.Equal(t, firstCandle, candle2)

// unsubscribe the first subscriber
updates.Unsubscribe(sub1Id)
// now wait for the updates:
wg.Wait()
sub3Id, out3, _ := updates.Subscribe()
candle3 := <-out3
require.Equal(t, lastCandle, candle3)
// this should unsubscribe sub2 already
testCandleSource.errorCh <- fmt.Errorf("transient error")

// this sub should get instantly unsubscribed.
errSub, eOut, _ := updates.Subscribe()
require.NotNil(t, eOut)
// reading from the channel should indicate it was closed already due to the error
// once the channel is closed, the subscriber effectively has to have been removed.
_, closed := <-eOut
require.True(t, closed)
// we can still safely call unsubscribe, though:
updates.Unsubscribe(errSub)
updates.Unsubscribe(sub2Id)
updates.Unsubscribe(sub3Id)
}

func TestSubscribeAndUnsubscribeWhenCandleSourceErrorsAlways(t *testing.T) {
errorsAlwaysCandleSource := &errorsAlwaysCandleSource{}

updates := candlesv2.NewCandleUpdates(context.Background(), logging.NewTestLogger(), "testCandles",
errorsAlwaysCandleSource, newTestCandleConfig(0).CandleUpdates)

sub1Id, _, _ := updates.Subscribe()
sub2Id, _, _ := updates.Subscribe()
sub1Id, _, err1 := updates.Subscribe()
sub2Id, _, err2 := updates.Subscribe()
require.NoError(t, err1)
require.NoError(t, err2)

updates.Unsubscribe(sub1Id)
updates.Unsubscribe(sub2Id)
Expand Down
Loading