diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fcd8916882..bab44f2b318 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ - [11486](https://github.com/vegaprotocol/vega/issues/11486) - `AMMs` can now be submitted on markets with more decimal places than asset decimal places. - [11540](https://github.com/vegaprotocol/vega/issues/11540) - Fix spam check for spots to use not double count quantum. - [11542](https://github.com/vegaprotocol/vega/issues/11542) - Fix non determinism in lottery ranking. +- [11544](https://github.com/vegaprotocol/vega/issues/11544) - Fix empty candles stream. ## 0.77.5 diff --git a/datanode/candlesv2/candle_updates.go b/datanode/candlesv2/candle_updates.go index bc3758e03f6..41fb0e0d36d 100644 --- a/datanode/candlesv2/candle_updates.go +++ b/datanode/candlesv2/candle_updates.go @@ -74,6 +74,7 @@ func (s *CandleUpdates) run(ctx context.Context) { defer closeAllSubscriptions(subscriptions) ticker := time.NewTicker(s.config.CandleUpdatesStreamInterval.Duration) + defer ticker.Stop() var lastCandle *entities.Candle errorGettingCandleUpdates := false @@ -83,37 +84,28 @@ func (s *CandleUpdates) run(ctx context.Context) { return case subscriptionMsg := <-s.subscriptionMsgChan: s.handleSubscription(subscriptions, subscriptionMsg, lastCandle) - default: - select { - case <-ctx.Done(): - return - case subscription := <-s.subscriptionMsgChan: - s.handleSubscription(subscriptions, subscription, lastCandle) - case <-ticker.C: - if len(subscriptions) > 0 { - candles, err := s.getCandleUpdates(ctx, lastCandle) - if err != nil { - if !errorGettingCandleUpdates { - s.log.Errorf("failed to get candles for candle id", logging.String("candle", s.candleID), logging.Error(err)) - } - - errorGettingCandleUpdates = true - } else { - if errorGettingCandleUpdates { - s.log.Infof("successfully got candles for candle", logging.String("candle", s.candleID)) - } - errorGettingCandleUpdates = false - - if len(candles) > 0 { - lastCandle = &candles[len(candles)-1] - } - - s.sendCandlesToSubscribers(candles, subscriptions) - } - } else { - lastCandle = nil + case now := <-ticker.C: + if len(subscriptions) == 0 { + lastCandle = nil + continue + } + candles, err := s.getCandleUpdates(ctx, lastCandle, now) + if err != nil { + if !errorGettingCandleUpdates { + s.log.Errorf("failed to get candles for candle id", logging.String("candle", s.candleID), logging.Error(err)) } + errorGettingCandleUpdates = true + continue } + if errorGettingCandleUpdates { + s.log.Infof("Successfully got candles for candle", logging.String("candle", s.candleID)) + errorGettingCandleUpdates = false + } + if len(candles) > 0 { + lastCandle = &candles[len(candles)-1] + } + + s.sendCandlesToSubscribers(candles, subscriptions) } } } @@ -189,7 +181,7 @@ func (s *CandleUpdates) sendSubscriptionMessage(msg subscriptionMsg) error { return nil } -func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entities.Candle) ([]entities.Candle, error) { +func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entities.Candle, now time.Time) ([]entities.Candle, error) { ctx, cancelFn := context.WithTimeout(ctx, s.config.CandlesFetchTimeout.Duration) defer cancelFn() @@ -198,14 +190,14 @@ func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entiti if lastCandle != nil { start := lastCandle.PeriodStart var candles []entities.Candle - candles, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, &start, nil, entities.CursorPagination{}) + candles, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, &start, &now, entities.CursorPagination{}) if err != nil { return nil, fmt.Errorf("getting candle updates:%w", err) } for _, candle := range candles { - if candle.LastUpdateInPeriod.After(lastCandle.LastUpdateInPeriod) { + if candle.LastUpdateInPeriod.After(lastCandle.LastUpdateInPeriod) || candle.PeriodStart.After(lastCandle.PeriodStart) { updates = append(updates, candle) } } @@ -215,7 +207,7 @@ func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entiti if err != nil { return nil, err } - updates, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, nil, nil, pagination) + updates, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, nil, &now, pagination) if err != nil { return nil, fmt.Errorf("getting candle updates:%w", err)