diff --git a/datanode/candlesv2/candle_updates.go b/datanode/candlesv2/candle_updates.go index 5b84b15297..10414f9ea0 100644 --- a/datanode/candlesv2/candle_updates.go +++ b/datanode/candlesv2/candle_updates.go @@ -104,7 +104,6 @@ func (s *CandleUpdates) run(ctx context.Context) { if len(candles) > 0 { lastCandle = &candles[len(candles)-1] } - subscriptions = s.sendCandlesToSubscribers(candles, subscriptions) } } @@ -224,12 +223,13 @@ func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entiti func (s *CandleUpdates) sendCandlesToSubscribers(candles []entities.Candle, subscriptions map[string]chan entities.Candle) map[string]chan entities.Candle { ret := subscriptions for subscriptionID, outCh := range subscriptions { + loop: for _, candle := range candles { select { case outCh <- candle: default: - ret = removeSubscription(subscriptions, subscriptionID) - break + ret = removeSubscription(ret, subscriptionID) + break loop } } } diff --git a/datanode/candlesv2/candle_updates_test.go b/datanode/candlesv2/candle_updates_test.go index 8bfb6c356e..5e9bfea7ee 100644 --- a/datanode/candlesv2/candle_updates_test.go +++ b/datanode/candlesv2/candle_updates_test.go @@ -339,6 +339,37 @@ func TestSubscribeAndUnSubscribeWithNonReturningSource(t *testing.T) { updates.Unsubscribe(subID2) } +func TestMultipleSlowConsumers(t *testing.T) { + nSends := 100 + testCandleSource := &testCandleSource{candles: make(chan []entities.Candle, nSends), errorCh: make(chan error)} + // ensure the sub channels are buffered + updates := candlesv2.NewCandleUpdates(context.Background(), logging.NewTestLogger(), "testCandles", + testCandleSource, newTestCandleConfig(5).CandleUpdates) + startTime := time.Now() + + updated := startTime + firstCandle := createCandle(startTime, updated, 1, 1, 1, 1, 10, 200) + lastCandle := firstCandle // just for the sake of types + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + testCandleSource.candles <- []entities.Candle{firstCandle} + + // keep updating the most recent candle + for i := 0; i < nSends; i++ { + updated = updated.Add(time.Second * time.Duration(i)) + lastCandle = createCandle(startTime, updated, 1, 1, 1, 1, 10, 200) + testCandleSource.candles <- []entities.Candle{lastCandle, lastCandle, lastCandle} + } + }() + // ensure the first candle is sent + _, _, _ = updates.Subscribe() + _, _, _ = updates.Subscribe() + wg.Wait() +} + func newTestCandleConfig(subscribeBufferSize int) candlesv2.Config { conf := candlesv2.NewDefaultConfig() conf.CandleUpdates = candlesv2.CandleUpdatesConfig{