Skip to content

Commit

Permalink
Merge pull request #11555 from vegaprotocol/11544-candle-stream
Browse files Browse the repository at this point in the history
fix: specify end time when fetching new candles
  • Loading branch information
jeremyletang authored Aug 12, 2024
2 parents bc366bd + bf1a933 commit 70ef7cf
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
- [11486](https://github.com/vegaprotocol/vega/issues/11486) - `AMMs` can now be submitted on markets with more decimal places than asset decimal places.
- [11540](https://github.com/vegaprotocol/vega/issues/11540) - Fix spam check for spots to use not double count quantum.
- [11542](https://github.com/vegaprotocol/vega/issues/11542) - Fix non determinism in lottery ranking.
- [11544](https://github.com/vegaprotocol/vega/issues/11544) - Fix empty candles stream.


## 0.77.5
Expand Down
58 changes: 25 additions & 33 deletions datanode/candlesv2/candle_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (s *CandleUpdates) run(ctx context.Context) {
defer closeAllSubscriptions(subscriptions)

ticker := time.NewTicker(s.config.CandleUpdatesStreamInterval.Duration)
defer ticker.Stop()
var lastCandle *entities.Candle

errorGettingCandleUpdates := false
Expand All @@ -83,37 +84,28 @@ func (s *CandleUpdates) run(ctx context.Context) {
return
case subscriptionMsg := <-s.subscriptionMsgChan:
s.handleSubscription(subscriptions, subscriptionMsg, lastCandle)
default:
select {
case <-ctx.Done():
return
case subscription := <-s.subscriptionMsgChan:
s.handleSubscription(subscriptions, subscription, lastCandle)
case <-ticker.C:
if len(subscriptions) > 0 {
candles, err := s.getCandleUpdates(ctx, lastCandle)
if err != nil {
if !errorGettingCandleUpdates {
s.log.Errorf("failed to get candles for candle id", logging.String("candle", s.candleID), logging.Error(err))
}

errorGettingCandleUpdates = true
} else {
if errorGettingCandleUpdates {
s.log.Infof("successfully got candles for candle", logging.String("candle", s.candleID))
}
errorGettingCandleUpdates = false

if len(candles) > 0 {
lastCandle = &candles[len(candles)-1]
}

s.sendCandlesToSubscribers(candles, subscriptions)
}
} else {
lastCandle = nil
case now := <-ticker.C:
if len(subscriptions) == 0 {
lastCandle = nil
continue
}
candles, err := s.getCandleUpdates(ctx, lastCandle, now)
if err != nil {
if !errorGettingCandleUpdates {
s.log.Errorf("failed to get candles for candle id", logging.String("candle", s.candleID), logging.Error(err))
}
errorGettingCandleUpdates = true
continue
}
if errorGettingCandleUpdates {
s.log.Infof("Successfully got candles for candle", logging.String("candle", s.candleID))
errorGettingCandleUpdates = false
}
if len(candles) > 0 {
lastCandle = &candles[len(candles)-1]
}

s.sendCandlesToSubscribers(candles, subscriptions)
}
}
}
Expand Down Expand Up @@ -189,7 +181,7 @@ func (s *CandleUpdates) sendSubscriptionMessage(msg subscriptionMsg) error {
return nil
}

func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entities.Candle) ([]entities.Candle, error) {
func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entities.Candle, now time.Time) ([]entities.Candle, error) {
ctx, cancelFn := context.WithTimeout(ctx, s.config.CandlesFetchTimeout.Duration)
defer cancelFn()

Expand All @@ -198,14 +190,14 @@ func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entiti
if lastCandle != nil {
start := lastCandle.PeriodStart
var candles []entities.Candle
candles, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, &start, nil, entities.CursorPagination{})
candles, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, &start, &now, entities.CursorPagination{})

if err != nil {
return nil, fmt.Errorf("getting candle updates:%w", err)
}

for _, candle := range candles {
if candle.LastUpdateInPeriod.After(lastCandle.LastUpdateInPeriod) {
if candle.LastUpdateInPeriod.After(lastCandle.LastUpdateInPeriod) || candle.PeriodStart.After(lastCandle.PeriodStart) {
updates = append(updates, candle)
}
}
Expand All @@ -215,7 +207,7 @@ func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entiti
if err != nil {
return nil, err
}
updates, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, nil, nil, pagination)
updates, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, nil, &now, pagination)

if err != nil {
return nil, fmt.Errorf("getting candle updates:%w", err)
Expand Down

0 comments on commit 70ef7cf

Please sign in to comment.